用 Python 管理数据加载和卸载资源

您可以使用 Python 来管理 Snowflake 中的数据加载和卸载资源,包括外部卷、管道和暂存区。

先决条件

在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root 对象以使用 Snowflake Python APIs 的代码。

例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

代码可通过生成的 Session 对象创建 Root 对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake

管理暂存区

您可以管理 Snowflake 暂存区,它们是云存储中数据文件的位置。有关暂存区的概述,请参阅 数据加载概述

Snowflake Python APIs 使用两种不同类型来表示存储区:

  • Stage:显示暂存区的属性,例如其名称、加密类型、凭据和目录表设置。

  • StageResource:显示可用于提取相应 Stage 对象、上传和列出暂存区上的文件和删除暂存区的方法。

创建暂存区

要创建暂存区,请先创建 Stage 对象,然后根据 API Root 对象创建 StageCollection 对象。使用 StageCollection.create 将新暂存区添加到 Snowflake。

以下示例中的代码会创建一个 Stage 对象,其表示名为:code:my_stage 且加密类型为 ``SNOWFLAKE_SSE``(仅限服务器端加密)的暂存区:

from snowflake.core.stage import Stage, StageEncryption

my_stage = Stage(
  name="my_stage",
  encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Copy

该代码会创建一个 StageCollection 变量 stages,并使用 StageCollection.create 在 Snowflake 中创建一个新暂存区。

获取暂存区详细信息

您可以通过调用 StageResource.fetch 方法来获取有关暂存区的信息,该方法会返回一个 Stage 对象。

以下示例中的代码会获取有关名为 my_stage 的暂存区的信息:

my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Copy

列出暂存区

您可以使用 StageCollection.iter 方法列出暂存区,该方法会返回 Stage 对象的 PagedIter 迭代器。

以下示例中的代码会列出名称中包含 my 文本的暂存区,并打印每个暂存区的名称:

from snowflake.core.stage import StageCollection

stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%")  # returns a PagedIter[Stage]
for stage_obj in stage_iter:
  print(stage_obj.name)
Copy

执行暂存区操作

您可以使用 StageResource 对象执行常见的暂存区操作,例如将文件上传到暂存区和列出暂存区中的文件。

为了演示可以对暂存区资源执行的一些操作,以下示例中的代码会执行以下操作:

  1. 使用指定的自动压缩和覆盖选项,将名为 my-file.yaml 的文件上传到 my_stage 暂存区。

  2. 列出暂存区上的所有文件,以验证文件是否上传成功。

  3. 删除暂存区。

my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]

my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)

stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
  print(stageFile)

my_stage_res.drop()
Copy

管理管道

您可以管理 Snowflake 管道;管道是已命名的一级 Snowflake 对象,它们包含一个 COPY INTO 语句,由 Snowpipe 用于将数据从引入队列加载到表中。有关管道的概述,请参阅 Snowpipe

Snowflake Python APIs 使用两种不同类型来表示管道:

  • Pipe:显示管道的属性,例如其如名称和要由 Snowpipe 使用的 COPY INTO 语句。

  • PipeResource:显示可用于获取相应的:code:Pipe 对象、使用暂存数据文件刷新管道和删除管道的方法。

创建管道

要创建管道,请先创建 Pipe 对象,然后根据 API Root 对象创建 PipeCollection 对象。使用 PipeCollection.create 将新管道添加到 Snowflake。

以下示例中的代码会使用指定的 COPY INTO 语句创建一个 Pipe 对象,其表示名为 my_pipe 的管道:

from snowflake.core.pipe import Pipe

my_pipe = Pipe(
  name="my_pipe",
  comment="creating my pipe",
  copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)

pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Copy

该代码会创建一个 PipeCollection 变量 pipes 并使用 PipeCollection.create 在 Snowflake 中创建一个新管道。

获取管道详细信息

您可以通过调用 PipeResource.fetch 方法来获取有关管道的信息,该方法会返回一个 Pipe 对象。

以下示例中的代码会获取有关名为 my_pipe 的管道的信息:

my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Copy

列出管道

您可以使用 PipeCollection.iter 方法列出管道,该方法会返回 Pipe 对象的 PagedIter 迭代器。

以下示例中的代码会列出名称以 my 开头的管道,并打印每个管道的名称:

from snowflake.core.pipe import PipeCollection

pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%")  # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
  print(pipe_obj.name)
Copy

执行管道操作

您可以使用 PipeResource 对象执行常见的管道操作,例如刷新管道和删除管道。

备注

目前仅支持 ALTER PIPE 的 REFRESH 功能。

了演示您可以对管道资源执行的操作,以下示例中的代码会执行以下操作:

  1. 获取 my_pipe 管道资源对象。

  2. 使用带有指定的可选前缀(或路径)的暂存数据文件刷新管道。

  3. 删除管道。

my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]

# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")

my_pipe_res.drop()
Copy

管理外部卷

您可以管理外部卷;外部卷是命名的账户级 Snowflake 对象,用于将 Snowflake 连接到 Apache Iceberg™ 表的外部云存储。有关更多信息,请参阅 Apache Iceberg™ 表外部卷 部分。

Snowflake Python APIs 使用两种不同类型来表示外部卷:

  • ExternalVolume:显示外部卷的属性,例如其名称和存储位置。

  • ExternalVolumeResource:显示可用于获取相应 ExternalVolume 对象以及删除和取消删除外部卷的方法。

创建外部卷

要创建外部卷,请先创建一个 ExternalVolume 对象,然后根据 API Root 对象创建 ExternalVolumeCollection 对象。使用 ExternalVolumeCollection.create 将新外部卷添加到 Snowflake。

以下示例中的代码会创建一个 ExternalVolume 对象,其表示名为 my_external_volume 且具有指定的 AWS S3 存储位置的外部卷:

from snowflake.core.external_volume import (
    ExternalVolume,
    StorageLocationS3,
)

my_external_volume = ExternalVolume(
    name="my_external_volume",
    storage_locations=[
        StorageLocationS3(
            name="my-s3-us-west-1",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
        StorageLocationS3(
            name="my-s3-us-west-2",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
    ]
)

root.external_volumes.create(my_external_volume)
Copy

获取外部卷详细信息

您可以通过调用 ExternalVolumeResource.fetch 方法来获取有关外部卷的信息,该方法会返回一个 ExternalVolume 对象。

以下示例中的代码会获取有关名为 my_external_volume 的外部卷的信息:

my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Copy

列出外部卷

您可以使用 ExternalVolumeCollection.iter 方法列出外部卷,该方法会返回 ExternalVolume 对象的 PagedIter 迭代器。

以下示例中的代码会列出名称以 my 开头的外部卷,并打印每个外部卷的名称:

external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
  print(external_volume_obj.name)
Copy

执行外部卷操作

您可以使用 ExternalVolumeResource 对象执行常见的外部卷操作,例如删除和取消删除外部卷。

为了演示可以对外部卷资源执行的操作,以下示例中的代码会执行以下操作:

  1. 获取 my_external_volume 外部卷资源对象。

  2. 删除外部卷。

  3. 取消删除外部卷。

my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()
Copy
语言: 中文