Tutorial 2: Create a Snowpark Container Services job service

Introduction

After completing the Tutorial Common Setup, you are ready to create a job service. In this tutorial, you create a simple job service that connects to Snowflake, executes a SQL SELECT query, and saves the result to a table.

This tutorial has two parts:

Part 1: Create and test a job service. You download the code provided for this tutorial and follow the step-by-step instructions:

  1. Download the job service code provided for this tutorial.
  2. Build a Docker image for Snowpark Container Services, and upload the image to a repository in your account.
  3. Stage the service specification file, which gives Snowflake the container configuration information. In addition to the name of the image to use to start a container, the specification file provides three values: a SELECT query and the name of the table to save the result to (as container arguments), and a virtual warehouse to execute the query (as an environment variable).
  4. Execute the job service. Run the EXECUTE JOB SERVICE command, providing the specification file and the compute pool where Snowflake runs the container. Finally, verify the job service results.

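Part 2: Understand the job service code. This part provides an overview of the job service code and highlights how its components work together.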

1: Download the job service code

The tutorial provides code (a Python application) that implements the job service.

  1. Download SnowparkContainerServices-Tutorials.zip.
  2. Unzip the content, which includes one directory for each tutorial. The Tutorial-2 directory has the following files:
    • main.py
    • Dockerfile
    • my_job_spec.yaml

2: Build and upload an image

Build an image for the linux/amd64 platform that Snowpark Container Services supports, and then upload the image to the image repository in your account (see Common Setup).

You will need information about the repository (the repository URL and the registry hostname) before you can build and upload the image. For more information, see Registry and Repositories.

Get information about the repository

  1. To get the repository URL, execute the SHOW IMAGE REPOSITORIES SQL command.

    SHOW IMAGE REPOSITORIES;
    • The repository_url column in the output provides the URL. For example:

      <orgname>-<acctname>.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository
    • The hostname in the repository URL is the registry hostname. For example:

      <orgname>-<acctname>.registry.snowflakecomputing.cn
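If you script these steps, note that the registry hostname is just the host portion of the repository URL. The following Python sketch derives it and composes the full <repository_url>/<image_name> reference that docker build and docker push use. The function names are hypothetical, and the URL is the myorg-myacct example used elsewhere in this tutorial.

```python
def registry_hostname(repository_url: str) -> str:
    """Return the registry hostname: everything before the first '/'."""
    # Tolerate an accidental scheme prefix such as https://
    url = repository_url.split("://", 1)[-1]
    return url.split("/", 1)[0]


def image_reference(repository_url: str, image_name: str) -> str:
    """Compose <repository_url>/<image_name>, as used with docker build -t and docker push."""
    return f"{repository_url.rstrip('/')}/{image_name}"


repo = "myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository"
print(registry_hostname(repo))  # prints myorg-myacct.registry.snowflakecomputing.cn
print(image_reference(repo, "my_job_image:latest"))
```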

Build the image and upload it to the repository

  1. Open a terminal window, and change to the directory that contains the files you unzipped (the Tutorial-2 directory).

  2. To build a Docker image, execute the following docker build command using the Docker CLI. Note that the command specifies the current working directory (.) as the PATH for the files to use when building the image.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    • For image_name, use my_job_image:latest.

Example

docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
  3. Upload the image to the repository in your Snowflake account.
    1. For Docker to upload an image on your behalf to your repository, first authenticate Docker with the registry.

      1. We recommend using Snowflake CLI to authenticate your local Docker instance with the image registry for your Snowflake account. Make sure that you have configured Snowflake CLI to connect to Snowflake. For more information, see Configuring Snowflake CLI and connecting to Snowflake.
      2. To authenticate, execute the following Snowflake CLI command:
        snow spcs image-registry login
    2. To upload the image, execute the following command:

      docker push <repository_url>/<image_name>

Example

docker push myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
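You can also drive the build and push from Python. The following sketch assembles the same docker build and docker push argument lists shown above and runs them with subprocess.run; it assumes that Docker is on your PATH and that you have already authenticated with snow spcs image-registry login. The function names are hypothetical.

```python
import subprocess


def build_cmd(image_ref: str, context: str = ".") -> list:
    """docker build for the linux/amd64 platform that Snowpark Container Services supports."""
    return ["docker", "build", "--rm", "--platform", "linux/amd64", "-t", image_ref, context]


def push_cmd(image_ref: str) -> list:
    """docker push to the repository (requires a prior registry login)."""
    return ["docker", "push", image_ref]


def build_and_push(image_ref: str) -> None:
    # check=True raises CalledProcessError on failure, so a failed build is never pushed
    subprocess.run(build_cmd(image_ref), check=True)
    subprocess.run(push_cmd(image_ref), check=True)
```

For example, call build_and_push("<repository_url>/my_job_image:latest") from the directory that contains the Dockerfile. Passing argument lists instead of a shell string avoids quoting problems.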

3: Stage the specification file

  • To upload your service specification file (my_job_spec.yaml) to the stage, use one of the following options:
    • The Snowsight web interface: For instructions, see Choosing an internal stage for local files.

    • The SnowSQL CLI: Execute the following PUT command:

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;

For example:

  • Linux or macOS

    PUT file:///tmp/my_job_spec.yaml @tutorial_stage
      AUTO_COMPRESS=FALSE
      OVERWRITE=TRUE;
  • Windows

    PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
      AUTO_COMPRESS=FALSE
      OVERWRITE=TRUE;

You can also specify a relative path.

PUT file://./my_job_spec.yaml @tutorial_stage
  AUTO_COMPRESS=FALSE
  OVERWRITE=TRUE;

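The command sets OVERWRITE=TRUE so that you can upload the file again if needed (for example, if you fixed an error in your specification file). If the PUT command executes successfully, it prints information about the uploaded file.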
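If you already work with Snowpark Python, you can stage the file from code instead of SnowSQL. This sketch assumes an existing snowflake.snowpark.Session object named session and calls its file.put method with the same settings as the PUT command above (auto_compress=False, overwrite=True). The function name stage_spec_file is hypothetical.

```python
from pathlib import Path


def stage_spec_file(session, local_path: str = "my_job_spec.yaml",
                    stage: str = "@tutorial_stage"):
    """Upload the service specification file to a stage by using Snowpark."""
    path = Path(local_path)
    if not path.is_file():
        # Fail early with a clear message instead of a server-side PUT error
        raise FileNotFoundError(f"specification file not found: {path}")
    # auto_compress=False keeps the staged name my_job_spec.yaml, which SPEC= refers to;
    # overwrite=True lets you re-upload after fixing the specification.
    return session.file.put(str(path), stage, auto_compress=False, overwrite=True)
```

For example, stage_spec_file(session, "./my_job_spec.yaml"). Keeping auto_compress=False matters because EXECUTE JOB SERVICE refers to the file by its uncompressed name, my_job_spec.yaml.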

4: Execute the job service

Now you are ready to create a job.

  1. To start a job service, run the EXECUTE JOB SERVICE command:

    EXECUTE JOB SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      NAME=tutorial_2_job_service
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';

Note the following:

  • FROM and SPEC provide the stage name and the name of the job service specification file. When the job service is executed, it runs the SQL statement and saves the result to a table as specified in my_job_spec.yaml.

The SQL statement does not execute inside the Docker container. Instead, the running container connects to Snowflake and runs the SQL statement in a Snowflake warehouse.

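  • IN COMPUTE POOL provides the compute resources where Snowflake executes the job service.
  • You can optionally include the QUERY_WAREHOUSE parameter to specify a default warehouse in which the container runs SQL statements. However, the job service code in this tutorial reads the warehouse from the SNOWFLAKE_WAREHOUSE environment variable, which the specification file sets, so the preceding command omits the default.
  • EXECUTE JOB SERVICE returns output that includes the job name, as shown in the following sample output: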
    +------------------------------------------------------------------------------------+
    |                      status                                                        |
    |------------------------------------------------------------------------------------|
    | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE.               |
    +------------------------------------------------------------------------------------+
  2. The job service runs a simple query and saves the result to the results table. You can verify that the job service completed successfully by querying the results table:

    SELECT * FROM results;

Sample output:

+----------+-----------+
| TIME     | TEXT      |
|----------+-----------|
| 10:56:52 | hello     |
+----------+-----------+
  3. To debug your job service execution, execute SHOW SERVICE CONTAINERS IN SERVICE to determine whether the job service is still running, whether it failed to start, and, if it failed, why. Also, if your code writes useful logs to standard output or standard error, you can access the logs by using SYSTEM$GET_SERVICE_LOGS.
    1. To get the job service status, execute SHOW SERVICE CONTAINERS IN SERVICE:

      SHOW SERVICE CONTAINERS IN SERVICE tutorial_2_job_service;

Sample output:

+---------------+-------------+------------------------+-------------+----------------+--------+------------------------+----------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+---------------+------------+
| database_name | schema_name | service_name           | instance_id | container_name | status | message                | image_name                                                                                                                             | image_digest                                                            | restart_count | start_time |
|---------------+-------------+------------------------+-------------+----------------+--------+------------------------+----------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+---------------+------------|
| TUTORIAL_DB   | DATA_SCHEMA | TUTORIAL_2_JOB_SERVICE | 0           | main           | DONE   | Completed successfully | myorg-myacct.registry.snowflakecomputing.cn/tutorial_db/data_schema/tutorial_repository/my_job_image:latest | sha256:aa3fa2e5c1552d16904a5bbc97d400316ebb4a608bb110467410485491d9d8d0 |             0 |            |
+---------------+-------------+------------------------+-------------+----------------+--------+------------------------+----------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+---------------+------------+
    2. To get the job service log information, use the system function SYSTEM$GET_SERVICE_LOGS:

      SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main');

Sample output:

      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
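The same execute-and-verify flow can be scripted with Snowpark Python. The following sketch assumes an existing snowflake.snowpark.Session named session whose role can use tutorial_compute_pool. It builds the EXECUTE JOB SERVICE statement shown above, runs it with session.sql(...).collect(), and then collects the container status, the rows in the results table, and the log text from SYSTEM$GET_SERVICE_LOGS. The helper names are hypothetical, the identifier check is a minimal guard rather than a complete SQL-quoting scheme, and appending QUERY_WAREHOUSE after SPEC is an assumption about clause order that you should confirm against the EXECUTE JOB SERVICE reference.

```python
import re
from typing import Optional

# Unquoted identifiers, optionally qualified (db.schema.name); a guard for this sketch only.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*)*$")


def _ident(name: str) -> str:
    """Reject anything that is not a plain identifier before splicing it into SQL."""
    if not _IDENT.match(name):
        raise ValueError(f"unsupported identifier: {name!r}")
    return name


def execute_job_sql(name: str, pool: str, stage: str, spec: str,
                    query_warehouse: Optional[str] = None) -> str:
    """Build the EXECUTE JOB SERVICE statement used in step 4."""
    if "'" in spec:
        raise ValueError("spec file name must not contain a single quote")
    lines = [
        "EXECUTE JOB SERVICE",
        f"  IN COMPUTE POOL {_ident(pool)}",
        f"  NAME={_ident(name)}",
        f"  FROM @{_ident(stage.lstrip('@'))}",
        f"  SPEC='{spec}'",
    ]
    if query_warehouse:
        # Optional default warehouse; this tutorial sets SNOWFLAKE_WAREHOUSE instead.
        lines.append(f"  QUERY_WAREHOUSE={_ident(query_warehouse)}")
    return "\n".join(lines)


def run_and_verify(session, name: str = "tutorial_2_job_service",
                   pool: str = "tutorial_compute_pool",
                   stage: str = "tutorial_stage", spec: str = "my_job_spec.yaml"):
    """Run the job, then return container status rows, result rows, and log text."""
    session.sql(execute_job_sql(name, pool, stage, spec)).collect()
    status = session.sql(f"SHOW SERVICE CONTAINERS IN SERVICE {_ident(name)}").collect()
    results = session.table("results").collect()
    logs = session.sql(
        f"SELECT SYSTEM$GET_SERVICE_LOGS('{_ident(name)}', 0, 'main')"
    ).collect()[0][0]
    return status, results, logs
```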

5: Clean up

If you do not plan to continue with Tutorial 4, you should remove billable resources you created. For more information, see Step 5 in Tutorial 4.

6: Review the job service code

This section covers the following topics:

  • Examining the provided files: Review the code files that implement the job service.
  • Building and testing an image locally: Learn how you can test the Docker image locally before uploading it to a repository in your Snowflake account.

Examining the provided files

The zip file you downloaded at the beginning of this tutorial includes the following files:

  • main.py
  • Dockerfile
  • my_job_spec.yaml

This section provides an overview of the code.

main.py file

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table schema.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()

In the code:

  • The Python code starts executing at main, which calls the run_job() function:

    if __name__ == "__main__":
      run_job()
  • The module reads the environment variables when it loads, and run_job() uses them (through get_connection_params()) to set the connection parameters. The container uses these parameters to connect to Snowflake. Note that:

    • You can override the parameter values, used in the service, using the containers.env and containers.args fields in the service specification. For more information, see Service specification reference.
    • When the image runs in Snowflake, Snowflake populates some of these parameters automatically (see the source code). However, when testing the image locally, you must provide these parameters explicitly (as shown in the next section, Building and testing an image locally).
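Because the connection parameters come from two places, it can help to check them before a run. The following sketch mirrors the branch in get_connection_params(): if /snowflake/session/token exists (as it does when Snowflake runs the container), it checks only the variables that the OAuth branch reads; otherwise it also requires the user, password, and role variables that you supply for local testing. The function missing_connection_vars is hypothetical and not part of the tutorial code; it treats every variable the selected branch reads as required.

```python
import os
from typing import Mapping

TOKEN_PATH = "/snowflake/session/token"

# Variables read by get_connection_params() in main.py, per branch.
OAUTH_VARS = ["SNOWFLAKE_ACCOUNT", "SNOWFLAKE_HOST", "SNOWFLAKE_DATABASE",
              "SNOWFLAKE_SCHEMA", "SNOWFLAKE_WAREHOUSE"]
PASSWORD_VARS = OAUTH_VARS + ["SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ROLE"]


def missing_connection_vars(env: Mapping[str, str] = os.environ,
                            token_path: str = TOKEN_PATH) -> list:
    """Return the variables the selected authentication branch needs but env lacks."""
    # Same test as main.py: the token file decides between OAuth and user/password.
    needed = OAUTH_VARS if os.path.exists(token_path) else PASSWORD_VARS
    return [name for name in needed if not env.get(name)]
```

For example, you could call missing_connection_vars() at the start of run_job() and exit with an error message that lists the missing names.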

Dockerfile

This file contains all the commands used to build the image with Docker.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install -y python=3.12 && \
  conda install -y snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]

my_job_spec.yaml file (service specification)

Snowflake uses the information you provide in this specification to configure and run your job service.

spec:
  containers:
  - name: main
    image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
    env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
    args:
    - "--query=select current_time() as time,'hello'"
    - "--result_table=results"

In addition to the required containers.name and containers.image fields (see Service specification reference), the specification includes the optional containers.env field, which sets the SNOWFLAKE_WAREHOUSE environment variable that main.py reads, and the optional containers.args field, which lists the arguments:

  • --query provides the query to execute when the service runs.
  • --result_table identifies the table to save the query results to.
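To see how the containers.args entries reach main.py: the Dockerfile sets ENTRYPOINT to python3 main.py, so, assuming containers.args replaces the image's default CMD the way docker run arguments do, the container starts with these strings appended as command-line arguments. The following sketch parses the two strings from my_job_spec.yaml with the same argparse definition as main.py.

```python
import argparse


def get_arg_parser():
    """Same arguments as get_arg_parser() in main.py."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--query", required=True, help="query text to execute")
    parser.add_argument(
        "--result_table",
        required=True,
        help="name of the table to store result of query specified by flag --query")
    return parser


# The two entries under containers.args in my_job_spec.yaml.
spec_args = ["--query=select current_time() as time,'hello'", "--result_table=results"]
args = get_arg_parser().parse_args(spec_args)
print(args.query)         # select current_time() as time,'hello'
print(args.result_table)  # results
```

Because each entry is a single argv string of the form --flag=value, the spaces and quotes inside the query need no extra escaping.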

Building and testing an image locally

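You can test the Docker image locally before uploading it to a repository in your Snowflake account. In local testing, your container runs standalone (it is not a job service that Snowflake executes).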

Use the following steps to test the Tutorial 2 Docker image:

  1. To create a Docker image, in the Docker CLI, execute the docker build command:

    docker build --rm -t my_job:local .
  2. To launch your code, execute the docker run command, providing <orgname>-<acctname>, <username>, and <password>:

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.cn \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
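If you repeat local runs, building the docker run arguments in code saves retyping the -e flags. The following sketch produces the same command as above from a dictionary of environment variables plus the two job arguments. The function name is hypothetical, and the angle-bracket values are the tutorial's placeholders that you must replace.

```python
def docker_run_cmd(image: str, env: dict, query: str, result_table: str) -> list:
    """Build a docker run argument list like the local test command above."""
    cmd = ["docker", "run", "--rm"]
    for key, value in env.items():
        cmd += ["-e", f"{key}={value}"]  # one -e flag per connection parameter
    # Arguments after the image name are appended to the ENTRYPOINT (python3 main.py)
    cmd += [image, f"--query={query}", f"--result_table={result_table}"]
    return cmd


env = {
    "SNOWFLAKE_ACCOUNT": "<orgname>-<acctname>",
    "SNOWFLAKE_HOST": "<orgname>-<acctname>.snowflakecomputing.cn",
    "SNOWFLAKE_DATABASE": "tutorial_db",
    "SNOWFLAKE_SCHEMA": "data_schema",
    "SNOWFLAKE_ROLE": "test_role",
    "SNOWFLAKE_USER": "<username>",
    "SNOWFLAKE_PASSWORD": "<password>",
    "SNOWFLAKE_WAREHOUSE": "tutorial_warehouse",
}
cmd = docker_run_cmd("my_job:local", env,
                     "select current_time() as time,'hello'",
                     "tutorial_db.data_schema.results")
```

Run it with subprocess.run(cmd, check=True). Passing a list avoids shell quoting of the query's quotes and parentheses; in practice, read SNOWFLAKE_PASSWORD from your own environment rather than writing it into a script.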

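Note that when testing locally, in addition to the three inputs (a query, the warehouse to run the query, and the table to save the result to), you also provide the connection parameters that the locally running container needs to connect to Snowflake.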

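When you run the container as a service, Snowflake provides some of these parameters to the container as environment variables, and the container authenticates with the OAuth token that Snowflake supplies in /snowflake/session/token instead of a user name and password. For more information, see Configure Snowflake Client.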

The job service executes the query (select current_time() as time,'hello') and writes the result to the table (tutorial_db.data_schema.results). If the table does not exist, it is created. If the table exists, the job service appends the query result (one row) to it.

Sample result of querying the results table:

+----------+----------+
| TIME     | TEXT     |
|----------+----------|
| 10:56:52 | hello    |
+----------+----------+

What's next?

You can now try Tutorial 4, which shows how service-to-service communication works.