Tutorial 3: Create a service and a job using the Snowflake Python APIs

Introduction

In Tutorial 1 and Tutorial 2, you use the SQL interface to create a Snowpark Container Services service and job. In this tutorial you use the Snowflake Python APIs to create the same service and job and thus explore using the Snowflake Python APIs to manage Snowpark Container Services resources.

The tutorial uses a Snowflake notebook to execute the Python code, but the code is independent of the notebook and you can execute the code in other environments.

1: Initial setup

In this initial setup, you create a Snowflake notebook, import the required libraries, and define constants that cells in later steps use.

  1. Create a Snowflake notebook.

    1. Create a notebook. For instructions, see Create a new notebook. Note that the Python environment you choose in the UI (Run on warehouse or Run on container) doesn’t matter.
    2. From the Packages drop-down menu, choose the “snowflake” package and install the latest version of the Snowflake Python APIs library.
    3. (Optional) Delete the cells that the notebook provides by default. As you follow the steps in this tutorial, you add Python cells to the notebook.
  2. Create and run a cell to import the Python libraries that many cells in this tutorial use.

    from snowflake.snowpark.context import get_active_session
    from snowflake.core import Root
    from snowflake.core import CreateMode
  3. Create and run a cell to define the constants used in later cells. The values below match Tutorials 1 and 2; you can change them if you like.

    current_user = get_active_session().get_current_user()
    user_role_name = "test_role"
    compute_pool_name = "tutorial_compute_pool"
    warehouse_name = "tutorial_warehouse"
    database_name = "tutorial_db"
    schema_name = "data_schema"
    repo_name = "tutorial_repository"
    stage_name = "tutorial_stage"
    service_name = "echo_service"
    print("configured!")
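The constants above are also enough to derive the image path that the SQL version of the service in step 4 refers to (/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest). The following is a minimal sketch; the helper name image_path is hypothetical and not part of the Snowflake Python APIs, and it assumes the unquoted identifiers used throughout this tutorial.

```python
# Hypothetical helper (not part of the Snowflake Python APIs): builds the
# database-relative image path used in a service specification.
def image_path(database: str, schema: str, repo: str, image: str, tag: str = "latest") -> str:
    # Assumes unquoted (case-insensitive) identifiers; lowercase them to match
    # the paths shown in this tutorial.
    prefix = "/".join(part.lower() for part in (database, schema, repo))
    return f"/{prefix}/{image}:{tag}"


print(image_path("tutorial_db", "data_schema", "tutorial_repository", "my_echo_service_image"))
```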

2: Create Snowflake objects

Before you can create a service, you need Snowflake objects such as a database, a role, a compute pool, and an image repository. Some of these are account-scoped objects that require administrative privileges to create. The names of the objects are defined in the preceding step.

2.1: Create account-scoped Snowflake objects

The following Python code creates these objects:

  • Role (test_role). You grant this role all the privileges required to create and use the service. In the code, you grant this role to the current user to enable the user to create and use the service.
  • Database (tutorial_db). In the next step, you create a schema in this database.
  • Compute pool (tutorial_compute_pool). Your service container executes in this compute pool.
  • Warehouse (tutorial_warehouse). When the service connects to Snowflake and executes queries, this warehouse is used to execute the queries.

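Create and run the cell to create these account-scoped objects using the ACCOUNTADMIN role. Note that the script creates a resource only if it doesn't already exist. Comments in the code show the equivalent SQL statements.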

from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse

session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)

# CREATE ROLE IF NOT EXISTS test_role;
root.roles.create(
    Role(name=user_role_name),
    mode=CreateMode.if_not_exists)
print("Created role:", user_role_name)

# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
    securable=Securables.role(user_role_name),
    grantee=Grantees.user(name=current_user),
    ))

# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
#   MIN_NODES = 1 MAX_NODES = 1
#   INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
    mode=CreateMode.if_not_exists,
    compute_pool=ComputePool(
        name=compute_pool_name,
        instance_family="CPU_X64_XS",
        min_nodes=1,
        max_nodes=1,  # matches MAX_NODES = 1 in the SQL comment and Tutorial 1
    )
)

# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
    securable=Securables.compute_pool(compute_pool_name),
    grantee=Grantees.role(name=user_role_name)
    ))

print("Created compute pool:", compute_pool_name)

# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
    Database(name=database_name),
    mode=CreateMode.if_not_exists)

# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.all_privileges],
    securable=Securables.database(database_name),
    grantee=Grantees.role(name=user_role_name),
    ))

print("Created database:", database_name)

# CREATE WAREHOUSE IF NOT EXISTS tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
    Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
    mode=CreateMode.if_not_exists)

# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.usage],
    grantee=Grantees.role(name=user_role_name),
    securable=Securables.warehouse(warehouse_name)
    ))

print("Created warehouse:", warehouse_name)

# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.bind_service_endpoint],
    securable=Securables.current_account,
    grantee=Grantees.role(name=user_role_name)
    ))

print("Done: GRANT BIND SERVICE ENDPOINT")

As you create resources, the code also grants required privileges to the role (test_role) so the role can use these resources. Additionally, note that the echo service you create in this tutorial exposes one public endpoint. This public endpoint allows other users in your account to access the service from the public web (ingress). To create a service with a public endpoint, the role (test_role) must have the BIND SERVICE ENDPOINT privilege on the account.
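Each root.grants.grant(...) call above corresponds to one GRANT statement, as the code comments show. To review the full set of privileges that test_role receives, a plain-Python sketch can render those statements from a list of tuples. The grant_statements helper and its tuple layout are hypothetical and for illustration only; the sketch does not call Snowflake.

```python
# Hypothetical helper: renders the GRANT statements that the cell above issues,
# so the privileges granted to the role can be reviewed at a glance.
def grant_statements(role: str, grants: list[tuple[list[str], str]]) -> list[str]:
    statements = []
    for privileges, securable in grants:
        statements.append(f"GRANT {', '.join(privileges)} ON {securable} TO ROLE {role};")
    return statements


grants = [
    (["USAGE", "OPERATE", "MONITOR"], "COMPUTE POOL tutorial_compute_pool"),
    (["ALL"], "DATABASE tutorial_db"),
    (["USAGE"], "WAREHOUSE tutorial_warehouse"),
    (["BIND SERVICE ENDPOINT"], "ACCOUNT"),
]
for stmt in grant_statements("test_role", grants):
    print(stmt)
```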

2.2: Create schema-scoped objects

The Python code in this section uses the test_role role to create a schema and objects in that schema. You don’t need administrative privileges to create these resources.

  • Schema (data_schema). You create an image repository, service, and job in this schema.
  • Image repository (tutorial_repository). You store your application image in this repository.
  • Stage (tutorial_stage). The stage is created only for illustration. While not demonstrated in this tutorial, stages can be used to pass data into or collect data from your services.

Note that the script creates a resource only if it doesn't already exist.

from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable

session = get_active_session()
session.use_role(user_role_name)
root = Root(session)

# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
    Schema(name=schema_name),
    mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)

# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo_name}
repo = schema.image_repositories.create(
    ImageRepository(name=repo_name),
    mode=CreateMode.if_not_exists)
print("Created image repository:", repo.fully_qualified_name)

repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")


# CREATE STAGE IF NOT EXISTS tutorial_stage
#   DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
    Stage(
        name=stage_name,
        directory_table=StageDirectoryTable(enable=True)),
    mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)

The Python code also prints useful information about the repository (the repository URL) that you use when you push images to it.
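The repository URL has the form <registry hostname>/<database>/<schema>/<repository>, and the code above takes everything before the first / as the registry hostname, which is what you log in to with Docker. A minimal sketch that splits a repository URL and builds the full image reference used to tag and push an image; the hostname myorg-myaccount.registry.snowflakecomputing.com is an invented example, and both helper names are hypothetical.

```python
# Hypothetical helpers: split an image repository URL into the registry hostname
# and the repository path, and build a full image reference for docker tag/push.
def split_repository_url(repo_url: str) -> tuple[str, str]:
    hostname, _, path = repo_url.partition("/")
    return hostname, path


def image_reference(repo_url: str, image: str, tag: str = "latest") -> str:
    return f"{repo_url.rstrip('/')}/{image}:{tag}"


# Example URL; the organization and account names are made up.
url = "myorg-myaccount.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository"
host, path = split_repository_url(url)
print(host)
print(path)
print(image_reference(url, "my_echo_service_image"))
```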

3: Build an image and upload it

You download the service code locally as described in Tutorial 1, use Docker commands to build the image, and upload the image to the image repository in your account.

  1. Create and run the cell to get the image registry hostname and the image repository URL.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo = schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")

    The Python code retrieves the image repository resource object (repo), accesses the model object, and extracts the repository URL from it.

  2. Follow Tutorial 1 steps 1 and 2 to download the service code, build an image, and upload it to the repository.

  3. Create and run the cell to verify that the image is in the repository.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)

    The code enumerates the images from the image repository resource (repo) and prints the image_path for each image.
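Instead of reading the printed list, you can check for the expected image programmatically. This sketch works on the image_path strings that the cell prints; it assumes each path ends with <image name>:<tag> after the last /, which you should confirm against your own output. The helper name has_image is hypothetical.

```python
# Hypothetical helper: reports whether an image with the given name and tag
# appears in a list of image paths such as those printed by the cell above.
# Assumes the last path segment has the form "<image name>:<tag>".
def has_image(image_paths: list[str], image: str, tag: str = "latest") -> bool:
    wanted = f"{image}:{tag}"
    return any(p.rsplit("/", 1)[-1] == wanted for p in image_paths)


paths = ["tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest"]
print(has_image(paths, "my_echo_service_image"))
print(has_image(paths, "my_job_image"))
```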

4: Create a service

Create a service, and a service function to communicate with the service.

  1. Verify that the compute pool is ready. After you create a compute pool, it takes some time for Snowflake to provision all the nodes. Ensure that the compute pool is ready before creating a service, because service containers execute within the specified compute pool.

    Create and run the cell to get the compute pool state:

    import time

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)

    cp = root.compute_pools[compute_pool_name]

    cpm = cp.fetch()
    print(cpm.state, cpm.status_message)
    if cpm.state == 'SUSPENDED':
        cp.resume()
    while cpm.state in ['STARTING', 'SUSPENDED']:
        time.sleep(5)
        cpm = cp.fetch()
        print(cpm.state, cpm.status_message)

    The code fetches the compute pool model (cpm) from the compute pool resource (cp) to retrieve the current compute pool state. If the compute pool is suspended, the code resumes it. The code then loops, pausing for five seconds each time, until the compute pool is no longer in the STARTING or SUSPENDED state.

    The last line of output should be “IDLE” or “ACTIVE”, which indicates that the compute pool is ready to run your service. For more information, see Compute pool lifecycle. If the compute pool is not ready, your services can’t start.

  2. Create and run the cell to create the echo service.

    from snowflake.core.service import Service, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    specification = f"""
     spec:
       containers:
       - name: echo
         image: {repo_url}/my_echo_service_image:latest
         env:
           SERVER_PORT: 8000
           CHARACTER_NAME: Bob
         readinessProbe:
           port: 8000
           path: /healthcheck
       endpoints:
       - name: echoendpoint
         port: 8000
         public: true
    
     """
    echo_service = schema.services.create(Service(
     name=service_name,
     compute_pool=compute_pool_name,
     spec=ServiceSpec(specification),
     min_instances=1,
     max_instances=1),
     mode=CreateMode.if_not_exists)
    print("created service:", echo_service.name)

    The code retrieves the repository URL, as done in the preceding step. The code then creates the echo_service using an inline specification and the image from the specified image repository.

    As the Python code shows, parameterizing resource names is straightforward. The following is the equivalent SQL command that creates the service without parameters.

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
        $$
      MIN_INSTANCES=1
      MAX_INSTANCES=1;

  3. Create and run the cell to create a service function (my_echo_function). A service function is one of the ways to use the service.

    from snowflake.core.function import ServiceFunction, FunctionArgument
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # CREATE OR REPLACE FUNCTION my_echo_function (inputtext VARCHAR)
    #   RETURNS VARCHAR
    #   SERVICE=echo_service
    #   ENDPOINT=echoendpoint
    #   AS '/echo';
    svcfn = schema.functions.create(mode=CreateMode.or_replace,
     function=ServiceFunction(
         name="my_echo_function",
         arguments=[FunctionArgument(name="inputtext", datatype="TEXT")],
         returns="TEXT",
         service=service_name,
         endpoint="echoendpoint",
         path="/echo"))
    print("created service function:", svcfn.name_with_args)

    The code calls the create method on the functions collection of the schema to create the service function (my_echo_function).
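Because the service specification is an ordinary string, you can generate it from parameters as well, which extends the parameterization shown in this step. A minimal sketch that builds the echo service specification with textwrap.dedent so the YAML indentation stays consistent; the function name make_echo_spec is hypothetical, and the fields are exactly those in the inline specification used above. You would pass its result to ServiceSpec(...) in place of the inline string.

```python
import textwrap


# Hypothetical helper: renders the echo service specification from parameters.
# dedent strips the common leading indentation so the YAML starts at column 0.
def make_echo_spec(repo_url: str, character_name: str = "Bob", port: int = 8000) -> str:
    return textwrap.dedent(f"""\
        spec:
          containers:
          - name: echo
            image: {repo_url}/my_echo_service_image:latest
            env:
              SERVER_PORT: {port}
              CHARACTER_NAME: {character_name}
            readinessProbe:
              port: {port}
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: {port}
            public: true
        """)


print(make_echo_spec("example-host/tutorial_db/data_schema/tutorial_repository"))
```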

5: Use the service

In this section, you use the service as follows:

  • Invoke the service function.
  • Use a browser to interact with the service’s public endpoint.

  1. Invoke the service function.

    svcfn = schema.functions["my_echo_function(TEXT)"]
    print(svcfn.execute(["hello"]))

    Snowflake sends a POST request to the service endpoint (echoendpoint). Upon receiving the request, the service echoes the input string in the response.

Output:

+--------------------------------+
| MY_ECHO_FUNCTION('HELLO')      |
|--------------------------------|
| Bob said hello                 |
+--------------------------------+

  2. From a browser, access the public endpoint that the service exposes.
    1. Get the URL of the public endpoint.

      import time

      # Helper that waits for the public endpoint to be provisioned and returns its ingress URL.
      def get_ingress_for_endpoint(svc, endpoint):
          for _ in range(10):  # only try 10 times
              # Find the target endpoint.
              target_endpoint = None
              for ep in svc.get_endpoints():
                  if ep.is_public and ep.name == endpoint:
                      target_endpoint = ep
                      break
              else:
                  print(f"Endpoint {endpoint} not found")
                  return None

              # Return the endpoint URL, or wait for it to be provisioned.
              if target_endpoint.ingress_url.startswith("Endpoints provisioning "):
                  print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.")
                  time.sleep(10)
              else:
                  return target_endpoint.ingress_url
          print("Timed out waiting for endpoint to become available")
          return None

      # echo_service is the service resource created in step 4.
      endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint")
      if endpoint_url:
          print(f"https://{endpoint_url}/ui")
    2. Paste the printed URL in a browser window. This causes the service to execute the ui() function (see echo_service.py).

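      Note that the first time you access the endpoint URL, you are asked to log in to Snowflake. For this test, use the same user that you used to create the service, to ensure that the user has the necessary privileges.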

    3. In the Input box, enter the string “Hello”, and then press Return. The web form shows the response from the echo service.
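When you call the service function in step 1, Snowflake sends the input rows to the endpoint in batches. In the JSON data format that service functions use, the request body looks like {"data": [[0, "hello"]]}, where the first element of each row is the row number, and the response must return the same row numbers with the results. The following sketch of the core logic behind an /echo endpoint is modeled on the echo service from Tutorial 1 but is not its actual code; the function name echo_batch is hypothetical.

```python
import json


# Hypothetical sketch of the core logic behind the service's /echo endpoint.
# Request and response bodies use the {"data": [[row_number, value, ...], ...]} format.
def echo_batch(request_body: str, character_name: str = "Bob") -> str:
    rows = json.loads(request_body)["data"]
    # Keep each row number so Snowflake can match outputs to inputs.
    output_rows = [[row[0], f"{character_name} said {row[1]}"] for row in rows]
    return json.dumps({"data": output_rows})


print(echo_batch('{"data": [[0, "hello"], [1, "world"]]}'))
```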

6: Create a job

In Tutorial 2, you use the SQL interface to create a Snowpark Container Services job. In this section, you create the same job using the Snowflake Python APIs.

  1. Create and run the cell to get the image registry hostname and the image repository URL.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo = schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")

    The Python code retrieves the image repository resource object (repo), accesses the model object, and extracts the repository URL from it.

  2. Follow Tutorial 2 steps 1 and 2 to download the service code, build an image, and upload it to the repository.

  3. Create and run the cell to verify that the image is in the repository.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)

    The code enumerates the images from the image repository resource (repo) and prints the image_path for each image.

  4. Create and run the cell to create the job.

    from snowflake.core.service import JobService, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    job_name = "test_job"
    
    # cleanup previous job if present.
    schema.services[job_name].drop(if_exists=True)
    
    specification = f"""
     spec:
       containers:
       - name: main
         image: {repo_url}/my_job_image:latest
         env:
           SNOWFLAKE_WAREHOUSE: {warehouse_name}
         args:
         - "--query=select current_time() as time,'hello'"
         - "--result_table=results"
     """
    job = schema.services.execute_job(JobService(
     name=job_name,
     compute_pool=compute_pool_name,
     spec=ServiceSpec(specification)))
    print("executed job:", job.name, "status:", job.fetch().status)
    
    print("job logs:")
    print(job.get_service_logs("0", "main"))  # instance ID "0", container "main"

    The job runs the given query and stores the results in a table.

  5. Run the following cell to view the results written to the table. This code uses Snowpark Python to query the table.

    session = get_active_session()
    session.use_role(user_role_name)
    # Show that the job above wrote to the results table.
    session.sql(f"select * from {database_name}.{schema_name}.results").show()
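The job cell prints the status right after execute_job returns, when the job may still be running. To wait until it finishes, you can poll job.fetch().status in a loop, much like the compute pool loop in step 4. A minimal, generic sketch follows; it takes the status-fetching function as a parameter so it can be tried without Snowflake. The helper name wait_for_status is hypothetical, and the terminal status names DONE and FAILED are assumptions to check against the job service documentation.

```python
import time
from typing import Callable, Iterable


# Hypothetical polling helper: calls fetch_status until it returns a terminal
# status or the timeout expires, then returns the last status seen.
def wait_for_status(
    fetch_status: Callable[[], str],
    terminal: Iterable[str] = ("DONE", "FAILED"),  # assumed terminal job statuses
    timeout_s: float = 300.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    terminal_set = set(terminal)
    waited = 0.0
    status = fetch_status()
    while status not in terminal_set and waited < timeout_s:
        sleep(interval_s)
        waited += interval_s
        status = fetch_status()
    return status


# With Snowflake, you might call: wait_for_status(lambda: job.fetch().status)
statuses = iter(["PENDING", "RUNNING", "RUNNING", "DONE"])
print(wait_for_status(lambda: next(statuses), sleep=lambda s: None))  # DONE
```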

7: Clean up

  1. Stop the service and drop it. After dropping the service, Snowflake by default automatically suspends the compute pool (assuming there are no other services and job services running). For more information, see compute pool lifecycle.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # now let's clean up
    
    schema.functions["my_echo_function(TEXT)"].drop()
    schema.services[service_name].drop()

  2. Drop the image repository to avoid paying for storage. Note that any other images stored in the repository are deleted as well.

    schema.image_repositories[repo_name].drop()
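
  3. Drop the schema. Dropping a schema also drops all objects in that schema. For this tutorial, that includes the service, function, image repository, and stage that you created.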

    root.databases[database_name].schemas[schema_name].drop()
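
  4. Instead of waiting for Snowflake to suspend the compute pool, you can also suspend it explicitly. In that case, Snowflake suspends any running services, waits for any running jobs to finish, and then suspends the compute pool.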

    root.compute_pools[compute_pool_name].suspend()
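If a cleanup cell fails partway (for example, because an object was already dropped in an earlier run), the remaining steps don't run. One way to make the cleanup rerunnable is to run each step independently and report failures instead of stopping at the first one. This is a hypothetical sketch: run_cleanup is not a Snowflake API, the step callables stand in for calls such as schema.services[service_name].drop(), and catching every Exception is a deliberate simplification.

```python
from typing import Callable


# Hypothetical helper: runs cleanup steps in order, continuing past failures,
# and returns (step name, error message) for each step that failed.
def run_cleanup(steps: list[tuple[str, Callable[[], None]]]) -> list[tuple[str, str]]:
    failures = []
    for name, step in steps:
        try:
            step()
            print(f"ok: {name}")
        except Exception as exc:  # simplification: report any error and keep going
            failures.append((name, str(exc)))
            print(f"failed: {name}: {exc}")
    return failures


def drop_missing_object() -> None:
    # Stand-in for a drop call on an object that no longer exists.
    raise RuntimeError("object does not exist")


failures = run_cleanup([
    ("drop function", lambda: None),
    ("drop service", drop_missing_object),
    ("suspend compute pool", lambda: None),
])
```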

What’s next?

This tutorial demonstrates using Snowflake Python APIs to create and manage Snowpark Container Services services and jobs. For more information about the Snowflake Python APIs, see Snowflake Python APIs: Managing Snowflake objects with Python.