创建使用调用方权限的 Snowpark Container Services 服务¶
简介¶
在本教程中,您将探索构建一项服务,呈现一个 Web UI,该服务在代表用户执行 SQL 查询时使用调用方权限功能。
您创建一项服务(名为 query_service
),用于执行请求中提供的查询。默认情况下,应用程序容器使用服务的所有者角色以服务用户身份连接到 Snowflake。但此应用程序使用调用方权限功能,以最终用户的身份连接到服务端点,并使用授予该用户的权限。
测试时,您从 Web 浏览器使用该服务,因为只有在使用网络入口访问服务时,才支持调用方权限功能。使用服务函数访问服务时,调用方权限功能不可用。
该服务执行以下操作:
显示一个公共端点。
用户登录到端点时,服务会提供 Web UI 以提供查询。该服务在 Snowflake 中执行查询并返回结果。在本教程中,您将执行以下 SQL 命令:
SELECT CURRENT_USER(), CURRENT_ROLE();
该命令返回当前登录用户和当前活动角色的名称,这两个名称都取决于是否使用了调用方权限。
当使用调用方权限时,服务以调用用户和用户的默认角色连接到 Snowflake。该命令返回您的用户名和默认角色。
不使用调用方权限时,默认行为将在服务连接到 Snowflake 时启动,Snowflake 是服务用户和服务所有者角色。因此,该命令返回服务用户名,格式为:
SF$SERVICE$unique-id
、TEST_ROLE
。
本教程分为两个部分:
第 1 部分:创建并测试服务。 您将下载为本教程提供的代码,并按照分步说明进行操作:
下载为本教程提供的服务代码。
为 Snowpark Container Services 构建 Docker 镜像,并将该镜像上传到账户的仓库中。
创建服务。
通过网络入口与服务通信,以连接到服务公开的公共端点。使用 Web 浏览器登录到公共端点并执行 SELECT CURRENT_USER(); 命令。验证命令输出,确保容器以登录用户身份执行命令。
第 2 部分:了解服务。本节概述了服务代码,并重点介绍了应用程序代码如何使用调用方权限。
准备¶
按照 通用设置 配置先决条件,并创建本文档中提供的所有 Snowpark Container Services 教程所需的 Snowflake 资源。
下载服务代码¶
提供用于创建查询服务的代码(Python 应用程序)。
下载:download:
SnowparkContainerServices -Tutorials.zip </samples/spcs/SnowparkContainerServices-Tutorials.zip>
。解压内容,其中包含每个教程的一个目录。
Tutorial-6-callers-rights
目录包含以下文件:Dockerfile
main.py
templates/basic_ui.html
构建镜像并上传¶
为 Snowpark Container Services 支持的 linux/amd64 平台构建镜像,然后将镜像上传到账户中的镜像仓库(请参阅 通用设置)。
您将需要有关仓库(仓库 URL 和注册表主机名)的信息,才能构建和上传镜像。有关更多信息,请参阅 注册表和镜像仓库。
获取有关仓库的信息
要获取镜像仓库 URL,请执行 SHOW IMAGE REPOSITORIES SQL 命令。
SHOW IMAGE REPOSITORIES;
输出中的
repository_url
列提供 URL。示例如下:<orgname>-<acctname>.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository
仓库 URL 中的主机名是注册表主机名。示例如下:
<orgname>-<acctname>.registry.snowflakecomputing.cn
构建镜像并将其上传到仓库
打开终端窗口,然后切换到包含解压文件的目录。
要构建 Docker 镜像,请使用 Docker CLI 执行以下
docker build
命令。请注意,该命令指定当前工作目录 (.
),作为用于构建镜像的文件的PATH
。docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
对于
image_name
,请使用query_service:latest
。
示例
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/query_service:latest .
将镜像上传到 Snowflake 账户中的仓库。要使 Docker 代表您将镜像上传到存储库,您必须首先使用 Snowflake 对 Docker 进行身份验证。
要使用 Snowflake 注册表对 Docker 进行身份验证,请执行以下命令:
docker login <registry_hostname> -u <username>
对于
username
,请指定您的 Snowflake 用户名。Docker 将提示您输入密码。
要上传镜像,请执行以下命令:
docker push <repository_url>/<image_name>
示例
docker push myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/query_service:latest
创建服务¶
在本节中,您将创建一项服务 (query_service)。
验证计算池是否已准备就绪,以及您是否处于创建服务的正确上下文中。
之前,您在 通用设置 步骤中设置了上下文。要确保您处于此步骤中 SQL 语句的正确上下文中,请执行以下步骤:
USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse;
要确保您在 通用设置 中创建的计算池已准备就绪,请执行
DESCRIBE COMPUTE POOL
,并且验证state
是ACTIVE
,还是IDLE
。如果state
是STARTING
,您需要等到state
更改为ACTIVE
或IDLE
。
DESCRIBE COMPUTE POOL tutorial_compute_pool;
要创建服务,请使用
test_role
执行以下命令:CREATE SERVICE query_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: main image: /tutorial_db/data_schema/tutorial_repository/query_service:latest env: SERVER_PORT: 8000 readinessProbe: port: 8000 path: /healthcheck endpoints: - name: execute port: 8000 public: true capabilities: securityContext: executeAsCaller: true serviceRoles: - name: ui_usage endpoints: - execute $$;
备注
如果已存在具有该名称的服务,请使用 DROP SERVICE 命令删除之前创建的服务,然后再创建此服务。
执行以下 SQL 命令,获取有关您刚刚创建的服务的详细信息。有关更多信息,请参阅 Snowpark Container Services:使用服务。
要列出账户中的服务,请执行 SHOW SERVICES 命令:
SHOW SERVICES;
要获取服务的状态,请执行 SHOW SERVICE CONTAINERS IN SERVICE 命令:
SHOW SERVICE CONTAINERS IN SERVICE query_service;
要获取服务的相关信息,请执行 DESCRIBE SERVICE 命令:
DESCRIBE SERVICE query_service;
使用服务¶
在本节中,您将验证为服务配置的 调用方权限 是否有效。您从浏览器登录到公共端点,执行查询,并验证服务创建的 Snowflake 会话是以调用用户的身份运行,而不是以服务用户的身份运行。
首先,要为本节中的 SQL 语句设置上下文,请执行以下命令:
USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
该服务公开了一个公共端点(请参阅 CREATE SERVICE 命令中提供的内联规范);因此,首先使用 Web 浏览器登录到该端点,然后使用服务向互联网公开的 Web UI 向服务端点发送查询请求。
找到服务公开的公共端点的 URL:
SHOW ENDPOINTS IN SERVICE query_service;
响应中的
ingress_url
列提供 URL。示例
p6bye-myorg-myacct.snowflakecomputing.app
将
/ui
追加到端点 URL,并将其粘贴到 Web 浏览器中。这样会导致服务执行ui()
函数(请参阅main.py
)。请注意,首次访问端点 URL 时,系统将要求您登录 Snowflake。
使用用于创建服务的同一用户。登录成功后,服务将显示以下 Web UI。
在文本框中输入以下命令,然后按回车键查看结果。
SELECT CURRENT_USER(), CURRENT_ROLE()DONE;
因为您在服务规范中包含了
executeAsCaller
功能,所以当请求到达时,Snowflake 会在请求中插入Sf-Context-Current-User-Token
标头,然后将请求转发到您的服务端点。为了便于说明,本教程中的服务代码以调用方和服务用户的身份执行查询。
代表调用方(入口用户)执行查询: 在这种情况下,代码使用 Snowflake 提供的用户令牌来构建登录令牌,以便与 Snowflake 连接。因此,服务使用调用方权限。Snowflake 代表调用方执行查询,在查询结果中显示调用方名称和活动角色名称。例如:
['TESTUSER, PUBLIC']
代表服务用户执行查询:在这种情况下,代码在构建登录令牌以连接 Snowflake 时,不会使用 Snowflake 在请求中提供的用户令牌。因此,服务不会利用调用方权限,导致 Snowflake 代表服务用户执行查询。查询结果显示服务用户的名称(与服务名称相同)和活动角色。
['QUERY_SERVICE, TEST_ROLE']
当服务代表调用方执行查询 (SELECT CURRENT_USER(), CURRENT_ROLE();
) 时,Snowflake 不需要用户的仓库来执行这一简单的查询。因此,该服务不需要任何 调用方权限。在下一节中,服务代表调用用户执行非简单查询,该查询要求您向服务授予 调用方权限。
备注
您可以通过编程方式访问入口端点。有关示例代码,请参阅 来自 Snowflake 外部的公共端点访问及身份验证。请注意,您需要将 /ui
追加到代码中的端点 URL,以便 Snowflake 可以将请求路由到服务代码中的 ui()
函数。
将服务与调用方权限一起使用¶
在本节中,服务代表调用方(登录服务入口端点的用户)执行以下查询。
SELECT * FROM ingress_user_db.ingress_user_schema.ingress_user_table;
服务没有访问表的权限,也没有在默认仓库中运行查询的权限。要使服务能够代表调用方执行此查询,您需要向服务授予所需的 调用方权限。
为了演示场景,您创建一个新角色 (ingress_user_role
) 和一个表 (ingress_user_table
),新角色可访问该表,但服务的所有者角色 (test_role
) 无法访问该表。因此,当服务尝试使用服务凭据执行查询时,Snowflake 返回错误。但是当服务代表用户执行查询时,Snowflake 执行查询并返回结果。
创建角色和资源¶
创建只有此角色才能访问的角色 (
ingress_user_role
) 和数据库 (ingress_user_db
)。然后将此角色授予用户,以便用户登录到服务的公共端点并查询此表。USE ROLE accountadmin; CREATE ROLE ingress_user_role; GRANT ROLE ingress_user_role TO USER <your_user_name>; GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE ingress_user_role; CREATE DATABASE IF NOT EXISTS ingress_user_db; GRANT OWNERSHIP ON DATABASE ingress_user_db TO ROLE ingress_user_role COPY CURRENT GRANTS;
创建只有
ingress_user_role
角色才能访问的表 (ingress_user_table
)。USE ROLE ingress_user_role; CREATE SCHEMA IF NOT EXISTS ingress_user_db.ingress_user_schema; USE WAREHOUSE tutorial_warehouse; CREATE TABLE ingress_user_db.ingress_user_schema.ingress_user_table (col string) AS ( SELECT 'this table is only accessible to the ingress_user_role' );
请注意,当服务尝试代表调用方查询表时,服务仅作为 ``test_role``(创建服务时使用的角色,即服务所有者角色)运行。此角色没有访问用户表的权限。
将调用方权限授予服务的所有者角色 (
test_role
),以查询ingress_user_db
数据库中的表。此权限仅允许服务在以下情况下查询此数据库中的表:服务正在使用 调用方权限会话。
在会话中,调用方还具有执行这些查询的权限。
USE ROLE accountadmin; GRANT CALLER USAGE ON DATABASE ingress_user_db TO ROLE test_role; GRANT INHERITED CALLER USAGE ON ALL SCHEMAS IN DATABASE ingress_user_db TO ROLE test_role; GRANT INHERITED CALLER SELECT ON ALL TABLES IN DATABASE ingress_user_db TO ROLE test_role; GRANT CALLER USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role; SHOW CALLER GRANTS TO ROLE test_role;
配置默认仓库和默认辅助角色。
为用户创建会话时,Snowflake 将激活登录用户的默认主角色、默认次要角色和默认仓库。在本教程中:
您将
DEFAULT_SECONDARY_ROLES
设置为 ALL,以便在为当前用户创建会话时,Snowflake 会将当前次要角色设置为授予该用户的所有角色。您还可以将默认仓库设置为
tutorial_warehouse
,以便在其中执行ingress_user_table
查询。
ALTER USER SET DEFAULT_SECONDARY_ROLES = ('ALL'); ALTER USER SET DEFAULT_WAREHOUSE = TUTORIAL_WAREHOUSE;
请注意以下事项:
在本教程中,您将登录到服务的公共端点。用户具有
test_role
作为主要角色,ingress_user_role
作为次要角色。这使得会话可以执行ingress_user_role
允许的任何操作。默认角色和默认仓库仅影响服务代表您的用户建立会话时激活的角色和仓库。调用方权限会话建立后,您不能更改角色,但可以更改仓库。
使用该服务并测试调用方权限¶
找到服务公开的公共端点的 URL:
SHOW ENDPOINTS IN SERVICE tutorial_db.data_schema.query_service;
响应中的
ingress_url
列提供 URL。示例
p6bye-myorg-myacct.snowflakecomputing.app
将
/ui
追加到端点 URL,并将其粘贴到 Web 浏览器中。这会导致服务执行ui()
函数(请参阅echo_service.py
)。请注意,首次访问端点 URL 时,系统将要求您登录到 Snowflake。对于此测试,请使用用于创建服务的同一用户,以确保用户具有必要的权限。使用用于创建服务的同一用户。登录成功后,服务将显示以下 Web UI。
在文本框中输入以下命令,然后按回车键查看结果。
SELECT * FROM ingress_user_db.ingress_user_schema.ingress_user_table;
为了便于说明,本教程中的服务代码以调用方和服务用户的身份执行查询。
代表调用方(入口用户)执行查询: 在这种情况下,代码使用 Snowflake 提供的用户令牌来构建登录令牌,以便与 Snowflake 连接。因此,服务使用调用方权限。Snowflake 代表调用方执行查询。由于调用方使用的是具有查询
ingress_user_table
表权限的ingress_user_role role
,查询结果返回了一行数据:['this table is only accessible to ingress_user_role']
代表服务用户执行查询: 在这种情况下,代码在构建登录令牌以连接 Snowflake 时,不会使用 Snowflake 在请求中提供的用户令牌。因此,Snowflake 代表服务用户执行查询。因为服务所有者使用默认
test_role
,而默认角色没有查询表的权限,所以您会看到一个错误:Encountered an error when executing query:... SQL compilation error: Database 'INGRESS_USER_DB' does not exist or not authorized.
清理¶
您应该移除您创建的计费资源。有关更多信息,请参阅 教程 3 中的第 5 步。
查看服务代码¶
本部分包括以下主题:
检查教程代码:查看用于实施查询服务的代码文件。
检查教程代码¶
您在第 1 步中下载的 Zip 文件包含以下文件:
Dockerfile
main.py
templates/basic_ui.html
在创建服务时,您还可以使用服务规范。以下部分介绍这些代码组件如何协同工作以创建服务。
main.py 文件¶
此 Python 文件包含实施最小 HTTP 服务器的代码,该服务器在请求中执行查询并返回查询结果。该代码提供了用于提交 echo 请求的 Web 用户界面 (UI)。
from flask import Flask
from flask import request
from flask import render_template
import logging
import os
import sys
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *
# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
SERVICE_HOST = os.getenv("SERVER_HOST", "0.0.0.0")
SERVER_PORT = os.getenv("SERVER_PORT", 8080)
def get_logger(logger_name):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter("%(name)s [%(asctime)s] [%(levelname)s] %(message)s")
)
logger.addHandler(handler)
return logger
def get_login_token():
"""
Read the login token supplied automatically by Snowflake. These tokens
are short lived and should always be read right before creating any new connection.
"""
with open("/snowflake/session/token", "r") as f:
return f.read()
def get_connection_params(ingress_user_token=None):
"""
Construct Snowflake connection params from environment variables.
"""
if os.path.exists("/snowflake/session/token"):
if ingress_user_token:
logger.info("Creating a session on behalf of user.")
token = get_login_token() + "." + ingress_user_token
else:
logger.info("Creating a session as service user.")
token = get_login_token()
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"authenticator": "oauth",
"token": token,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA,
}
else:
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"user": SNOWFLAKE_USER,
"password": SNOWFLAKE_PASSWORD,
"role": SNOWFLAKE_ROLE,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA,
}
logger = get_logger("query-service")
app = Flask(__name__)
@app.get("/healthcheck")
def readiness_probe():
return "I'm ready!"
@app.route("/ui", methods=["GET", "POST"])
def ui():
"""
Main handler for providing a web UI.
"""
if request.method == "POST":
# get ingress user token
ingress_user = request.headers.get("Sf-Context-Current-User")
ingress_user_token = request.headers.get("Sf-Context-Current-User-Token")
if ingress_user:
logger.info(f"Received a request from user {ingress_user}")
# getting input in HTML form
query = request.form.get("query")
if query:
logger.info(f"Received a request for query: {query}.")
query_result_ingress_user = (
run_query(query, ingress_user_token)
if ingress_user_token
else "Token is missing. Can't execute as ingress user."
)
query_result_service_user = run_query(query)
return render_template(
"basic_ui.html",
query_input=query,
query_result_ingress_user=query_result_ingress_user,
query_result_service_user=query_result_service_user,
)
return render_template("basic_ui.html")
@app.route("/query", methods=["GET"])
def query():
"""
Main handler for providing programmatic access.
"""
# get ingress user token
query = request.args.get("query")
logger.info(f"Received query request: {query}.")
if query:
ingress_user = request.headers.get("Sf-Context-Current-User")
ingress_user_token = request.headers.get("Sf-Context-Current-User-Token")
if ingress_user:
logger.info(f"Received a request from user {ingress_user}")
res = run_query(query, ingress_user_token)
return str(res)
return "DONE"
def run_query(query, ingress_user_token=None):
# start a Snowflake session as the ingress user
try:
with Session.builder.configs(
get_connection_params(ingress_user_token)
).create() as session:
logger.info(
f"Snowflake connection established (id={session.session_id}). Now executing query: {query}."
)
try:
res = session.sql(query).collect()
logger.info(f"Query execution done: {query}.")
return (
"[Empty Result]"
if len(res) == 0
else [", ".join(row) for row in res]
)
except Exception as e:
return "Encountered an error when executing query: " + str(e)
except Exception as e:
return "Encountered an error when connecting to Snowflake: " + str(e)
if __name__ == '__main__':
app.run(host=SERVICE_HOST, port=SERVER_PORT)
在代码中:
ui
函数显示以下 Web 表单,并处理从 Web 表单提交的查询请求。此函数使用
@app.route()
装饰器,以指定/ui
的请求由此函数处理:@app.route("/ui", methods=["GET", "POST"]) def ui():
查询服务公开显示
execute
端点(请参见创建服务时提供的内联服务规范),从而支持通过 Web 与服务进行通信。当您在浏览器中加载已追加 /ui 的公共端点的 URL 时,浏览器会为此路径发送 HTTP GET 请求,然后服务器将请求路由到此函数。函数执行并返回简单的 HTML 表单,供用户输入查询。用户输入查询并提交表单后,浏览器会发送此路径的 HTTP POST 请求。由于服务规范包括
executeAsCaller
功能,Snowflake 会将Sf-Context-Current-User-Token
标头添加到传入请求中,并将请求转发到相同的函数(请参阅 使用调用方权限连接到 Snowflake)。代码执行两次
run_query
函数:作为入口用户。在这种情况下,登录令牌是 OAuth 令牌和入口用户令牌的串联。
token = get_login_token() + "." + ingress_user_token
作为服务用户。在这种情况下,登录令牌只是 OAuth 令牌。
token = get_login_token()
readiness_probe
函数使用@app.get()
装饰器,以指定/healthcheck
的请求由此函数处理:@app.get("/healthcheck") def readiness_probe():
此函数使 Snowflake 能够检查服务的就绪情况。当容器启动时,Snowflake 希望确认应用程序正在运行,并且服务已准备好处理请求。Snowflake 为此路径发送 HTTP GET 请求(作为运行状况探测、就绪情况探测),以确保只有正常运行的容器才能提供流量。该函数可以做任何您想做的事。
get_logger
函数有助于设置日志记录。
Dockerfile¶
此文件包含使用 Docker 构建镜像时的所有命令。
ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY main.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && pip install flask snowflake-snowpark-python
CMD ["python", "main.py"]
Dockerfile 包含在 Docker 容器中安装 Flask 库的说明。main.py
中的代码依赖 Flask 库来处理 HTTP 请求。
/template/basic_ui.html¶
查询服务公开显示 echoendpoint
端点(请参阅服务规范),允许通过 Web 与服务进行通信。当您在浏览器中加载已追加 /ui
的公共端点 URL 时,查询服务将显示此表单。

您可以在表单中输入查询并提交表单,该服务会在 HTTP 响应中返回结果。
<!DOCTYPE html>
<html lang="en">
<head>
<title>Welcome to the query service!</title>
</head>
<body>
<h1>Welcome to the query service!</h1>
<form action="{{ url_for("ui") }}" method="post">
<label for="query">query:<label><br>
<input type="text" id="query" name="query" size="50"><br>
</form>
<h2>Query:</h2>
{{ query_input }}
<h2>Result (executed on behalf of ingress user):</h2>
{{ query_result_ingress_user }}
<h2>Result (executed as service user):</h2>
{{ query_result_service_user }}
</body>
</html>
服务规范¶
Snowflake 使用您在此规范中提供的信息来配置和运行服务。
spec:
containers:
- name: main
image: /tutorial_db/data_schema/tutorial_repository/query_service:latest
env:
SERVER_PORT: 8000
readinessProbe:
port: 8000
path: /healthcheck
endpoints:
- name: execute
port: 8000
public: true
capabilities:
securityContext:
executeAsCaller: true
serviceRoles:
- name: ui_usage
endpoints:
- execute
在服务规范中,spec
、capabilities
和 serviceRoles
是顶级字段。
spec
提供了规范详细信息(请参阅 服务规范参考)。请注意,服务公开了一个公共端点 (execute
),该端点允许从公共 Web 对服务进行入口访问。capabilities
指定executeAsCaller
功能。这告诉 Snowflake 应用程序打算使用 调用方权限。serviceRoles
指定一个服务角色 (ui_usage
) 和端点名称 (execute
),以授予 USAGE 权限。readinessProbe
字段标识port
和path
,Snowflake 可以用来将 HTTP GET 请求发送到就绪情况探测,以验证服务是否已准备好处理流量。服务代码 (
echo_python.py
) 按如下方式实施就绪情况探测:@app.get("/healthcheck") def readiness_probe():
因此,规范文件相应地包括
container.readinessProbe
字段。
有关服务规范的更多信息,请参阅 服务规范参考。
下一步是什么?¶
现在,您已完成本教程,可以返回 使用服务 以探索其他主题。