教程 2:创建 Snowpark Container Services 作业服务

重要

Snowpark Container Services 作业服务功能目前为非公开预览版,并受预览版条款的约束,详情请访问: ` <https://snowflake.com/legal (https://snowflake.com/legal)> `_。有关更多信息,请联系您的 Snowflake 代表。

简介

完成 教程通用设置 后,您就可以创建作业服务了。在本教程中,您将创建简单的作业服务,该服务连接到 Snowflake、执行 SQL SELECT 查询并将结果保存到表中。

本教程分为两个部分:

第 1 部分:创建并测试作业服务。 您将下载为本教程提供的代码,并按照分步说明进行操作:

  1. 下载为本教程提供的作业服务代码。

  2. 为 Snowpark Container Services 构建 Docker 镜像,并将该镜像上传到账户的仓库中。

  3. 暂存服务规范文件,该文件为 Snowflake 提供容器配置信息。除用于启动容器的镜像的名称外,规范文件还指定了三个实参:SELECT 查询、用于执行查询的虚拟仓库,以及用于保存结果的表的名称。

  4. 执行作业服务。使用 EXECUTE JOB SERVICE 命令,您可以通过提供规范文件和 Snowflake 可以运行容器的计算池来执行作业服务。最后,验证服务结果。

第 2 部分:了解作业服务代码。本部分概述作业服务代码,并重点介绍不同组件的协作方式。

1:下载作业服务代码

提供代码(Python 应用程序)来实施作业服务。

  1. 下载:download: SnowparkContainerServices -Tutorials.zip </samples/spcs/SnowparkContainerServices-Tutorials.zip>

  2. 解压内容,其中包含每个教程的一个目录。Tutorial-2 目录包含以下文件:

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2:构建并上传镜像

为 Snowpark Container Services 支持的 linux/amd64 平台构建镜像,然后将镜像上传到账户中的镜像仓库(请参阅 通用设置)。

您将需要有关仓库(仓库 URL 和注册表主机名)的信息,才能构建和上传镜像。有关更多信息,请参阅 注册表和镜像仓库

获取有关仓库的信息

  1. 要获取镜像仓库 URL,请执行 SHOW IMAGE REPOSITORIES SQL 命令。

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • 输出中的 repository_url 列提供 URL。示例如下:

      <orgname>-<acctname>.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository
      
    • 镜像仓库 URL 中的主机名是注册表主机名。示例如下:

      <orgname>-<acctname>.registry.snowflakecomputing.cn
      

构建镜像并将其上传到仓库

  1. 打开终端窗口,然后切换到包含解压文件的目录。

  2. 要构建 Docker 镜像,请使用 Docker CLI 执行以下 docker build 命令。请注意,该命令指定当前工作目录 (.),作为用于构建镜像的文件的 PATH

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • 对于 image_name,请使用 my_job_image:latest

    示例

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. 将镜像上传到 Snowflake 账户中的仓库。要使 Docker 代表您将镜像上传到仓库,您必须首先使用 Snowflake 对 Docker 进行身份验证。

    1. 要使用 Snowflake 注册表对 Docker 进行身份验证,请执行以下命令。

      docker login <registry_hostname> -u <username>
      
      Copy
      • 对于 username,请指定您的 Snowflake 用户名。Docker 将提示您输入密码。

    2. 要上传镜像,请执行以下命令:

      docker push <repository_url>/<image_name>
      
      Copy

      示例

      docker push myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3:暂存规范文件

  • 要将您的服务规范文件 (my_job_spec.yaml) 上传到暂存区,请使用以下选项之一:

    • Snowsight Web 界面: 有关说明,请参阅 为本地文件选择内部暂存区

    • SnowSQL CLI: 执行以下 PUT 命令:

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      例如:

      • Linux 或 macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      您还可以指定相对路径。

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      此命令设置 OVERWRITE=TRUE,以便您可以在需要时再次上传文件(例如,如果您修复了规范文件中的错误)。如果 PUT 命令成功执行,则会打印出有关上传文件的信息。

4:执行作业服务

现在,您可以创建作业了。

  1. 要开始作业服务,请运行 EXECUTE JOB SERVICE 命令:

    EXECUTE JOB SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      NAME=tutorial_2_job_service
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    请注意以下事项:

    • FROM 和 SPEC 提供暂存区名称和作业服务规范文件的名称。执行作业服务时,作业服务会运行 SQL 语句并将结果保存到表中,如 my_job_spec.yaml 中所述。

      SQL 语句不在 Docker 容器内执行。相反,正在运行的容器会连接 Snowflake,并在 Snowflake 仓库中运行 SQL 语句。

    • COMPUTE_POOL 提供 Snowflake 在其中执行作业服务的计算资源。

    • EXECUTE JOB SERVICE 返回包含作业名称的输出,如以下示例输出所示:

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE.               |
      +------------------------------------------------------------------------------------+
      
  2. 作业服务运行简单的查询并将结果保存到结果表中。您可以通过查询结果表来验证作业服务是否已成功完成:

    SELECT * FROM results;
    
    Copy

    示例输出:

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  3. 如果要调试作业服务的执行,请使用系统函数。例如,使用 SYSTEM$GET_SERVICE_STATUS 来确定作业服务是否仍在运行、是否启动失败或者启动失败的原因。此外,假设代码将有用的日志输出为标准输出或标准错误,您可以使用 SYSTEM$GET_SERVICE_LOGS 访问日志。

    1. 要获取作业服务状态,请调用系统函数 SYSTEM$GET_SERVICE_STATUS

      SELECT SYSTEM$GET_SERVICE_STATUS('tutorial_2_job_service');
      
      Copy

      示例输出:

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"TUTORIAL_2_JOB_SERVICE",
            "image":"orgname-acctname.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy
    2. 要获取作业服务日志信息,请使用系统函数 SYSTEM$GET_SERVICE_LOGS

      SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5:清理

如果您不打算继续学习 教程 3,则应移除您创建的可计费资源。有关更多信息,请参阅 教程 3 中的第 5 步。

6:审查作业服务代码

本部分包括以下主题:

检查提供的文件

您在教程开始时下载的 Zip 文件包括以下文件:

  • main.py

  • Dockerfile

  • my_job_spec.yaml

本部分概述代码。

main.py 文件

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

在代码中:

  • Python 代码在 main 处执行,然后执行 run_job() 函数:

    if __name__ == "__main__":
      run_job()
    
    Copy
  • run_job() 函数读取环境变量,并使用这些变量为各种参数设置默认值。容器使用这些参数连接 Snowflake。请注意:

    • 您可以使用服务规范中的 containers.envcontainers.args 字段覆盖服务中使用的参数值。有关更多信息,请参阅 服务规范参考

    • 当镜像在 Snowflake 中运行时,Snowflake 会自动填充其中的一些参数(请参阅源代码)。但是,在本地测试镜像时,需要明确提供这些参数(如下一部分 在本地构建和测试镜像 所示)。

Dockerfile

此文件包含使用 Docker 构建镜像时的所有命令。

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

my_job_spec.yaml 文件(服务规范)

Snowflake 使用您在此规范中提供的信息来配置和运行作业服务。

spec:
container:
- name: main
   image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
   env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
   args:
   - "--query=select current_time() as time,'hello'"
   - "--result_table=results"
Copy

container.namecontainer.image 必填字段(请参阅 服务规范参考)外,该规范还包括用于列出参数的可选 container.args 字段:

  • --query 提供在服务运行时要执行的查询。

  • --result_table 标识用于保存查询结果的表。

在本地构建和测试镜像

您可以在本地测试 Docker 镜像,然后再将其上传到 Snowflake 账户中的仓库。在本地测试中,您的容器独立运行(它不是 Snowflake 执行的作业服务)。

使用以下步骤测试教程 2 的 Docker 镜像:

  1. 要创建 Docker 镜像,请在 Docker CLI 中执行以下 docker build 命令:

    docker build --rm -t my_service:local .
    
    Copy
  2. 要启动代码,请执行 docker run 命令,提供 <orgname>-<acctname><username><password>

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.cn \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    请注意,在本地测试时,除三个实参(查询、用于运行查询的仓库和用于保存结果的表)外,您还为本地运行的容器提供连接参数,以连接 Snowflake。

    当您将容器作为服务运行时,Snowflake 将这些参数作为环境变量提供给容器。有关更多信息,请参阅 配置 Snowflake 客户端

    作业服务执行查询 (select current_time() as time,'hello'),并将结果写入表 (tutorial_db.data_schema.results)。如果表不存在,则创建表。如果表存在,则作业服务会添加一行。

    查询结果表的示例结果:

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

下一步是什么?

您现在可以测试 教程 3,该教程展示了服务间通信的工作原理。

语言: 中文