教程 3:使用 Snowflake Python APIs 创建服务和作业¶
简介¶
在 教程 1 和 教程 2 中,您将使用 SQL 界面来创建 Snowpark Container Services 服务和作业。在本教程中,您将使用 Snowflake Python APIs 创建相同的服务和作业,从而使用 Snowflake Python APIs 进行探索,管理 Snowpark Container Services 资源。
本教程使用 Snowflake 笔记本 来执行 Python 代码,但该代码独立于笔记本,您可以在其他环境中执行该代码。
1:初始配置¶
在此初始配置中,您将创建一个 Snowflake 笔记本,导入所需的库,并定义单元格在后续步骤中使用的常量。
创建 Snowflake 笔记本。
创建笔记本。有关说明,请参阅 创建新笔记本。请注意,您在 UI(在仓库上运行或在容器上运行)中选择的 Python 环境 无关紧要。
从 Packages 下拉菜单中,选择“snowflake”包并安装最新版本的 Snowflake Python APIs 库。
(可选)默认情况下,删除笔记本中提供的单元格。按照本教程中的步骤操作时,将 Python 单元格添加到笔记本中。
创建并运行单元格以导入本教程中许多单元格使用的 Python 库。
from snowflake.snowpark.context import get_active_session from snowflake.core import Root from snowflake.core import CreateMode
创建并运行单元格以定义在后续单元格中使用的常量。下面提供的值与教程 1 和 2 匹配。您可以选择更改这些值。
current_user = get_active_session().get_current_user() user_role_name = "test_role" compute_pool_name = "tutorial_compute_pool" warehouse_name = "tutorial_warehouse" database_name = "tutorial_db" schema_name = "data_schema" repo_name = "tutorial_repository" stage_name = "tutorial_stage" service_name = "echo_service" print("configured!")
2:创建 Snowflake 对象¶
在创建服务前,您需要 Snowflake 对象,例如数据库、用户、角色、计算池和镜像仓库。其中一些对象是账户范围的对象,需要管理权限才能创建。创建的对象的名称在上一步中定义。
2.1:创建账户范围的 Snowflake 对象¶
以下 Python 代码将创建这些对象:
角色 (
test_role
)。您向此角色授予创建和使用服务所需的所有权限。在代码中,将此角色授予当前用户,使用户能够创建和使用该服务。数据库 (
tutorial_db
)。在下一步中,您将在此数据库中创建架构。计算池 (
tutorial_compute_pool
)。您的服务容器在此计算池中执行。仓库 (
tutorial_warehouse
)。当服务连接到 Snowflake 并执行查询时,此仓库用于执行查询。
创建并运行该单元格,以使用 ACCOUNTADMIN 角色创建这些账户范围的对象。请注意,仅当资源不存在时,脚本才会创建资源。代码中的注释显示等效的 SQL 语句。
from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse
session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)
# CREATE ROLE test_role;
root.roles.create(
Role(name=user_role_name),
mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)
# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
securable=Securables.role(user_role_name),
grantee=Grantees.user(name=current_user),
))
# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
# MIN_NODES = 1 MAX_NODES = 1
# INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
mode=CreateMode.if_not_exists,
compute_pool=ComputePool(
name=compute_pool_name,
instance_family="CPU_X64_XS",
min_nodes=1,
max_nodes=2,
)
)
# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
securable=Securables.compute_pool(compute_pool_name),
grantee=Grantees.role(name=user_role_name)
))
print(f"Created compute pool:", compute_pool_name)
# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
Database(name=database_name),
mode=CreateMode.if_not_exists)
# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.all_privileges],
securable=Securables.database(database_name),
grantee=Grantees.role(name=user_role_name),
))
print("Created database:", database_name)
# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
mode=CreateMode.if_not_exists)
# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.usage],
grantee=Grantees.role(name=user_role_name),
securable=Securables.warehouse(warehouse_name)
))
print("Created warehouse:", warehouse_name)
# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.bind_service_endpoint],
securable=Securables.current_account,
grantee=Grantees.role(name=user_role_name)
))
print("Done: GRANT BIND SERVICE ENDPOINT")
在创建资源时,代码还会向角色 (test_role
) 授予所需权限,以便角色可以使用这些资源。此外,请注意,您在本教程中创建的 echo 服务公开了一个公共端点。此公共端点允许您账户中的其他用户从公共 Web(入口)访问服务。要创建具有公共端点的服务,角色 (test_role
) 必须具有对账户的 BIND SERVICE ENDPOINT
权限。
2.2 创建架构范围的对象¶
本部分中的 Python 代码使用 test_role
角色创建架构和该架构中的对象。您不需要管理权限即可创建这些资源。
架构 (
data_schema
)。您可以在此架构中创建镜像仓库、服务和作业。镜像仓库 (
tutorial_repository
)。您将应用程序镜像存储在此存储库中。暂存区 (
tutorial_stage
)。创建的暂存区仅供说明使用。虽然本教程中未进行演示,但暂存区可用于将数据传递到服务或从服务中收集数据。
请注意,仅当资源不存在时,脚本才会创建资源。
from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable
session = get_active_session()
session.use_role(user_role_name)
root = Root(session)
# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
Schema(name=schema_name),
mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)
# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
ImageRepository(name=repo_name),
mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)
repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")
#CREATE STAGE IF NOT EXISTS tutorial_stage
# DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
Stage(
name=stage_name,
directory_table=StageDirectoryTable(enable=True)),
mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Python 代码还会打印有关您在向存储库推送镜像时所用存储库(存储库 URL)的有用信息。
3:构建镜像并上传¶
您可以按照 教程 1 中所述在本地下载代码,使用 Docker 命令构建镜像,并将其上传到您账户中的镜像存储库。
创建并运行该单元格以获取镜像注册表的主机名和镜像仓库的 URL。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
按照 教程 1 步骤 1 和 2 下载服务代码,构建镜像,然后将其上传到存储库。
创建并运行该单元格,以验证镜像是否在存储库中。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
该代码枚举镜像存储库资源 (
repo
) 中的镜像并打印每个镜像的image_path
。
4:创建服务¶
创建服务和服务函数,与服务进行通信。
验证计算池是否已准备就绪。创建计算池后,Snowflake 需要一些时间来预置所有节点。在创建服务之前,请确保计算池已准备就绪,因为服务容器在指定的计算池中执行。
创建并运行单元格以获取计算池状态:
import time session = get_active_session() session.use_role(user_role_name) root = Root(session) cp = root.compute_pools[compute_pool_name] cpm = cp.fetch() print(cpm.state, cpm.status_message) if cpm.state == 'SUSPENDED': cp.resume() while cpm.state in ['STARTING', 'SUSPENDED']: time.sleep(5) cpm = cp.fetch() print(cpm.state, cpm.status_message)
该代码从计算池资源 (
cp
) 中提取计算池模型 (cpm
),以检索当前计算池状态。如果计算池已暂停,则代码将恢复计算池。代码循环,每次暂停 5 秒,直到计算池不再处于 STARTING 或 SUSPENDED 状态。输出的最后一行应该是“IDLE”或“ACTIVE”,这表示计算池已准备好运行服务。有关更多信息,请参阅 计算池生命周期。如果计算池尚未准备就绪,则服务无法启动。
创建并运行单元格,以创建 echo 服务。
from snowflake.core.service import Service, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url specification = f""" spec: containers: - name: echo image: {repo_url}/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true """ echo_service = schema.services.create(Service( name=service_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification), min_instances=1, max_instances=1), mode=CreateMode.if_not_exists) print("created service:", echo_service.name)
代码检索存储库 URL,如上一步所示。然后,代码会使用内联规范和指定镜像存储库中的镜像创建
echo_service
。从 Python 代码中可以看出,参数化资源的名称非常简单。以下是创建服务但不使用参数的等效 SQL 命令。
CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
运行单元格以创建服务函数 (
my_echo_function
)。服务函数是使用服务的一种方式。from snowflake.core.function import ServiceFunction, FunctionArgument session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # CREATE FUNCTION my_echo_udf (inputtext VARCHAR) # RETURNS VARCHAR # SERVICE=echo_service # ENDPOINT=echoendpoint # AS '/echo'; svcfn = schema.functions.create(mode=CreateMode.or_replace, function=ServiceFunction( name="my_echo_function", arguments=[FunctionArgument(name="inputtext", datatype="TEXT")], returns="TEXT", service=service_name, endpoint="echoendpoint", path="/echo")) print("created service function:", svcfn.name_with_args)
该代码在
schema
的functions
集合上调用create
方法,创建服务函数 (my_echo_function
)。
5:使用服务¶
在本部分中,您将按如下方式使用该服务:
调用服务函数。
使用浏览器与服务的公共端点交互。
调用服务函数。
svcfn = schema.functions["my_echo_function(TEXT)"] print(svcfn.execute(["hello"]))
Snowflake 向服务端点 (
echoendpoint
) 发送 POST 请求。收到请求后,服务会在响应中回显输入字符串。输出:
+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
从浏览器访问服务公开的公共端点。
获取公共端点的 URL。
# helper to check if service is ready and return endpoint url def get_ingress_for_endpoint(svc, endpoint): for _ in range(10): # only try 10 times # Find the target endpoint. target_endpoint = None for ep in svc.get_endpoints(): if ep.is_public and ep.name == endpoint: target_endpoint = ep break; else: print(f"Endpoint {endpoint} not found") return None # Return endpoint URL or wait for it to be provisioned. if target_endpoint.ingress_url.startswith("Endpoints provisioning "): print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.") time.sleep(10) else: return target_endpoint.ingress_url print("Timed out waiting for endpoint to become available") endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint") print(f"https://{endpoint_url}/ui")
在浏览器窗口中粘贴打印的 URL。这样会导致服务执行
ui()
函数(请参阅echo_service.py
)。请注意,首次访问端点 URL 时,系统将要求您登录 Snowflake。对于此测试,请使用用于创建服务的同一用户,以确保用户具有必要的权限。
在 输入 框中输入字符串“Hello”,然后按 Return。
6:创建作业¶
在教程 2 中,您将使用 SQL 界面创建 Snowpark Container Services 作业。在本部分中,您将使用 Snowflake Python APIs 创建同样的作业。
创建并运行该单元格以获取镜像注册表的主机名和镜像仓库的 URL。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
Python 代码检索镜像存储库资源对象 (
repo
),访问模型对象,并从中提取存储库 URL。按照 教程 2 步骤 1 和 2 下载服务代码,构建镜像,然后将其上传到存储库。
创建并运行该单元格,以验证镜像是否在存储库中。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
该代码枚举镜像存储库资源 (
repo
) 中的镜像并打印每个镜像的image_path
。创建并运行单元格,以创建作业。
from snowflake.core.service import JobService, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url job_name = "test_job" # cleanup previous job if present. schema.services[job_name].drop()(if_exists=True) specification = f""" spec: containers: - name: main image: {repo_url}/my_job_image:latest env: SNOWFLAKE_WAREHOUSE: {warehouse_name} args: - "--query=select current_time() as time,'hello'" - "--result_table=results" """ job = schema.services.execute_job(JobService( name=job_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification))) print("executed job:", job.name, "status:", job.fetch().status) print("job logs:") print(job.get_service_logs(0, "main"))
该作业运行给定查询并将结果存储在表中。
运行以下单元格以查看写入表的结果。此代码使用 Snowpark Python 查询该表。
session = get_active_session() session.use_role(user_role_name) # show that above job wrote to results table session.sql(f"select * from {database_name}.{schema_name}.results").collect()
7:清理¶
停止服务并将其删除。删除服务后,默认情况下,Snowflake 会自动暂停计算池(假设没有其他服务和作业服务正在运行)。有关更多信息,请参阅 计算池生命周期。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # now let's clean up schema.functions["my_echo_function(TEXT)"].drop() schema.services[service_name].drop()
删除镜像存储库以避免支付存储费用。请注意,如果存储库中存储了任何其他镜像,系统会将其删除。
schema.image_repositories[repo_name].drop()
删除架构。删除架构还会删除该架构中的所有对象。对于本教程,其中包括您创建的服务、函数、镜像存储库和暂存区。
root.databases[database_name].schemas[schema_name].drop()
您还可以显式暂停计算池,而不是等待 Snowflake 暂停计算池。在这种情况下,Snowflake 会暂停任何正在运行的服务,并等待任何正在运行的作业完成,然后暂停计算池。
root.compute_pool[compute_pool_name].suspend()
下一步是什么?¶
本教程演示如何使用 Snowflake Python APIs 创建和管理 Snowpark Container Services 服务和作业。有关 Snowflake Python APIs 的更多信息,请参阅 Snowflake Python APIs:使用 Python 管理 Snowflake 对象。