使用 Python 管理 Snowflake 动态表

您可以使用 Python 来管理 Snowflake 动态表,这些表是用于连续处理管道的新表类型。动态表物化指定查询的结果。有关此功能的概述,请参阅 动态表

Snowflake Python APIs 使用两种不同类型来表示动态表:

  • DynamicTable:显示动态表的属性,例如其表名、目标滞后、仓库和查询语句。

  • DynamicTableResource:显示可用于获取相应 DynamicTable 对象、暂停和恢复动态表以及删除动态表的方法。

先决条件

在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root 对象以使用 Snowflake Python APIs 的代码。

例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

代码可通过生成的 Session 对象创建 Root 对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake

创建动态表

要创建动态表,请先创建一个 DynamicTable 对象,然后根据 API Root 对象创建 DynamicTableCollection 对象。使用 DynamicTableCollection.create 将新动态表添加到 Snowflake。

以下示例中的代码会创建一个 DynamicTable 对象,其表示 my_db 数据库和 my_schema 架构中名为 my_dynamic_table 并指定了最少的必需选项的动态表:

from snowflake.core.dynamic_table import DynamicTable, DownstreamLag

my_dt = DynamicTable(
  name='my_dynamic_table',
  target_lag=DownstreamLag(),
  warehouse='my_wh',
  query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Copy

该代码会创建一个 DynamicTableCollection 变量 dynamic_tables,并使用 DynamicTableCollection.create 在 Snowflake 中创建了一个新动态表。

以下示例中的代码会创建一个 DynamicTable 对象,其表示 my_db 数据库和 my_schema 架构中一个名为 my_dynamic_table2 并指定了当前所有可能的必需选项的动态表:

from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTable(
      name='my_dynamic_table2',
      kind='PERMANENT',
      target_lag=UserDefinedLag(seconds=60),
      warehouse='my_wh',
      query='SELECT * FROM t',
      refresh_mode='FULL',
      initialize='ON_SCHEDULE',
      cluster_by=['id > 1'],
      comment='test table',
      data_retention_time_in_days=7,
      max_data_extension_time_in_days=7,
  )
)
Copy

克隆动态表

下面示例中的代码会创建一个名为 my_dynamic_table2 的新动态表,该表具有与 my_db 数据库和 my_schema 架构内源动态表 my_dynamic_table 中的相同列定义和所有现有数据:

备注

此克隆操作使用 DynamicTableClone 对象,该对象包括可选的 target_lagwarehouse 参数,当前不支持其他参数。

from snowflake.core.dynamic_table import DynamicTableClone

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTableClone(
      name='my_dynamic_table2',
      warehouse='my_wh2',
  ),
  clone_table='my_dynamic_table',
)
Copy

有关此功能的更多信息,请参阅 CREATE DYNAMIC TABLE ... CLONE

获取动态表详细信息

您可以通过调用 DynamicTableResource.fetch 方法来获取有关动态表的信息,该方法会返回一个 DynamicTable 对象。

以下示例中的代码会获取有关 my_db 数据库和 my_schema 架构中名为 my_dynamic_table 的动态表的信息:

dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Copy

列出动态表

您可以使用 DynamicTableCollection.iter 方法列出动态表,该方法会返回 DynamicTable 对象的 PagedIter 迭代器。

以下示例中的代码会列出 my_db 数据库和 my_schema 架构中名称以文本 my 开头的动态表,并打印每个动态表的名称:

from snowflake.core.dynamic_table import DynamicTableCollection

dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
  print(dt_obj.name)
Copy

执行动态表操作

您可以使用 DynamicTableResource 对象执行常见的动态表操作,如刷新、暂停和恢复动态表。

为了演示可以对动态表资源执行的一些操作,以下示例中的代码会执行以下操作:

  1. 获取 my_dynamic_table 动态表资源对象。

  2. 刷新动态表。

  3. 暂停动态表。

  4. 恢复动态表。

  5. 删除动态表

my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]

my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()
Copy
语言: 中文