使用 Python 管理 Snowpark Container Services(包括服务函数)¶
您可以使用 Python 管理 Snowpark 容器服务,这是一种完全托管的容器服务,您可以通过它来部署、管理和扩展容器化应用程序。有关 Snowpark 容器服务的概述,请参阅 关于 Snowpark Container Services。
使用 Snowflake Python APIs,您可以管理计算池、镜像存储库和服务。
先决条件¶
在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root
对象以使用 Snowflake Python APIs 的代码。
例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
代码可通过生成的 Session
对象创建 Root
对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake。
管理计算池¶
您可以管理计算池,它是虚拟机 (VM) 节点的集合,在这些节点上 Snowflake 运行 Snowpark 容器服务作业和服务。
Snowflake Python APIs 使用两种不同类型来表示计算池:
ComputePool
:显示计算池的属性,例如其仓库、最大节点和最小节点,以及自动恢复和自动暂停设置。ComputePoolResource
:显示对计算池执行操作的方法,例如提取相应的ComputePool
对象以及如何暂停、恢复和停止池。
有关计算池的详细信息,请参阅 Snowpark Container Services:使用计算池。
创建计算池¶
您可以调用 ComputePoolCollection.create
方法,并传递表示要创建的计算池的 ComputePool
对象,从而创建计算池。
要创建计算池,请先创建指定池属性的 ComputePool
对象,如下所示:
计算池名称
池将包含的最大节点数和最小节点数
实例系列的名称,标识要为池中的节点配置的计算机类型
当向池提交服务或作业时,池是否应该自动恢复
以下示例中的代码会创建一个 ComputePool
对象,其表示名为 my_compute_pool
的池:
from snowflake.core.compute_pool import ComputePool
compute_pool = ComputePool(name="my_compute_pool", min_nodes=1, max_nodes=2, instance_family="CPU_X64_XS", auto_resume=False)
root.compute_pools.create(compute_pool)
然后,代码通过将 ComputePool
对象传递给 ComputePoolCollection.create
方法创建计算池。
获取计算池详细信息¶
您可以通过调用 ComputePoolResource.fetch
方法来获取有关计算池的信息,该方法会返回一个 ComputePool
对象。
以下示例中的代码会获取有关名为 my_compute_pool
的池的信息:
compute_pool = root.compute_pools["my_compute_pool"].fetch()
print(compute_pool.to_dict())
创建或更改计算池¶
您可以设置 ComputePool
对象的属性并将其传递给 ComputePoolResource.create_or_alter
方法以创建计算池(如果计算池不存在)或者根据计算池定义更改计算池(如果计算池存在)。create_or_alter
的行为旨在实现幂等性,这意味着无论在运行该方法之前计算池是否存在,生成的计算池对象都是一样的。
备注
对于任何没有显式定义的 ComputePool 属性,create_or_alter
方法会使用默认值。例如,如果您没有设置 auto_resume
,则即使计算池以前以不同的值存在,其值也会默认为 None
。
以下示例中的代码会更新 my_compute_pool
计算池允许的最大节点数,然后更改 Snowflake 上的计算池:
compute_pool = root.compute_pools["my_compute_pool"].fetch()
compute_pool.max_nodes = 3
compute_pool_res = root.compute_pools["my_compute_pool"].create_or_alter(compute_pool)
列出计算池¶
您可以使用 iter
方法列出计算池,该方法会返回一个 PagedIter
迭代器。
以下示例中的代码会列出名称以 my
开头的计算池:
compute_pools = root.compute_pools.iter(like="my%")
for compute_pool in compute_pools:
print(compute_pool.name)
执行计算池操作¶
您可以使用 ComputePoolResource
对象执行常见的计算池操作(如暂停、恢复和停止池),而该对象可使用 ComputePool.fetch
方法进行获取。
以下示例中的代码暂停、恢复和停止 my_compute_pool
计算池:
compute_pool_res = root.compute_pools["my_compute_pool"]
compute_pool_res.suspend()
compute_pool_res.resume()
compute_pool_res.stop_all_services()
管理图像存储库¶
您可以管理图像存储库,这些存储库可存储您在容器服务上运行的应用程序的图像。
图像存储库是一个架构级对象。当您创建或引用一个存储库时,您是在其架构的上下文中执行此操作的。
Snowflake Python APIs 使用两种不同类型来表示镜像存储库:
ImageRepository
:显示镜像仓库的属性,例如其数据库名称、架构名称、存储库 URL 以及所有者。ImageRepositoryResource
:显示可用于提取相应ImageRepository
对象和删除镜像存储库资源的方法。
有关镜像仓库的详细信息,请参阅 Snowpark Container Services:使用镜像注册表和镜像仓库。
创建镜像仓库¶
要创建镜像仓库,请先创建指定存储库名称的 ImageRepository
对象。
以下示例中的代码会创建一个 ImageRepository
对象,其表示名为 my_repo
的存储库:
from snowflake.core.image_repository import ImageRepository
my_repo = ImageRepository("my_repo")
root.databases["my_db"].schemas["my_schema"].image_repositories.create(my_repo)
然后,代码通过将 ImageRepository
对象传递给 ImageRepositoryCollection.create
方法来创建镜像仓库,该存储库在 my_db
数据库和 my_schema
架构中进行创建。
获取镜像仓库详细信息¶
您可以通过调用 ImageRepositoryResource.fetch
方法来获取有关镜像仓库的信息,该方法会返回一个 ImageRepository
对象。
以下示例中的代码会获取一个表示 my_repo
镜像仓库的 ImageRepository
对象,然后打印存储库所有者的名称:
my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo = my_repo_res.fetch()
print(my_repo.owner)
列出镜像仓库¶
您可以使用 iter
方法在指定架构中列出镜像仓库,该方法会返回一个 ImageRepository
对象的 PagedIter
迭代器。
以下示例中的代码会列出 my_db
数据库和 my_schema
架构中的存储库名称:
repo_list = root.databases["my_db"].schemas["my_schema"].image_repositories.iter()
for repo_obj in repo_list:
print(repo_obj.name)
删除镜像仓库¶
您可以使用 ImageRepositoryResource.drop
方法删除镜像存储库。
下面的示例中的代码会删除 my_repo
存储库:
my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo_res.drop()
管理服务和服务函数¶
您可以管理这些运行应用程序容器的服务,除非您停止它们。Snowflake 会在服务容器停止时自动重启服务。通过这种方式,服务可以不间断地有效运行。
服务是一个架构级对象。当您创建或引用一项服务时,您是在其架构的上下文中执行此操作的。
Snowflake Python APIs 使用两种不同类型来表示服务:
Service
:显示服务的属性,例如其规范、最小实例和最大实例,以及数据库和架构的名称。ServiceResource
:显示可用于提取相应Service
对象、暂停和恢复服务以及获取其状态的方法。
有关服务的详细信息,请参阅 Snowpark Container Services:使用服务。
创建服务¶
要创建服务,需要运行 services.create
方法,传递表示要创建的服务的 Service
对象。
您可根据已上传到暂存区的服务规范 .yaml
文件创建服务。有关创建服务规范的详细信息,请参阅 服务规范参考。
上传规范¶
如果要根据尚未上传到暂存区的规范创建服务,可使用 Snowpark FileOperation 对象上传该规范。
以下示例中的代码使用 FileOperation.put
方法将规范作为文件上传:
session.file.put("/local_location/my_service_spec.yaml", "@my_stage")
以下示例中的代码使用 FileOperation.put_stream
方法将规范作为字符串上传:
service_spec_string = """
// Specification as a string.
"""
session.file.put_stream(StringIO(sepc_in_string), "@my_stage/my_service_spec.yaml")
创建服务¶
要根据暂存区规范创建服务,请先创建指定如下服务属性的 Service
对象:
服务名称
Snowflake 可创建的最大和最小服务实例数
应向其中添加服务的计算池
存储位置和规格名称
以下示例中的代码会创建一个 Service
对象,其表示 @my_stage/my_service_spec.yaml
内的规范中名为 my_service
的服务:
from snowflake.core.service import Service, ServiceSpec
my_service = Service(name="my_service", min_instances=1, max_instances=2, compute_pool="my_compute_pool", spec=ServiceSpec("@my_stage/my_service_spec.yaml"))
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
然后,代码通过将 Service
对象传递给 ServiceCollection.create
方法来创建服务,该服务在 my_db
数据库和 my_schema
架构中进行创建。
您还可以根据您作为内联文本提供的规范创建服务,如下例所示。ServiceSpec
函数采用单个字符串实参 spec
。如果字符串以 @
开头,则函数将其解释并验证为暂存区文件路径。否则该字符串将作为内联文本传递。
from textwrap import dedent
from snowflake.core.service import Service, ServiceSpec
spec_text = dedent(f"""\
spec:
containers:
- name: hello-world
image: repo/hello-world:latest
endpoints:
- name: hello-world-endpoint
port: 8080
public: true
""")
my_service = Service(name="my_service", min_instances=1, max_instances=2, compute_pool="my_compute_pool", spec=ServiceSpec(spec_text))
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
创建服务函数¶
服务启动并运行后,您可以创建一个与服务端点通信的服务函数。服务函数是您创建的用户定义的函数 (UDF),它与 Snowpark Container Services 中的服务关联。有关更多信息,请参阅 服务函数:使用 SQL 查询中的服务。
以下示例中的代码会创建一个名为 my-udf
的 UDF,其指定您以前定义的 hello-world
服务和 hello-world-endpoint
端点:
from snowflake.core import CreateMode
from snowflake.core.function import FunctionArgument, ServiceFunction
root.databases["my_db"].schemas["my_schema"].functions.create(
ServiceFunction(
name="my-udf",
arguments=[
FunctionArgument(name="input", datatype="TEXT")
],
returns="TEXT",
service="hello-world",
endpoint="'hello-world-endpoint'",
path="/hello-world-path",
max_batch_rows=5,
),
mode = CreateMode.or_replace
)
调用服务函数¶
创建服务函数之后,您可以调用该函数来测试它。
以下示例中的代码会调用您以前创建的 my-udf
服务函数:
result = root.databases["my_db"].schemas["my_schema"].functions["my-udf(TEXT)"].execute_function(["test"])
print(result)
获取服务详细信息¶
您可以通过调用 ServiceResource.fetch
方法获取有关 Snowflake 服务的信息,该方法会返回一个 Service
对象。
以下示例中的代码会获取有关名为 my_service
的服务的信息:
my_service = root.databases["my_db"].schemas["my_schema"].services["my_service"].fetch()
列出服务¶
您可以使用 iter
方法列出指定架构中的服务,该方法会返回一个 Service
对象的 PagedIter
迭代器。
以下示例中的代码会列出名称以 my
开头的服务:
services = root.databases["my_db"].schemas["my_schema"].services.iter(like="my%")
for service_obj in services:
print(service_obj.name)
执行服务操作¶
您可以使用 ServiceResource
对象执行常见的服务操作 – 如暂停、恢复和获取服务状态。
以下示例中的代码会暂停和恢复 my_service
服务,还会获取服务状态:
my_service_res = root.databases["my_db"].schemas["my_schema"].services["my_service"]
my_service_res.suspend()
my_service_res.resume()
status = my_service_res.get_service_status(10)