教程 2:创建 Snowpark Container Services 作业服务¶
重要
Snowpark Container Services 作业服务功能目前为非公开预览版,并受预览版条款的约束,详情请访问: ` <https://snowflake.com/legal (https://snowflake.com/legal)> `_。有关更多信息,请联系您的 Snowflake 代表。
简介¶
完成 教程通用设置 后,您就可以创建作业服务了。在本教程中,您将创建简单的作业服务,该服务连接到 Snowflake、执行 SQL SELECT 查询并将结果保存到表中。
本教程分为两个部分:
第 1 部分:创建并测试作业服务。 您将下载为本教程提供的代码,并按照分步说明进行操作:
下载为本教程提供的作业服务代码。
为 Snowpark Container Services 构建 Docker 镜像,并将该镜像上传到账户的仓库中。
暂存服务规范文件,该文件为 Snowflake 提供容器配置信息。除用于启动容器的镜像的名称外,规范文件还指定了三个实参:SELECT 查询、用于执行查询的虚拟仓库,以及用于保存结果的表的名称。
执行作业服务。使用 EXECUTE JOB SERVICE 命令,您可以通过提供规范文件和 Snowflake 可以运行容器的计算池来执行作业服务。最后,验证服务结果。
第 2 部分:了解作业服务代码。本部分概述作业服务代码,并重点介绍不同组件的协作方式。
1:下载作业服务代码¶
提供代码(Python 应用程序)来实施作业服务。
下载:download: SnowparkContainerServices -Tutorials.zip </samples/spcs/SnowparkContainerServices-Tutorials.zip>。
解压内容,其中包含每个教程的一个目录。
Tutorial-2
目录包含以下文件:main.py
Dockerfile
my_job_spec.yaml
2:构建并上传镜像¶
为 Snowpark Container Services 支持的 linux/amd64 平台构建镜像,然后将镜像上传到账户中的镜像仓库(请参阅 通用设置)。
您将需要有关仓库(仓库 URL 和注册表主机名)的信息,才能构建和上传镜像。有关更多信息,请参阅 注册表和镜像仓库。
获取有关仓库的信息
要获取镜像仓库 URL,请执行 SHOW IMAGE REPOSITORIES SQL 命令。
SHOW IMAGE REPOSITORIES;
输出中的
repository_url
列提供 URL。示例如下:<orgname>-<acctname>.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository
镜像仓库 URL 中的主机名是注册表主机名。示例如下:
<orgname>-<acctname>.registry.snowflakecomputing.cn
构建镜像并将其上传到仓库
打开终端窗口,然后切换到包含解压文件的目录。
要构建 Docker 镜像,请使用 Docker CLI 执行以下
docker build
命令。请注意,该命令指定当前工作目录 (.),作为用于构建镜像的文件的PATH
。docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
对于
image_name
,请使用my_job_image:latest
。
示例
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
将镜像上传到 Snowflake 账户中的仓库。要使 Docker 代表您将镜像上传到仓库,您必须首先使用 Snowflake 对 Docker 进行身份验证。
要使用 Snowflake 注册表对 Docker 进行身份验证,请执行以下命令。
docker login <registry_hostname> -u <username>
对于
username
,请指定您的 Snowflake 用户名。Docker 将提示您输入密码。
要上传镜像,请执行以下命令:
docker push <repository_url>/<image_name>
示例
docker push myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
3:暂存规范文件¶
要将您的服务规范文件 (
my_job_spec.yaml
) 上传到暂存区,请使用以下选项之一:Snowsight Web 界面: 有关说明,请参阅 为本地文件选择内部暂存区。
SnowSQL CLI: 执行以下 PUT 命令:
PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
例如:
Linux 或 macOS
PUT file:///tmp/my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Windows
PUT file://C:\temp\my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
您还可以指定相对路径。
PUT file://./my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
此命令设置 OVERWRITE=TRUE,以便您可以在需要时再次上传文件(例如,如果您修复了规范文件中的错误)。如果 PUT 命令成功执行,则会打印出有关上传文件的信息。
4:执行作业服务¶
现在,您可以创建作业了。
要开始作业服务,请运行 EXECUTE JOB SERVICE 命令:
EXECUTE JOB SERVICE IN COMPUTE POOL tutorial_compute_pool NAME=tutorial_2_job_service FROM @tutorial_stage SPEC='my_job_spec.yaml';
请注意以下事项:
FROM 和 SPEC 提供暂存区名称和作业服务规范文件的名称。执行作业服务时,作业服务会运行 SQL 语句并将结果保存到表中,如
my_job_spec.yaml
中所述。SQL 语句不在 Docker 容器内执行。相反,正在运行的容器会连接 Snowflake,并在 Snowflake 仓库中运行 SQL 语句。
COMPUTE_POOL 提供 Snowflake 在其中执行作业服务的计算资源。
EXECUTE JOB SERVICE 返回包含作业名称的输出,如以下示例输出所示:
+------------------------------------------------------------------------------------+ | status | -------------------------------------------------------------------------------------+ | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE. | +------------------------------------------------------------------------------------+
作业服务运行简单的查询并将结果保存到结果表中。您可以通过查询结果表来验证作业服务是否已成功完成:
SELECT * FROM results;
示例输出:
+----------+-----------+ | TIME | TEXT | |----------+-----------| | 10:56:52 | hello | +----------+-----------+
如果要调试作业服务的执行,请使用系统函数。例如,使用 SYSTEM$GET_SERVICE_STATUS 来确定作业服务是否仍在运行、是否启动失败或者启动失败的原因。此外,假设代码将有用的日志输出为标准输出或标准错误,您可以使用 SYSTEM$GET_SERVICE_LOGS 访问日志。
要获取作业服务状态,请调用系统函数 SYSTEM$GET_SERVICE_STATUS:
SELECT SYSTEM$GET_SERVICE_STATUS('tutorial_2_job_service');
示例输出:
[ { "status":"DONE", "message":"Completed successfully", "containerName":"main", "instanceId":"0", "serviceName":"TUTORIAL_2_JOB_SERVICE", "image":"orgname-acctname.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest", "restartCount":0, "startTime":"" } ]
要获取作业服务日志信息,请使用系统函数 SYSTEM$GET_SERVICE_LOGS:
SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main')
job-tutorial - INFO - Job started job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE" job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results] job-tutorial - INFO - Job finished
5:清理¶
6:审查作业服务代码¶
本部分包括以下主题:
检查提供的文件:审查用于实施作业服务的各种代码文件。
在本地构建和测试镜像。本部分介绍如何本地测试 Docker 镜像,然后将其上传到 Snowflake 账户的仓库。
检查提供的文件¶
您在教程开始时下载的 Zip 文件包括以下文件:
main.py
Dockerfile
my_job_spec.yaml
本部分概述代码。
main.py 文件¶
#!/opt/conda/bin/python3
import argparse
import logging
import os
import sys
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *
# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
def get_arg_parser():
"""
Input argument list.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--query", required=True, help="query text to execute")
parser.add_argument(
"--result_table",
required=True,
help="name of the table to store result of query specified by flag --query")
return parser
def get_logger():
"""
Get a logger for local logging.
"""
logger = logging.getLogger("job-tutorial")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_login_token():
"""
Read the login token supplied automatically by Snowflake. These tokens
are short lived and should always be read right before creating any new connection.
"""
with open("/snowflake/session/token", "r") as f:
return f.read()
def get_connection_params():
"""
Construct Snowflake connection params from environment variables.
"""
if os.path.exists("/snowflake/session/token"):
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"authenticator": "oauth",
"token": get_login_token(),
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
else:
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"user": SNOWFLAKE_USER,
"password": SNOWFLAKE_PASSWORD,
"role": SNOWFLAKE_ROLE,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
def run_job():
"""
Main body of this job.
"""
logger = get_logger()
logger.info("Job started")
# Parse input arguments
args = get_arg_parser().parse_args()
query = args.query
result_table = args.result_table
# Start a Snowflake session, run the query and write results to specified table
with Session.builder.configs(get_connection_params()).create() as session:
# Print out current session context information.
database = session.get_current_database()
schema = session.get_current_schema()
warehouse = session.get_current_warehouse()
role = session.get_current_role()
logger.info(
f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
)
# Execute query and persist results in a table.
logger.info(
f"Executing query [{query}] and writing result to table [{result_table}]"
)
res = session.sql(query)
# If the table already exists, the query result must match the table scheme.
# If the table does not exist, this will create a new table.
res.write.mode("append").save_as_table(result_table)
logger.info("Job finished")
if __name__ == "__main__":
run_job()
在代码中:
Python 代码在
main
处执行,然后执行run_job()
函数:if __name__ == "__main__": run_job()
run_job()
函数读取环境变量,并使用这些变量为各种参数设置默认值。容器使用这些参数连接 Snowflake。请注意:您可以使用服务规范中的
containers.env
和containers.args
字段覆盖服务中使用的参数值。有关更多信息,请参阅 服务规范参考。当镜像在 Snowflake 中运行时,Snowflake 会自动填充其中的一些参数(请参阅源代码)。但是,在本地测试镜像时,需要明确提供这些参数(如下一部分 在本地构建和测试镜像 所示)。
Dockerfile¶
此文件包含使用 Docker 构建镜像时的所有命令。
ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
my_job_spec.yaml 文件(服务规范)¶
Snowflake 使用您在此规范中提供的信息来配置和运行作业服务。
spec:
container:
- name: main
image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
env:
SNOWFLAKE_WAREHOUSE: tutorial_warehouse
args:
- "--query=select current_time() as time,'hello'"
- "--result_table=results"
除 container.name
和 container.image
必填字段(请参阅 服务规范参考)外,该规范还包括用于列出参数的可选 container.args
字段:
--query
提供在服务运行时要执行的查询。--result_table
标识用于保存查询结果的表。
在本地构建和测试镜像¶
您可以在本地测试 Docker 镜像,然后再将其上传到 Snowflake 账户中的仓库。在本地测试中,您的容器独立运行(它不是 Snowflake 执行的作业服务)。
使用以下步骤测试教程 2 的 Docker 镜像:
要创建 Docker 镜像,请在 Docker CLI 中执行以下
docker build
命令:docker build --rm -t my_service:local .
要启动代码,请执行
docker run
命令,提供<orgname>-<acctname>
、<username>
和<password>
:docker run --rm \ -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \ -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.cn \ -e SNOWFLAKE_DATABASE=tutorial_db \ -e SNOWFLAKE_SCHEMA=data_schema \ -e SNOWFLAKE_ROLE=test_role \ -e SNOWFLAKE_USER=<username> \ -e SNOWFLAKE_PASSWORD=<password> \ -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \ my_job:local \ --query="select current_time() as time,'hello'" \ --result_table=tutorial_db.data_schema.results
请注意,在本地测试时,除三个实参(查询、用于运行查询的仓库和用于保存结果的表)外,您还为本地运行的容器提供连接参数,以连接 Snowflake。
当您将容器作为服务运行时,Snowflake 将这些参数作为环境变量提供给容器。有关更多信息,请参阅 配置 Snowflake 客户端。
作业服务执行查询 (
select current_time() as time,'hello'
),并将结果写入表 (tutorial_db.data_schema.results
)。如果表不存在,则创建表。如果表存在,则作业服务会添加一行。查询结果表的示例结果:
+----------+----------+ | TIME | TEXT | |----------+----------| | 10:56:52 | hello | +----------+----------+
下一步是什么?¶
您现在可以测试 教程 3,该教程展示了服务间通信的工作原理。