用 Python 管理 Snowflake 流¶
您可以使用 Python 来管理 Snowflake 流;流是一些对象,用于记录对表所做的数据操作语言 (DML) 更改(包括插入、更新和删除)以及每次更改的元数据。有关更多信息,请参阅 Streams 简介。
备注
ALTER STREAM 目前不受支持。
Snowflake Python APIs 使用两种不同类型来表示流:
Stream:显示流的属性,例如其名称、目标滞后、仓库和查询语句。StreamResource:显示可用于提取相应Stream对象、暂停和恢复流以及删除流的方法。
先决条件¶
在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root 对象以使用 Snowflake Python APIs 的代码。
例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
代码可通过生成的 Session 对象创建 Root 对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake。
创建流¶
要创建流,请先创建 Stream 对象,然后根据 API Root 对象创建 StreamCollection 对象。使用 StreamCollection.create 将新流添加到 Snowflake。
您可以在针对以下对象类型创建流:
标准表
视图
目录表
针对源表¶
以下示例中的代码会创建一个 Stream 对象,其表示 my_db 数据库和 my_schema 架构中源表 my_table 上名为 my_stream_on_table 且具有指定的流属性的流:
备注
StreamSourceTable 类型只支持标准表。目前不支持其他类型的表,如动态表、事件表、外部表和 Iceberg 表。
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
该代码会创建一个 StreamCollection 变量 streams 并使用 StreamCollection.create 在 Snowflake 中创建一个新流。
针对源视图¶
以下示例中的代码会创建一个 Stream 对象,其表示 my_db 数据库和 my_schema 架构中源视图 my_view 上名为 my_stream_on_view 且具有指定的流属性的流:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
针对源目录表¶
以下示例中的代码会创建一个 Stream 对象,其表示``my_db`` 数据库和 my_schema 架构中源目录表 my_directory_table 上名为 my_stream_on_directory_table 且具有指定的流属性的流:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
克隆流¶
以下示例中的代码会创建一个名为 my_stream 的新流,其定义与 my_db 数据库和 my_schema 架构中的源流 my_other_stream 相同:
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
获取流详细信息¶
您可以通过调用 StreamResource.fetch 方法来获取关于该流的信息,该方法会返回一个 Stream 对象。
以下示例中的代码会获取 my_db 数据库和 my_schema 架构中名为 my_stream 的流的信息:
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
列出流¶
您可以使用 StreamCollection.iter 方法列出流,该方法会返回 Stream 对象的 PagedIter 迭代器。
以下示例中的代码会列出 my_db 数据库和 my_schema 架构中名称以 my 开头的流,并打印每个流的名称:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
以下示例中的代码也会列出名称以 my 开头的流,但使用的是 starts_with 参数,而不是 like。此示例还设置了可选参数 show_limit=10,将结果数限制为 10:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
删除流¶
您可以使用 StreamResource 对象删除流。
以下示例中的代码会获取 my_stream 流资源对象,然后删除该流。
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()