Snowpark Container Services:SQL 执行

您的应用程序容器可以连接到 Snowflake 并执行 SQL。本主题介绍容器代码如何获取连接到 Snowflake 所需的信息,包括身份验证凭据、服务的数据库和架构上下文,以及用于运行 SQL 语句的仓库。

凭据配置选项

Snowflake 建议应用程序容器在执行 SQL 时,使用 Snowflake 提供的凭据进行身份验证。虽然可以通过外部访问集成 (EAI) 使用其他凭据,但通过 EAI 连接会将服务视为在 Snowflake 外部运行,并经由互联网连接到 Snowflake。

您可以从服务容器通过以下三种选项连接到 Snowflake:

  • 使用 Snowflake 提供的服务用户凭据: Snowflake 为每个服务提供凭据,称为服务凭据。服务使用这些凭据以“服务用户”身份连接到 Snowflake。

  • 使用 Snowflake 提供的调用方凭据(调用方权限): 当您使用调用方权限配置服务时,Snowflake 还会为服务提供凭据,以便以“调用用户”身份连接到 Snowflake。

  • 使用其他凭据: 在这种情况下,您使用外部访问集成 (EAI),允许您的服务通过有效的身份验证凭据连接到 Snowflake 的互联网端点。此选项需要管理员创建 EAI,然后将该集成的 USAGE 权限授予服务所有者角色。

    备注

    如果您使用外部访问集成访问 Snowflake,您可能会通过互联网发送潜在的敏感信息。

有关使用各种 Snowflake 驱动程序连接到 Snowflake 的代码示例,请参阅 Snowflake 连接示例 (https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/spcs/sf-connection)。

使用 Snowflake 提供的服务用户凭据

使用 Snowflake 提供的服务凭据时,请注意以下影响:

  • Snowflake 中的每个对象都有一个 所有者角色,即用于创建该对象的角色。服务的所有者角色决定了该服务在与 Snowflake 交互时所拥有的权限。这些权限包括执行 SQL、访问暂存区以及执行服务间网络通信。

  • 当您创建服务时,Snowflake 还会创建一个该服务专用的服务用户。该服务用户仅能访问两个角色:服务所有者角色和“PUBLIC”角色。服务用户的默认角色为服务所有者角色。

当您启动服务(包括作业服务)时,Snowflake 会执行多项操作。在每个应用程序容器中,Snowflake 允许容器代码使用驱动程序连接到 Snowflake 并执行 SQL,这与计算机上连接到 Snowflake 的任何其他代码类似。以下列表显示了 Snowflake 在您启动服务时执行的操作:

  • 在容器中名为 /snowflake/session/token 的文件中提供凭据(OAuth 令牌)。容器代码使用这些凭据作为“服务用户”进行身份验证。此 OAuth 令牌无法在 Snowpark Container Services 之外使用。

  • 设置以下环境变量,以便您在服务代码中配置 Snowflake 客户端:

    • SNOWFLAKE_ACCOUNT:选择使用 时默认使用的角色和仓库。此变量设置为当前运行该服务的 Snowflake 账户的 账户定位器

    • SNOWFLAKE_HOST:选择使用 时默认使用的角色和仓库。此变量提供用于连接到 Snowflake 的主机名。

当您以服务用户身份创建与 Snowflake 的连接时,容器代码必须使用 SNOWFLAKE_HOST、SNOWFLAKE_ACCOUNT 以及 OAuth 令牌。如果不配合使用 SNOWFLAKE_HOST,则无法使用 OAuth 令牌。

示例

教程 2 <tutorials/tutorial-2>`(请参阅 :file:`main.py)中,代码读取环境变量,如以下示例所示:

SNOWFLAKE_ACCOUNT = os.getenv('SNOWFLAKE_ACCOUNT')
SNOWFLAKE_HOST = os.getenv('SNOWFLAKE_HOST')
Copy

代码将这些变量传递给所选 Snowflake 客户端的连接创建代码。容器使用这些凭据创建一个新会话,并将该服务的“所有者角色”作为会话的主角色来运行查询。以下示例显示了在 Python 中创建 Snowflake 连接所需的最简代码:

def get_login_token():
  with open('/snowflake/session/token', 'r') as f:
    return f.read()

conn = snowflake.connector.connect(
  host = os.getenv('SNOWFLAKE_HOST'),
  account = os.getenv('SNOWFLAKE_ACCOUNT'),
  token = get_login_token(),
  authenticator = 'oauth'
)
Copy

请注意关于此 OAuth 令牌的以下详细信息:

  • Snowflake 每隔几分钟就会刷新 /snowflake/session/token 文件的内容。每个令牌的有效期最长为一小时。容器成功连接到 Snowflake 后,过期时间不再适用于该连接,这与用户直接创建的任何会话情况相同。

  • 此 OAuth 令牌仅在特定的 Snowflake 服务中有效。您无法复制 OAuth 令牌并在该服务之外使用。

  • 如果您使用 OAuth 令牌进行连接,它会创建一个新会话。OAuth 令牌不与任何现有 SQL 会话关联。

    备注

    执行存储过程与执行服务之间的一个显著区别是:存储过程与运行该过程的 SQL 在同一个会话中运行。但容器每次建立新连接时,都会创建一个新会话。

要查看特定服务用户发出的查询,您可以使用 ACCOUNTADMIN 角色来查看 查询历史记录。服务用户的用户名以下列形式显示:

  • 对于 8.35 服务器版本之前创建的服务,服务用户名的格式为 SF$SERVICE$unique-id

  • 对于 8.35 服务器版本之后创建的服务,服务用户名与服务名称相同。

备注

服务的“所有者角色”是指创建该服务的角色。您可以定义一个或多个“服务角色”,以管理对服务所公开端点的访问。有关更多信息,请参阅 管理与服务相关的权限

关于使用 Snowflake 提供的调用方凭据(调用方权限)

在某些应用场景中,您可能需要使用最终用户的上下文(而非上一节所述的服务用户上下文)来执行查询。在此上下文中需使用调用方的权限功能。

例如,假设您创建了一个服务,为某个 Web 应用程序公开公共端点,该程序用于显示使用 Snowflake 所存数据的仪表板。您通过授予 Snowflake 账户中的其他用户 服务角色,来允许其访问该仪表板。当用户登录时,仪表板应仅显示该用户有权访问的数据。

然而,由于容器默认使用“服务用户”和“服务所有者角色”执行查询,因此无论哪个最终用户连接到端点,仪表板都会显示“服务所有者角色”有权访问的数据。结果就是,仪表板显示的内容不限于最终用户有权访问的数据,从而导致登录用户能看到其不应访问的数据。

为了限制仪表板仅显示登录用户可访问的数据,应用程序容器必须使用授予该最终用户的权限来执行 SQL。您可以通过在应用程序中使用调用方权限来启用此功能。

备注

  • 仅在使用网络入口 访问服务 时支持调用方权限功能。使用服务函数访问服务时,此功能不可用。

  • |native-app|(使用容器的应用程序)当前不支持调用方权限功能。

为您的服务配置调用方权限

为应用程序配置调用方权限分为两个步骤。

  1. 服务规范 中,将 executeAsCaller 设置为 true,如下面的规范片段所示:

    spec:
      containers:
      ...
    capabilities:
      securityContext:
        executeAsCaller: true
    
    Copy

    此设置会告知 Snowflake 该应用打算使用调用方权限,并使 Snowflake 在将每个传入请求发送到应用容器之前,先在请求中插入 Sf-Context-Current-User-Token 标头。此用户令牌有助于以调用用户的身份执行查询。如果未指定,则 executeAsCaller 默认为 false

    指定 executeAsCaller 选项不会影响服务以“服务用户”身份和“服务所有者角色”执行查询的能力。启用 executeAsCaller 后,服务可以选择以调用用户和服务用户身份连接到 Snowflake。

  2. 要代表调用用户建立 Snowflake 连接,请更新应用代码以创建一个登录令牌,该令牌需同时包含 Snowflake 提供给服务的 OAuth 令牌 以及来自 Sf-Context-Current-User-Token 标头的用户令牌。

    登录令牌必须遵循以下格式:<service-oauth-token>.<Sf-Context-Current-User-Token>

    以下 Python 代码片段演示了此更新:

    # Environment variables below will be automatically populated by Snowflake.
    SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
    SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
    
    def get_login_token():
        with open("/snowflake/session/token", "r") as f:
            return f.read()
    
    def get_connection_params(ingress_user_token = None):
        # start a Snowflake session as ingress user
        # (if user token header provided)
        if ingress_user_token:
            logger.info("Creating a session on behalf of the current user.")
            token = get_login_token() + "." + ingress_user_token
        else:
            logger.info("Creating a session as the service user.")
            token = get_login_token()
    
        return {
            "account": SNOWFLAKE_ACCOUNT,
            "host": SNOWFLAKE_HOST,
            "authenticator": "oauth",
            "token": token
        }
    
    def run_query(request, query):
        ingress_user_token = request.headers.get('Sf-Context-Current-User-Token')
        # ingress_user_token is None if header not present
        connection_params = get_connection_params(ingress_user_token)
        with Session.builder.configs(connection_params).create() as session:
          # use the session to execute a query.
    
    Copy

在上面的例子中:

  • get_login_token 函数会读取 Snowflake 在其中复制 OAuth 令牌供容器使用的文件。

  • get_connection_params 函数通过级联 OAuth 令牌和来自 Sf-Context-Current-User-Token 标头的用户令牌来构建令牌。该函数将此令牌包含在应用程序用来连接到 Snowflake 的参数字典中。

备注

当服务使用调用方的权限时,它可以作为多个用户连接到 Snowflake。您负责管理对非 Snowflake 管理的资源的访问权限。

例如,在 Streamlit 应用程序中,st.connection 对象通过在全局状态中使用 st.cache_resource 来自动缓存连接,使其可以在由不同用户启动的 Streamlit 会话之间进行访问。使用调用方权限时,请考虑使用 st.session_state 按会话存储连接,以避免在用户之间共享连接。

有关包含分步说明的示例,请参阅 使用已启用的调用方权限创建服务

访问已配置调用方权限的服务

配置调用方权限 意味着您的服务代表调用方建立 Snowflake 连接。登录服务入口端点的方式(无论是通过编程方式还是使用浏览器)保持不变。登录后,以下行为和选项适用:

  • 使用浏览器访问公共端点: 登录到端点后,服务将代表调用用户,使用该用户的默认角色建立与 Snowflake 的连接。如果没有为用户配置默认角色,则使用 PUBLIC 角色。

  • 以编程方式访问公共端点: 当使用 JWT 令牌 以编程方式登录到端点 时,可以选择设置 scope 参数以指定要激活的角色

目前,在服务代表调用方与 Snowflake 建立调用方权限连接后,不支持切换角色。如果您的应用程序需要使用不同的角色来访问不同的对象,则必须更改用户的默认次要角色属性。

  • 要将用户设置为默认激活所有次要角色,请使用 ALTER USER 命令将用户的 DEFAULT_SECONDARY_ROLES 属性设置为 ('ALL'),如以下示例所示:

ALTER USER my_user SET DEFAULT_SECONDARY_ROLES = ( 'ALL' );
Copy

管理服务的调用方授权

当服务创建调用方权限会话时,该会话以“调用用户”身份运行,而非“服务用户”。当使用此会话执行操作时,Snowflake 会按顺序应用两项权限检查:

  1. 第一项权限检查与用户直接创建会话时的检查相同。此检查属于 Snowflake 为用户执行的常规权限检查。

  2. 第二项权限检查验证是否允许服务代表用户执行该操作。Snowflake 通过确保服务的所有者角色已被授予必要的调用方权限来验证这一点。

在调用方权限会话中,正常权限检查和服务所有者角色的 调用方授权 检查都必须允许操作;这称为 受限调用方权限。默认情况下,服务无权代表用户执行任何操作。您必须明确为服务授予调用方权限,以便它能够以调用方权限运行。

例如,假设用户 U1 使用角色 R1,该角色在表 T1 上具有 SELECT 权限。当 U1 登录到服务的公共端点 (example_service) 时(该端点配置为使用调用方权限),服务将代表 U1 与 Snowflake 建立连接。

要允许服务代表 U1 查询表 T1,您需要为服务的所有者角色授予以下权限:

  • 解析表名称的权限,方法是授予调用方权限,允许服务以该表的数据库和架构的 USAGE 权限运行。

  • 使用仓库执行查询的权限,方法是授予调用方权限,允许服务以仓库的 USAGE 权限运行。

  • 通过授予调用者权限来查询表的权限,该权限允许服务以表 T1 上的 SELECT 权限运行。

以下示例演示如何向服务的所有者角色授予以下权限:

-- Permissions to resolve the table's name.
GRANT CALLER USAGE ON DATABASE <db_name> TO ROLE <service_owner_role>;
GRANT CALLER USAGE ON SCHEMA <schema_name> TO ROLE <service_owner_role>;
-- Permissions to use a warehouse
GRANT CALLER USAGE ON WAREHOUSE <warehouse_name> TO ROLE <service_owner_role>;
-- Permissions to query the table.
GRANT CALLER SELECT ON TABLE T1 TO ROLE <service_owner_role>;
Copy

您账户中任何具有全局 MANAGE CALLER GRANT 权限的角色都可以授予调用方权限。有关调用方授权的详细信息,请参阅 GRANT CALLER受限的调用方权限

示例

文中提供了一个服务示例,演示了在代表用户执行 SQL 查询时如何使用调用方权限功能。有关更多信息,请参阅 使用已启用的调用方权限创建服务

使用其他凭据连接到 Snowflake

您可以使用其他形式的身份验证来连接到 Snowflake,而不仅仅使用 Snowflake 提供的 OAuth 令牌。为此,您需要创建外部访问集成 (EAI),使您的容器能够连接到 Snowflake,就好像容器在 Snowflake 之外运行并通过互联网进行连接一样。 当您以这种方式连接时,无需配置客户端使用的主机。

备注

由于这些连接需经过 EAI,Snowflake 身份验证会同时强制执行网络策略。如果您的业务需要网络策略,则不支持使用其他凭据进行连接。

例如,以下连接指定了用于身份验证的用户名和密码:

conn = snowflake.connector.connect(
  account = '<acct-name>',
  user = '<user-name>',
  password = '<password>'
)
Copy

要使用默认主机名,您需要通过网络规则进行外部访问集成,该规则允许从您的服务访问您账户的 Snowflake 互联网主机名。例如,如果您的账户名是组织 MYACCOUNT 中的 MYORG,则主机名为 myorg-myaccount.snowflakecomputing.cn。有关更多信息,请参阅 配置网络出口。不支持 Privatelink 主机名

  • 创建与您账户的 Snowflake API 主机名匹配的网络规则:

    CREATE OR REPLACE NETWORK RULE snowflake_egress_access
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('myorg-myaccount.snowflakecomputing.cn');
    
    Copy
  • 创建使用上述网络规则的集成:

    CREATE EXTERNAL ACCESS INTEGRATION snowflake_egress_access_integration
      ALLOWED_NETWORK_RULES = (snowflake_egress_access)
      ENABLED = TRUE;
    
    Copy

配置用于执行 SQL 的数据库和架构上下文

除了提供凭据外,Snowflake 还提供创建该服务时所在的数据库和架构上下文。容器代码可以使用此信息,在与服务相同的数据库和架构上下文中执行 SQL。

本节介绍两个概念:

  • Snowflake 用于确定要在其中创建服务的数据库和架构的逻辑。

  • Snowflake 将此信息传达给容器,从而使容器代码能够在同一数据库和架构上下文中执行 SQL 的方法。

Snowflake 使用服务名称来确定要在其中创建服务的数据库和架构:

  • 示例 1:在以下 CREATE SERVICE 和 EXECUTE JOB SERVICE 命令中,服务名称不会显式指定数据库和架构名称。Snowflake 在当前数据库和架构中创建服务和作业服务。

    -- Create a service.
    CREATE SERVICE test_service IN COMPUTE POOL ...
    
    -- Execute a job service.
    EXECUTE JOB SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      NAME = example_job_service ...
    
    Copy
  • 示例 2:在以下 CREATE SERVICE 和 EXECUTE JOB SERVICE 命令中,服务名称包括数据库和架构名称。Snowflake 在指定的数据库 (test_db) 和架构 (test_schema) 中创建服务和作业服务,无论当前架构如何。

    -- Create a service.
    CREATE SERVICE test_db.test_schema.test_service IN COMPUTE POOL ...
    
    -- Execute a job service.
    EXECUTE JOB SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      NAME = test_db.test_schema.example_job_service ...
    
    Copy

当 Snowflake 启动服务时,它会使用以下环境变量向正在运行的容器提供数据库和架构信息:

  • SNOWFLAKE_DATABASE

  • SNOWFLAKE_SCHEMA

容器代码可以使用连接代码中的环境变量来确定要使用的数据库和架构,如以下示例所示:

conn = snowflake.connector.connect(
  host = os.getenv('SNOWFLAKE_HOST'),
  account = os.getenv('SNOWFLAKE_ACCOUNT'),
  token = get_login_token(),
  authenticator = 'oauth',
  database = os.getenv('SNOWFLAKE_DATABASE'),
  schema = os.getenv('SNOWFLAKE_SCHEMA')
)
Copy

示例

教程 2 中,您可以创建与 Snowflake 连接并执行 SQL 语句的 Snowflake 作业服务。以下步骤总结了教程代码如何使用环境变量:

  1. 在通用设置中(请参阅 通用设置 部分),您可以创建资源,包括数据库和架构。您还可以为会话设置当前数据库和架构:

    USE DATABASE tutorial_db;
    ...
    USE SCHEMA data_schema;
    
    Copy
  2. 创建作业服务(通过运行 EXECUTE JOB SERVICE)后,Snowflake 启动容器,并将容器中的以下环境变量设置为会话的当前数据库和架构:

    • 将 SNOWFLAKE_DATABASE 设置为“TUTORIAL_DB”

    • 将 SNOWFLAKE_SCHEMA 设置为“DATA_SCHEMA”

  3. 作业代码(请参阅教程 2 中的 main.py)读取以下环境变量:

    SNOWFLAKE_DATABASE = os.getenv('SNOWFLAKE_DATABASE')
    SNOWFLAKE_SCHEMA = os.getenv('SNOWFLAKE_SCHEMA')
    
    Copy
  4. 作业代码将数据库和架构设置为执行 SQL 语句(main.py 中的 run_job() 函数)的上下文:

    {
       "account": SNOWFLAKE_ACCOUNT,
       "host": SNOWFLAKE_HOST,
       "authenticator": "oauth",
       "token": get_login_token(),
       "warehouse": SNOWFLAKE_WAREHOUSE,
       "database": SNOWFLAKE_DATABASE,
       "schema": SNOWFLAKE_SCHEMA
    }
    ...
    
    Copy

    备注

    SNOWFLAKE_ACCOUNT、SNOWFLAKE_HOST、SNOWFLAKE_DATABASE、SNOWFLAKE_SCHEMA 是 Snowflake 为应用程序容器生成的环境变量,但 SNOWFLAKE_WAREHOUSE 不是(教程 2 应用程序代码创建了此变量,因为 Snowflake 不会将仓库名称传递给容器)。

为容器指定仓库

如果服务连接到 Snowflake,以在 Snowflake 仓库中执行查询,则您可以使用以下选项来指定仓库:

  • 在应用程序代码中指定仓库。 在启动 Snowflake 会话以在代码中运行查询时,在连接配置中指定仓库。有关示例,请参阅 教程 2

  • 在创建服务时指定默认仓库。CREATE SERVICEEXECUTE JOB SERVICE 命令中指定可选的 QUERY_WAREHOUSE 参数以提供默认仓库。如果您的应用程序代码未在连接配置中提供仓库,Snowflake 将使用默认仓库。如果应用程序代码未在连接配置中提供仓库,则 Snowflake 将使用默认仓库。使用 ALTER SERVICE 命令更改默认仓库。

    备注

    使用 QUERY_WAREHOUSE 参数指定的仓库仅作为 服务用户 的默认仓库。当服务代表另一个用户连接到 Snowflake 时(在 调用方权限场景 中),Snowflake 会使用该用户的默认仓库。

如果同时通过这两种方法指定了仓库,则将使用应用程序代码中指定的仓库。

访问服务用户查询历史记录

您可以通过筛选 QUERY_HISTORY 视图QUERY_HISTORY 函数(其中 user_type 为 SNOWFLAKE_SERVICE)来查找由您的服务以服务用户身份执行的查询。

示例 1: 提取一项服务运行的查询。

SELECT *
FROM snowflake.account_usage.query_history
WHERE user_type = 'SNOWFLAKE_SERVICE'
AND user_name = '<service_name>'
AND user_database_name = '<service_db_name>'
AND user_schema_name = '<service_schema_name>'
order by start_time;
Copy

在 WHERE 子句中:

  • user_name = '<service_name>':您将服务名称指定为用户名,是因为服务以 服务用户 的身份执行查询,且服务用户的名称与服务名称相同。

  • user_type = 'SNOWFLAKE_SERVICE'user_name = '<service_name>':这样会限制查询结果,即仅检索一项服务执行的查询。

  • user_database_nameuser_schema_name:对于服务用户而言,这些名称是服务的数据库和架构。

调用 QUERY_HISTORY 函数可以得到相同的结果。

SELECT *
FROM TABLE(<service_db_name>.information_schema.query_history())
WHERE user_database_name = '<service_db_name>'
AND user_schema_name = '<service_schema_name>'
AND user_type = 'SNOWFLAKE_SERVICE'
AND user_name = '<service_name>'
order by start_time;
Copy

在 WHERE 子句中:

  • user_type = 'SNOWFLAKE_SERVICE'user_name = '<service_name>' 会限制查询结果,仅检索一项服务执行的查询。

  • 对于服务用户而言,user_database_nameuser_schema_name 名称是服务的数据库和架构。

示例 2: 获取多项服务运行的查询和相应的服务信息。

SELECT query_history.*, services.*
FROM snowflake.account_usage.query_history
JOIN snowflake.account_usage.services
ON query_history.user_name = services.service_name
AND query_history.user_schema_id = services.service_schema_id
AND query_history.user_type = 'SNOWFLAKE_SERVICE'
Copy

该查询联接 QUERY_HISTORY 和 SERVICES 视图,以检索与查询和执行查询的服务相关的信息。请注意以下事项:

  • 对于由服务运行的查询,query_history.user_name 是服务用户的名称,与服务名称相同。

  • 该查询使用架构 IDs(而非架构名称)联接视图,以确保您引用相同的架构,因为如果删除并重新创建架构,架构 ID 会发生变化,但名称保持不变。

您可以向查询添加可选的筛选器。例如:

  • 筛选 query_history,从而仅检索执行特定查询的服务。

  • 筛选 services,从而仅检索特定服务执行的查询。

示例 3: 获取每项服务的服务用户信息。

SELECT services.*, users.*
FROM snowflake.account_usage.users
JOIN snowflake.account_usage.services
ON users.name = services.service_name
AND users.schema_id = services.service_schema_id
AND users.type = 'SNOWFLAKE_SERVICE'
Copy

查询在 ACCOUNT_USAGE 架构中联接 SERVICES 和 USERS 视图,以检索服务和服务用户信息。请注意以下事项:

  • 当服务运行查询时,它会以服务用户身份运行查询,服务用户名与服务名称相同。因此,您可以指定联接条件:users.name = services.service_name

  • 服务名称仅在一个架构内唯一。因此,该查询指定联接条件 (users.schema_id = services.service_schema_id),以确保每个服务用户均与其所属的特定服务(而非在不同架构中运行的任何其他同名服务)相匹配。

语言: 中文