用 Python 管理数据加载和卸载资源¶
您可以使用 Python 来管理 Snowflake 中的数据加载和卸载资源,包括外部卷、管道和暂存区。
先决条件¶
在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root
对象以使用 Snowflake Python APIs 的代码。
例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
代码可通过生成的 Session
对象创建 Root
对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake。
管理暂存区¶
您可以管理 Snowflake 暂存区,它们是云存储中数据文件的位置。有关暂存区的概述,请参阅 数据加载概述。
Snowflake Python APIs 使用两种不同类型来表示存储区:
Stage
:显示暂存区的属性,例如其名称、加密类型、凭据和目录表设置。StageResource
:显示可用于提取相应Stage
对象、上传和列出暂存区上的文件和删除暂存区的方法。
创建暂存区¶
要创建暂存区,请先创建 Stage
对象,然后根据 API Root
对象创建 StageCollection
对象。使用 StageCollection.create
将新暂存区添加到 Snowflake。
以下示例中的代码会创建一个 Stage
对象,其表示名为:code:my_stage
且加密类型为 ``SNOWFLAKE_SSE``(仅限服务器端加密)的暂存区:
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
该代码会创建一个 StageCollection
变量 stages
,并使用 StageCollection.create
在 Snowflake 中创建一个新暂存区。
获取暂存区详细信息¶
您可以通过调用 StageResource.fetch
方法来获取有关暂存区的信息,该方法会返回一个 Stage
对象。
以下示例中的代码会获取有关名为 my_stage
的暂存区的信息:
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
列出暂存区¶
您可以使用 StageCollection.iter
方法列出暂存区,该方法会返回 Stage
对象的 PagedIter
迭代器。
以下示例中的代码会列出名称中包含 my
文本的暂存区,并打印每个暂存区的名称:
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
执行暂存区操作¶
您可以使用 StageResource
对象执行常见的暂存区操作,例如将文件上传到暂存区和列出暂存区中的文件。
为了演示可以对暂存区资源执行的一些操作,以下示例中的代码会执行以下操作:
使用指定的自动压缩和覆盖选项,将名为
my-file.yaml
的文件上传到my_stage
暂存区。列出暂存区上的所有文件,以验证文件是否上传成功。
删除暂存区。
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
管理管道¶
您可以管理 Snowflake 管道;管道是已命名的一级 Snowflake 对象,它们包含一个 COPY INTO 语句,由 Snowpipe 用于将数据从引入队列加载到表中。有关管道的概述,请参阅 Snowpipe。
Snowflake Python APIs 使用两种不同类型来表示管道:
Pipe
:显示管道的属性,例如其如名称和要由 Snowpipe 使用的 COPY INTO 语句。PipeResource
:显示可用于获取相应的:code:Pipe
对象、使用暂存数据文件刷新管道和删除管道的方法。
创建管道¶
要创建管道,请先创建 Pipe
对象,然后根据 API Root
对象创建 PipeCollection
对象。使用 PipeCollection.create
将新管道添加到 Snowflake。
以下示例中的代码会使用指定的 COPY INTO 语句创建一个 Pipe
对象,其表示名为 my_pipe
的管道:
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
该代码会创建一个 PipeCollection
变量 pipes
并使用 PipeCollection.create
在 Snowflake 中创建一个新管道。
获取管道详细信息¶
您可以通过调用 PipeResource.fetch
方法来获取有关管道的信息,该方法会返回一个 Pipe
对象。
以下示例中的代码会获取有关名为 my_pipe
的管道的信息:
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
列出管道¶
您可以使用 PipeCollection.iter
方法列出管道,该方法会返回 Pipe
对象的 PagedIter
迭代器。
以下示例中的代码会列出名称以 my
开头的管道,并打印每个管道的名称:
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
执行管道操作¶
您可以使用 PipeResource
对象执行常见的管道操作,例如刷新管道和删除管道。
备注
目前仅支持 ALTER PIPE 的 REFRESH 功能。
了演示您可以对管道资源执行的操作,以下示例中的代码会执行以下操作:
获取
my_pipe
管道资源对象。使用带有指定的可选前缀(或路径)的暂存数据文件刷新管道。
删除管道。
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
管理外部卷¶
您可以管理外部卷;外部卷是命名的账户级 Snowflake 对象,用于将 Snowflake 连接到 Apache Iceberg™ 表的外部云存储。有关更多信息,请参阅 Apache Iceberg™ 表 的 外部卷 部分。
Snowflake Python APIs 使用两种不同类型来表示外部卷:
ExternalVolume
:显示外部卷的属性,例如其名称和存储位置。ExternalVolumeResource
:显示可用于获取相应ExternalVolume
对象以及删除和取消删除外部卷的方法。
创建外部卷¶
要创建外部卷,请先创建一个 ExternalVolume
对象,然后根据 API Root
对象创建 ExternalVolumeCollection
对象。使用 ExternalVolumeCollection.create
将新外部卷添加到 Snowflake。
以下示例中的代码会创建一个 ExternalVolume
对象,其表示名为 my_external_volume
且具有指定的 AWS S3 存储位置的外部卷:
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
获取外部卷详细信息¶
您可以通过调用 ExternalVolumeResource.fetch
方法来获取有关外部卷的信息,该方法会返回一个 ExternalVolume
对象。
以下示例中的代码会获取有关名为 my_external_volume
的外部卷的信息:
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
列出外部卷¶
您可以使用 ExternalVolumeCollection.iter
方法列出外部卷,该方法会返回 ExternalVolume
对象的 PagedIter
迭代器。
以下示例中的代码会列出名称以 my
开头的外部卷,并打印每个外部卷的名称:
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
执行外部卷操作¶
您可以使用 ExternalVolumeResource
对象执行常见的外部卷操作,例如删除和取消删除外部卷。
为了演示可以对外部卷资源执行的操作,以下示例中的代码会执行以下操作:
获取
my_external_volume
外部卷资源对象。删除外部卷。
取消删除外部卷。
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()