使用 Python 管理 Snowpark Container Services(包括服务函数)

您可以使用 Python 管理 Snowpark 容器服务,这是一种完全托管的容器服务,您可以通过它来部署、管理和扩展容器化应用程序。有关 Snowpark 容器服务的概述,请参阅 关于 Snowpark Container Services

使用 Snowflake Python APIs,您可以管理计算池、镜像存储库和服务。

先决条件

在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root 对象以使用 Snowflake Python APIs 的代码。

例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

代码可通过生成的 Session 对象创建 Root 对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake

管理计算池

您可以管理计算池,它是虚拟机 (VM) 节点的集合,在这些节点上 Snowflake 运行 Snowpark 容器服务作业和服务。

Snowflake Python APIs 使用两种不同类型来表示计算池:

  • ComputePool:显示计算池的属性,例如其仓库、最大节点和最小节点,以及自动恢复和自动暂停设置。

  • ComputePoolResource:显示对计算池执行操作的方法,例如提取相应的 ComputePool 对象以及如何暂停、恢复和停止池。

有关计算池的详细信息,请参阅 Snowpark Container Services:使用计算池

创建计算池

您可以调用 ComputePoolCollection.create 方法,并传递表示要创建的计算池的 ComputePool 对象,从而创建计算池。

要创建计算池,请先创建指定池属性的 ComputePool 对象,如下所示:

  • 计算池名称

  • 池将包含的最大节点数和最小节点数

  • 实例系列的名称,标识要为池中的节点配置的计算机类型

  • 当向池提交服务或作业时,池是否应该自动恢复

以下示例中的代码会创建一个 ComputePool 对象,其表示名为 my_compute_pool 的池:

from snowflake.core.compute_pool import ComputePool

compute_pool = ComputePool(name="my_compute_pool", min_nodes=1, max_nodes=2, instance_family="CPU_X64_XS", auto_resume=False)
root.compute_pools.create(compute_pool)
Copy

然后,代码通过将 ComputePool 对象传递给 ComputePoolCollection.create 方法创建计算池。

获取计算池详细信息

您可以通过调用 ComputePoolResource.fetch 方法来获取有关计算池的信息,该方法会返回一个 ComputePool 对象。

以下示例中的代码会获取有关名为 my_compute_pool 的池的信息:

compute_pool = root.compute_pools["my_compute_pool"].fetch()
print(compute_pool.to_dict())
Copy

创建或更改计算池

您可以设置 ComputePool 对象的属性并将其传递给 ComputePoolResource.create_or_alter 方法以创建计算池(如果计算池不存在)或者根据计算池定义更改计算池(如果计算池存在)。create_or_alter 的行为旨在实现幂等性,这意味着无论在运行该方法之前计算池是否存在,生成的计算池对象都是一样的。

备注

对于任何没有显式定义的 ComputePool 属性,create_or_alter 方法会使用默认值。例如,如果您没有设置 auto_resume,则即使计算池以前以不同的值存在,其值也会默认为 None

以下示例中的代码会更新 my_compute_pool 计算池允许的最大节点数,然后更改 Snowflake 上的计算池:

compute_pool = root.compute_pools["my_compute_pool"].fetch()
compute_pool.max_nodes = 3
compute_pool_res = root.compute_pools["my_compute_pool"].create_or_alter(compute_pool)
Copy

列出计算池

您可以使用 iter 方法列出计算池,该方法会返回一个 PagedIter 迭代器。

以下示例中的代码会列出名称以 my 开头的计算池:

compute_pools = root.compute_pools.iter(like="my%")
for compute_pool in compute_pools:
  print(compute_pool.name)
Copy

执行计算池操作

您可以使用 ComputePoolResource 对象执行常见的计算池操作(如暂停、恢复和停止池),而该对象可使用 ComputePool.fetch 方法进行获取。

以下示例中的代码暂停、恢复和停止 my_compute_pool 计算池:

compute_pool_res = root.compute_pools["my_compute_pool"]
compute_pool_res.suspend()
compute_pool_res.resume()
compute_pool_res.stop_all_services()
Copy

管理图像存储库

您可以管理图像存储库,这些存储库可存储您在容器服务上运行的应用程序的图像。

图像存储库是一个架构级对象。当您创建或引用一个存储库时,您是在其架构的上下文中执行此操作的。

Snowflake Python APIs 使用两种不同类型来表示镜像存储库:

  • ImageRepository:显示镜像仓库的属性,例如其数据库名称、架构名称、存储库 URL 以及所有者。

  • ImageRepositoryResource:显示可用于提取相应 ImageRepository 对象和删除镜像存储库资源的方法。

有关镜像仓库的详细信息,请参阅 Snowpark Container Services:使用镜像注册表和镜像仓库

创建镜像仓库

要创建镜像仓库,请先创建指定存储库名称的 ImageRepository 对象。

以下示例中的代码会创建一个 ImageRepository 对象,其表示名为 my_repo 的存储库:

from snowflake.core.image_repository import ImageRepository

my_repo = ImageRepository("my_repo")
root.databases["my_db"].schemas["my_schema"].image_repositories.create(my_repo)
Copy

然后,代码通过将 ImageRepository 对象传递给 ImageRepositoryCollection.create 方法来创建镜像仓库,该存储库在 my_db 数据库和 my_schema 架构中进行创建。

获取镜像仓库详细信息

您可以通过调用 ImageRepositoryResource.fetch 方法来获取有关镜像仓库的信息,该方法会返回一个 ImageRepository 对象。

以下示例中的代码会获取一个表示 my_repo 镜像仓库的 ImageRepository 对象,然后打印存储库所有者的名称:

my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo = my_repo_res.fetch()
print(my_repo.owner)
Copy

列出镜像仓库

您可以使用 iter 方法在指定架构中列出镜像仓库,该方法会返回一个 ImageRepository 对象的 PagedIter 迭代器。

以下示例中的代码会列出 my_db 数据库和 my_schema 架构中的存储库名称:

repo_list = root.databases["my_db"].schemas["my_schema"].image_repositories.iter()
for repo_obj in repo_list:
  print(repo_obj.name)
Copy

删除镜像仓库

您可以使用 ImageRepositoryResource.drop 方法删除镜像存储库。

下面的示例中的代码会删除 my_repo 存储库:

my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo_res.drop()
Copy

管理服务和服务函数

您可以管理这些运行应用程序容器的服务,除非您停止它们。Snowflake 会在服务容器停止时自动重启服务。通过这种方式,服务可以不间断地有效运行。

服务是一个架构级对象。当您创建或引用一项服务时,您是在其架构的上下文中执行此操作的。

Snowflake Python APIs 使用两种不同类型来表示服务:

  • Service:显示服务的属性,例如其规范、最小实例和最大实例,以及数据库和架构的名称。

  • ServiceResource:显示可用于提取相应 Service 对象、暂停和恢复服务以及获取其状态的方法。

有关服务的详细信息,请参阅 Snowpark Container Services:使用服务

创建服务

要创建服务,需要运行 services.create 方法,传递表示要创建的服务的 Service 对象。

您可根据已上传到暂存区的服务规范 .yaml 文件创建服务。有关创建服务规范的详细信息,请参阅 服务规范参考

上传规范

如果要根据尚未上传到暂存区的规范创建服务,可使用 Snowpark FileOperation 对象上传该规范。

以下示例中的代码使用 FileOperation.put 方法将规范作为文件上传:

session.file.put("/local_location/my_service_spec.yaml", "@my_stage")
Copy

以下示例中的代码使用 FileOperation.put_stream 方法将规范作为字符串上传:

service_spec_string = """
// Specification as a string.
"""
session.file.put_stream(StringIO(sepc_in_string), "@my_stage/my_service_spec.yaml")
Copy

创建服务

要根据暂存区规范创建服务,请先创建指定如下服务属性的 Service 对象:

  • 服务名称

  • Snowflake 可创建的最大和最小服务实例数

  • 应向其中添加服务的计算池

  • 存储位置和规格名称

以下示例中的代码会创建一个 Service 对象,其表示 @my_stage/my_service_spec.yaml 内的规范中名为 my_service 的服务:

from snowflake.core.service import Service, ServiceSpec

my_service = Service(name="my_service", min_instances=1, max_instances=2, compute_pool="my_compute_pool", spec=ServiceSpec("@my_stage/my_service_spec.yaml"))
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
Copy

然后,代码通过将 Service 对象传递给 ServiceCollection.create 方法来创建服务,该服务在 my_db 数据库和 my_schema 架构中进行创建。

您还可以根据您作为内联文本提供的规范创建服务,如下例所示。ServiceSpec 函数采用单个字符串实参 spec。如果字符串以 @ 开头,则函数将其解释并验证为暂存区文件路径。否则该字符串将作为内联文本传递。

from textwrap import dedent
from snowflake.core.service import Service, ServiceSpec

spec_text = dedent(f"""\
    spec:
      containers:
      - name: hello-world
        image: repo/hello-world:latest
      endpoints:
      - name: hello-world-endpoint
        port: 8080
        public: true
    """)

my_service = Service(name="my_service", min_instances=1, max_instances=2, compute_pool="my_compute_pool", spec=ServiceSpec(spec_text))
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
Copy

创建服务函数

服务启动并运行后,您可以创建一个与服务端点通信的服务函数。服务函数是您创建的用户定义的函数 (UDF),它与 Snowpark Container Services 中的服务关联。有关更多信息,请参阅 服务函数:使用 SQL 查询中的服务

以下示例中的代码会创建一个名为 my-udf 的 UDF,其指定您以前定义的 hello-world 服务和 hello-world-endpoint 端点:

from snowflake.core import CreateMode
from snowflake.core.function import FunctionArgument, ServiceFunction

root.databases["my_db"].schemas["my_schema"].functions.create(
  ServiceFunction(
    name="my-udf",
    arguments=[
        FunctionArgument(name="input", datatype="TEXT")
    ],
    returns="TEXT",
    service="hello-world",
    endpoint="'hello-world-endpoint'",
    path="/hello-world-path",
    max_batch_rows=5,
  ),
  mode = CreateMode.or_replace
)
Copy

调用服务函数

创建服务函数之后,您可以调用该函数来测试它。

以下示例中的代码会调用您以前创建的 my-udf 服务函数:

result = root.databases["my_db"].schemas["my_schema"].functions["my-udf(TEXT)"].execute_function(["test"])
print(result)
Copy

获取服务详细信息

您可以通过调用 ServiceResource.fetch 方法获取有关 Snowflake 服务的信息,该方法会返回一个 Service 对象。

以下示例中的代码会获取有关名为 my_service 的服务的信息:

my_service = root.databases["my_db"].schemas["my_schema"].services["my_service"].fetch()
Copy

列出服务

您可以使用 iter 方法列出指定架构中的服务,该方法会返回一个 Service 对象的 PagedIter 迭代器。

以下示例中的代码会列出名称以 my 开头的服务:

services = root.databases["my_db"].schemas["my_schema"].services.iter(like="my%")
for service_obj in services:
  print(service_obj.name)
Copy

执行服务操作

您可以使用 ServiceResource 对象执行常见的服务操作 – 如暂停、恢复和获取服务状态。

以下示例中的代码会暂停和恢复 my_service 服务,还会获取服务状态:

my_service_res = root.databases["my_db"].schemas["my_schema"].services["my_service"]

my_service_res.suspend()
my_service_res.resume()
status = my_service_res.get_service_status(10)
Copy
语言: 中文