用 Python 管理 Snowflake 流

您可以使用 Python 来管理 Snowflake 流;流是一些对象,用于记录对表所做的数据操作语言 (DML) 更改(包括插入、更新和删除)以及每次更改的元数据。有关更多信息,请参阅 Streams 简介

备注

ALTER STREAM 目前不受支持。

Snowflake Python APIs 使用两种不同类型来表示流:

  • Stream:显示流的属性,例如其名称、目标滞后、仓库和查询语句。

  • StreamResource:显示可用于提取相应 Stream 对象、暂停和恢复流以及删除流的方法。

先决条件

在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root 对象以使用 Snowflake Python APIs 的代码。

例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

代码可通过生成的 Session 对象创建 Root 对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake

创建流

要创建流,请先创建 Stream 对象,然后根据 API Root 对象创建 StreamCollection 对象。使用 StreamCollection.create 将新流添加到 Snowflake。

您可以在针对以下对象类型创建流:

  • 标准表

  • 视图

  • 目录表

针对源表

以下示例中的代码会创建一个 Stream 对象,其表示 my_db 数据库和 my_schema 架构中源表 my_table 上名为 my_stream_on_table 且具有指定的流属性的流:

备注

StreamSourceTable 类型只支持标准表。目前不支持其他类型的表,如动态表、事件表、外部表和 Iceberg 表。

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable

stream_on_table = Stream(
  "my_stream_on_table",
  StreamSourceTable(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_table',
      append_only = True,
      show_initial_rows = False,
  ),
  comment = 'create stream on table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
Copy

该代码会创建一个 StreamCollection 变量 streams 并使用 StreamCollection.create 在 Snowflake 中创建一个新流。

针对源视图

以下示例中的代码会创建一个 Stream 对象,其表示 my_db 数据库和 my_schema 架构中源视图 my_view 上名为 my_stream_on_view 且具有指定的流属性的流:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView

stream_on_view = Stream(
  "my_stream_on_view",
  StreamSourceView(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_view',
  ),
  comment = 'create stream on view'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Copy

针对源目录表

以下示例中的代码会创建一个 Stream 对象,其表示``my_db`` 数据库和 my_schema 架构中源目录表 my_directory_table 上名为 my_stream_on_directory_table 且具有指定的流属性的流:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage

stream_on_directory_table = Stream(
  "my_stream_on_directory_table",
  StreamSourceStage(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_directory_table',
  ),
  comment = 'create stream on directory table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Copy

克隆流

以下示例中的代码会创建一个名为 my_stream 的新流,其定义与 my_db 数据库和 my_schema 架构中的源流 my_other_stream 相同:

from snowflake.core.stream import Stream

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Copy

获取流详细信息

您可以通过调用 StreamResource.fetch 方法来获取关于该流的信息,该方法会返回一个 Stream 对象。

以下示例中的代码会获取 my_db 数据库和 my_schema 架构中名为 my_stream 的流的信息:

stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Copy

列出流

您可以使用 StreamCollection.iter 方法列出流,该方法会返回 Stream 对象的 PagedIter 迭代器。

以下示例中的代码会列出 my_db 数据库和 my_schema 架构中名称以 my 开头的流,并打印每个流的名称:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

以下示例中的代码也会列出名称以 my 开头的流,但使用的是 starts_with 参数,而不是 like。此示例还设置了可选参数 show_limit=10,将结果数限制为 10

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

删除流

您可以使用 StreamResource 对象删除流。

以下示例中的代码会获取 my_stream 流资源对象,然后删除该流。

my_stream_res = root.streams["my_stream"]
my_stream_res.drop()
Copy
语言: 中文