教程 3:使用 Snowflake Python APIs 创建服务和作业

简介

教程 1教程 2 中,您将使用 SQL 界面来创建 Snowpark Container Services 服务和作业。在本教程中,您将使用 Snowflake Python APIs 创建相同的服务和作业,从而使用 Snowflake Python APIs 进行探索,管理 Snowpark Container Services 资源。

本教程使用 Snowflake 笔记本 来执行 Python 代码,但该代码独立于笔记本,您可以在其他环境中执行该代码。

1:初始配置

在此初始配置中,您将创建一个 Snowflake 笔记本,导入所需的库,并定义单元格在后续步骤中使用的常量。

  1. 创建 Snowflake 笔记本。

    1. 创建笔记本。有关说明,请参阅 创建新笔记本。请注意,您在 UI(在仓库上运行或在容器上运行)中选择的 Python 环境 无关紧要。

    2. Packages 下拉菜单中,选择“snowflake”包并安装最新版本的 Snowflake Python APIs 库。

    3. (可选)默认情况下,删除笔记本中提供的单元格。按照本教程中的步骤操作时,将 Python 单元格添加到笔记本中。

  2. 创建并运行单元格以导入本教程中许多单元格使用的 Python 库。

    from snowflake.snowpark.context import get_active_session
    from snowflake.core import Root
    from snowflake.core import CreateMode
    
    Copy
  3. 创建并运行单元格以定义在后续单元格中使用的常量。下面提供的值与教程 1 和 2 匹配。您可以选择更改这些值。

    current_user = get_active_session().get_current_user()
    user_role_name = "test_role"
    compute_pool_name = "tutorial_compute_pool"
    warehouse_name = "tutorial_warehouse"
    database_name = "tutorial_db"
    schema_name = "data_schema"
    repo_name = "tutorial_repository"
    stage_name = "tutorial_stage"
    service_name = "echo_service"
    print("configured!")
    
    Copy

2:创建 Snowflake 对象

在创建服务前,您需要 Snowflake 对象,例如数据库、用户、角色、计算池和镜像仓库。其中一些对象是账户范围的对象,需要管理权限才能创建。创建的对象的名称在上一步中定义。

2.1:创建账户范围的 Snowflake 对象

以下 Python 代码将创建这些对象:

  • 角色 (test_role)。您向此角色授予创建和使用服务所需的所有权限。在代码中,将此角色授予当前用户,使用户能够创建和使用该服务。

  • 数据库 (tutorial_db)。在下一步中,您将在此数据库中创建架构。

  • 计算池 (tutorial_compute_pool)。您的服务容器在此计算池中执行。

  • 仓库 (tutorial_warehouse)。当服务连接到 Snowflake 并执行查询时,此仓库用于执行查询。

创建并运行该单元格,以使用 ACCOUNTADMIN 角色创建这些账户范围的对象。请注意,仅当资源不存在时,脚本才会创建资源。代码中的注释显示等效的 SQL 语句。

from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse

session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)

# CREATE ROLE test_role;
root.roles.create(
    Role(name=user_role_name),
    mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)

# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
    securable=Securables.role(user_role_name),
    grantee=Grantees.user(name=current_user),
    ))

# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
#   MIN_NODES = 1 MAX_NODES = 1
#   INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
    mode=CreateMode.if_not_exists,
    compute_pool=ComputePool(
        name=compute_pool_name,
        instance_family="CPU_X64_XS",
        min_nodes=1,
        max_nodes=2,
    )
)

# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
    securable=Securables.compute_pool(compute_pool_name),
    grantee=Grantees.role(name=user_role_name)
    ))

print(f"Created compute pool:", compute_pool_name)

# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
    Database(name=database_name),
    mode=CreateMode.if_not_exists)

# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.all_privileges],
    securable=Securables.database(database_name),
    grantee=Grantees.role(name=user_role_name),
    ))

print("Created database:", database_name)

# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
    Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
    mode=CreateMode.if_not_exists)

# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.usage],
    grantee=Grantees.role(name=user_role_name),
    securable=Securables.warehouse(warehouse_name)
    ))

print("Created warehouse:", warehouse_name)

# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.bind_service_endpoint],
    securable=Securables.current_account,
    grantee=Grantees.role(name=user_role_name)
    ))

print("Done: GRANT BIND SERVICE ENDPOINT")
Copy

在创建资源时,代码还会向角色 (test_role) 授予所需权限,以便角色可以使用这些资源。此外,请注意,您在本教程中创建的 echo 服务公开了一个公共端点。此公共端点允许您账户中的其他用户从公共 Web(入口)访问服务。要创建具有公共端点的服务,角色 (test_role) 必须具有对账户的 BIND SERVICE ENDPOINT 权限。

2.2 创建架构范围的对象

本部分中的 Python 代码使用 test_role 角色创建架构和该架构中的对象。您不需要管理权限即可创建这些资源。

  • 架构 (data_schema)。您可以在此架构中创建镜像仓库、服务和作业。

  • 镜像仓库 (tutorial_repository)。您将应用程序镜像存储在此存储库中。

  • 暂存区 (tutorial_stage)。创建的暂存区仅供说明使用。虽然本教程中未进行演示,但暂存区可用于将数据传递到服务或从服务中收集数据。

请注意,仅当资源不存在时,脚本才会创建资源。

from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable

session = get_active_session()
session.use_role(user_role_name)
root = Root(session)

# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
    Schema(name=schema_name),
    mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)

# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
    ImageRepository(name=repo_name),
    mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)

repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")


#CREATE STAGE IF NOT EXISTS tutorial_stage
#  DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
    Stage(
        name=stage_name,
        directory_table=StageDirectoryTable(enable=True)),
    mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Copy

Python 代码还会打印有关您在向存储库推送镜像时所用存储库(存储库 URL)的有用信息。

3:构建镜像并上传

您可以按照 教程 1 中所述在本地下载代码,使用 Docker 命令构建镜像,并将其上传到您账户中的镜像存储库。

  1. 创建并运行该单元格以获取镜像注册表的主机名和镜像仓库的 URL。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    Python 代码检索镜像存储库 资源 对象 (repo),访问 模型 对象,并从中提取存储库 URL。

  2. 按照 教程 1 步骤 1 和 2 下载服务代码,构建镜像,然后将其上传到存储库。

  3. 创建并运行该单元格,以验证镜像是否在存储库中。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    该代码枚举镜像存储库资源 (repo) 中的镜像并打印每个镜像的 image_path

4:创建服务

创建服务和服务函数,与服务进行通信。

  1. 验证计算池是否已准备就绪。创建计算池后,Snowflake 需要一些时间来预置所有节点。在创建服务之前,请确保计算池已准备就绪,因为服务容器在指定的计算池中执行。

    创建并运行单元格以获取计算池状态:

    import time
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    cp = root.compute_pools[compute_pool_name]
    
    cpm = cp.fetch()
    print(cpm.state, cpm.status_message)
    if cpm.state == 'SUSPENDED':
        cp.resume()
    while cpm.state in ['STARTING', 'SUSPENDED']:
        time.sleep(5)
        cpm = cp.fetch()
        print(cpm.state, cpm.status_message)
    
    Copy

    该代码从计算池资源 (cp) 中提取计算池模型 (cpm),以检索当前计算池状态。如果计算池已暂停,则代码将恢复计算池。代码循环,每次暂停 5 秒,直到计算池不再处于 STARTING 或 SUSPENDED 状态。

    输出的最后一行应该是“IDLE”或“ACTIVE”,这表示计算池已准备好运行服务。有关更多信息,请参阅 计算池生命周期。如果计算池尚未准备就绪,则服务无法启动。

  2. 创建并运行单元格,以创建 echo 服务。

    from snowflake.core.service import Service, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    specification = f"""
        spec:
          containers:
          - name: echo
            image: {repo_url}/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
    
        """
    echo_service = schema.services.create(Service(
        name=service_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification),
        min_instances=1,
        max_instances=1),
        mode=CreateMode.if_not_exists)
    print("created service:", echo_service.name)
    
    Copy

    代码检索存储库 URL,如上一步所示。然后,代码会使用内联规范和指定镜像存储库中的镜像创建 echo_service

    从 Python 代码中可以看出,参数化资源的名称非常简单。以下是创建服务但不使用参数的等效 SQL 命令。

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
      $$
      MIN_INSTANCES=1
      MAX_INSTANCES=1;
    
    Copy
  3. 运行单元格以创建服务函数 (my_echo_function)。服务函数是使用服务的一种方式。

    from snowflake.core.function import ServiceFunction, FunctionArgument
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # CREATE FUNCTION my_echo_udf (inputtext VARCHAR)
    #  RETURNS VARCHAR
    #  SERVICE=echo_service
    #  ENDPOINT=echoendpoint
    #  AS '/echo';
    svcfn = schema.functions.create(mode=CreateMode.or_replace,
        function=ServiceFunction(
            name="my_echo_function",
            arguments=[FunctionArgument(name="inputtext", datatype="TEXT")],
            returns="TEXT",
            service=service_name,
            endpoint="echoendpoint",
            path="/echo"))
    print("created service function:", svcfn.name_with_args)
    
    Copy

    该代码在 schemafunctions 集合上调用 create 方法,创建服务函数 (my_echo_function)。

5:使用服务

在本部分中,您将按如下方式使用该服务:

  • 调用服务函数。

  • 使用浏览器与服务的公共端点交互。

  1. 调用服务函数。

    svcfn = schema.functions["my_echo_function(TEXT)"]
    print(svcfn.execute(["hello"]))
    
    Copy

    Snowflake 向服务端点 (echoendpoint) 发送 POST 请求。收到请求后,服务会在响应中回显输入字符串。

    输出:

    +--------------------------+
    | **MY_ECHO_UDF('HELLO!')**|
    |------------------------- |
    | Bob said hello!          |
    +--------------------------+
    
  2. 从浏览器访问服务公开的公共端点。

    1. 获取公共端点的 URL。

      # helper to check if service is ready and return endpoint url
      def get_ingress_for_endpoint(svc, endpoint):
          for _ in range(10): # only try 10 times
              # Find the target endpoint.
              target_endpoint = None
              for ep in svc.get_endpoints():
                  if ep.is_public and ep.name == endpoint:
                      target_endpoint = ep
                      break;
              else:
                  print(f"Endpoint {endpoint} not found")
                  return None
      
              # Return endpoint URL or wait for it to be provisioned.
              if target_endpoint.ingress_url.startswith("Endpoints provisioning "):
                  print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.")
                  time.sleep(10)
              else:
                  return target_endpoint.ingress_url
          print("Timed out waiting for endpoint to become available")
      
      endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint")
      print(f"https://{endpoint_url}/ui")
      
      Copy
    2. 在浏览器窗口中粘贴打印的 URL。这样会导致服务执行 ui() 函数(请参阅 echo_service.py)。

      请注意,首次访问端点 URL 时,系统将要求您登录 Snowflake。对于此测试,请使用用于创建服务的同一用户,以确保用户具有必要的权限。

      用于与 Echo 服务进行通信的 Web 表单。
    3. 输入 框中输入字符串“Hello”,然后按 Return

      用于显示 Echo 服务的响应的 Web 表单。

6:创建作业

在教程 2 中,您将使用 SQL 界面创建 Snowpark Container Services 作业。在本部分中,您将使用 Snowflake Python APIs 创建同样的作业。

  1. 创建并运行该单元格以获取镜像注册表的主机名和镜像仓库的 URL。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    Python 代码检索镜像存储库资源对象 (repo),访问模型对象,并从中提取存储库 URL。

  2. 按照 教程 2 步骤 1 和 2 下载服务代码,构建镜像,然后将其上传到存储库。

  3. 创建并运行该单元格,以验证镜像是否在存储库中。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    该代码枚举镜像存储库资源 (repo) 中的镜像并打印每个镜像的 image_path

  4. 创建并运行单元格,以创建作业。

    from snowflake.core.service import JobService, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    job_name = "test_job"
    
    # cleanup previous job if present.
    schema.services[job_name].drop()(if_exists=True)
    
    specification = f"""
        spec:
          containers:
          - name: main
            image: {repo_url}/my_job_image:latest
            env:
              SNOWFLAKE_WAREHOUSE: {warehouse_name}
            args:
            - "--query=select current_time() as time,'hello'"
            - "--result_table=results"
        """
    job = schema.services.execute_job(JobService(
        name=job_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification)))
    print("executed job:", job.name, "status:", job.fetch().status)
    
    print("job logs:")
    print(job.get_service_logs(0, "main"))
    
    Copy

    该作业运行给定查询并将结果存储在表中。

  5. 运行以下单元格以查看写入表的结果。此代码使用 Snowpark Python 查询该表。

    session = get_active_session()
    session.use_role(user_role_name)
    # show that above job wrote to results table
    session.sql(f"select * from {database_name}.{schema_name}.results").collect()
    
    Copy

7:清理

  1. 停止服务并将其删除。删除服务后,默认情况下,Snowflake 会自动暂停计算池(假设没有其他服务和作业服务正在运行)。有关更多信息,请参阅 计算池生命周期

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # now let's clean up
    
    schema.functions["my_echo_function(TEXT)"].drop()
    schema.services[service_name].drop()
    
    Copy
  2. 删除镜像存储库以避免支付存储费用。请注意,如果存储库中存储了任何其他镜像,系统会将其删除。

    schema.image_repositories[repo_name].drop()
    
    Copy
  3. 删除架构。删除架构还会删除该架构中的所有对象。对于本教程,其中包括您创建的服务、函数、镜像存储库和暂存区。

    root.databases[database_name].schemas[schema_name].drop()
    
    Copy
  4. 您还可以显式暂停计算池,而不是等待 Snowflake 暂停计算池。在这种情况下,Snowflake 会暂停任何正在运行的服务,并等待任何正在运行的作业完成,然后暂停计算池。

    root.compute_pool[compute_pool_name].suspend()
    
    Copy

下一步是什么?

本教程演示如何使用 Snowflake Python APIs 创建和管理 Snowpark Container Services 服务和作业。有关 Snowflake Python APIs 的更多信息,请参阅 Snowflake Python APIs:使用 Python 管理 Snowflake 对象

语言: 中文