用 Python 管理 Snowflake 流¶
您可以使用 Python 来管理 Snowflake 流;流是一些对象,用于记录对表所做的数据操作语言 (DML) 更改(包括插入、更新和删除)以及每次更改的元数据。有关更多信息,请参阅 Streams 简介。
备注
ALTER STREAM 目前不受支持。
Snowflake Python APIs 使用两种不同类型来表示流:
Stream
:显示流的属性,例如其名称、目标滞后、仓库和查询语句。StreamResource
:显示可用于提取相应Stream
对象、暂停和恢复流以及删除流的方法。
先决条件¶
在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root
对象以使用 Snowflake Python APIs 的代码。
例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
代码可通过生成的 Session
对象创建 Root
对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake。
创建流¶
要创建流,请先创建 Stream
对象,然后根据 API Root
对象创建 StreamCollection
对象。使用 StreamCollection.create
将新流添加到 Snowflake。
您可以在针对以下对象类型创建流:
标准表
视图
目录表
针对源表¶
以下示例中的代码会创建一个 Stream
对象,其表示 my_db
数据库和 my_schema
架构中源表 my_table
上名为 my_stream_on_table
且具有指定的流属性的流:
备注
StreamSourceTable
类型只支持标准表。目前不支持其他类型的表,如动态表、事件表、外部表和 Iceberg 表。
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
该代码会创建一个 StreamCollection
变量 streams
并使用 StreamCollection.create
在 Snowflake 中创建一个新流。
针对源视图¶
以下示例中的代码会创建一个 Stream
对象,其表示 my_db
数据库和 my_schema
架构中源视图 my_view
上名为 my_stream_on_view
且具有指定的流属性的流:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
针对源目录表¶
以下示例中的代码会创建一个 Stream
对象,其表示``my_db`` 数据库和 my_schema
架构中源目录表 my_directory_table
上名为 my_stream_on_directory_table
且具有指定的流属性的流:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
克隆流¶
以下示例中的代码会创建一个名为 my_stream
的新流,其定义与 my_db
数据库和 my_schema
架构中的源流 my_other_stream
相同:
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
获取流详细信息¶
您可以通过调用 StreamResource.fetch
方法来获取关于该流的信息,该方法会返回一个 Stream
对象。
以下示例中的代码会获取 my_db
数据库和 my_schema
架构中名为 my_stream
的流的信息:
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
列出流¶
您可以使用 StreamCollection.iter
方法列出流,该方法会返回 Stream
对象的 PagedIter
迭代器。
以下示例中的代码会列出 my_db
数据库和 my_schema
架构中名称以 my
开头的流,并打印每个流的名称:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
以下示例中的代码也会列出名称以 my
开头的流,但使用的是 starts_with
参数,而不是 like
。此示例还设置了可选参数 show_limit=10
,将结果数限制为 10
:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
删除流¶
您可以使用 StreamResource
对象删除流。
以下示例中的代码会获取 my_stream
流资源对象,然后删除该流。
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()