使用 Python 管理 Snowflake 动态表¶
您可以使用 Python 来管理 Snowflake 动态表,这些表是用于连续处理管道的新表类型。动态表物化指定查询的结果。有关此功能的概述,请参阅 动态表。
Snowflake Python APIs 使用两种不同类型来表示动态表:
DynamicTable
:显示动态表的属性,例如其表名、目标滞后、仓库和查询语句。DynamicTableResource
:显示可用于获取相应DynamicTable
对象、暂停和恢复动态表以及删除动态表的方法。
先决条件¶
在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root
对象以使用 Snowflake Python APIs 的代码。
例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
代码可通过生成的 Session
对象创建 Root
对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake。
创建动态表¶
要创建动态表,请先创建一个 DynamicTable
对象,然后根据 API Root
对象创建 DynamicTableCollection
对象。使用 DynamicTableCollection.create
将新动态表添加到 Snowflake。
以下示例中的代码会创建一个 DynamicTable
对象,其表示 my_db
数据库和 my_schema
架构中名为 my_dynamic_table
并指定了最少的必需选项的动态表:
from snowflake.core.dynamic_table import DynamicTable, DownstreamLag
my_dt = DynamicTable(
name='my_dynamic_table',
target_lag=DownstreamLag(),
warehouse='my_wh',
query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
该代码会创建一个 DynamicTableCollection
变量 dynamic_tables
,并使用 DynamicTableCollection.create
在 Snowflake 中创建了一个新动态表。
以下示例中的代码会创建一个 DynamicTable
对象,其表示 my_db
数据库和 my_schema
架构中一个名为 my_dynamic_table2
并指定了当前所有可能的必需选项的动态表:
from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTable(
name='my_dynamic_table2',
kind='PERMANENT',
target_lag=UserDefinedLag(seconds=60),
warehouse='my_wh',
query='SELECT * FROM t',
refresh_mode='FULL',
initialize='ON_SCHEDULE',
cluster_by=['id > 1'],
comment='test table',
data_retention_time_in_days=7,
max_data_extension_time_in_days=7,
)
)
克隆动态表¶
下面示例中的代码会创建一个名为 my_dynamic_table2
的新动态表,该表具有与 my_db
数据库和 my_schema
架构内源动态表 my_dynamic_table
中的相同列定义和所有现有数据:
备注
此克隆操作使用
DynamicTableClone
对象,该对象包括可选的target_lag
和warehouse
参数,当前不支持其他参数。
from snowflake.core.dynamic_table import DynamicTableClone
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTableClone(
name='my_dynamic_table2',
warehouse='my_wh2',
),
clone_table='my_dynamic_table',
)
有关此功能的更多信息,请参阅 CREATE DYNAMIC TABLE ... CLONE。
获取动态表详细信息¶
您可以通过调用 DynamicTableResource.fetch
方法来获取有关动态表的信息,该方法会返回一个 DynamicTable
对象。
以下示例中的代码会获取有关 my_db
数据库和 my_schema
架构中名为 my_dynamic_table
的动态表的信息:
dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
列出动态表¶
您可以使用 DynamicTableCollection.iter
方法列出动态表,该方法会返回 DynamicTable
对象的 PagedIter
迭代器。
以下示例中的代码会列出 my_db
数据库和 my_schema
架构中名称以文本 my
开头的动态表,并打印每个动态表的名称:
from snowflake.core.dynamic_table import DynamicTableCollection
dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
print(dt_obj.name)
执行动态表操作¶
您可以使用 DynamicTableResource
对象执行常见的动态表操作,如刷新、暂停和恢复动态表。
为了演示可以对动态表资源执行的一些操作,以下示例中的代码会执行以下操作:
获取
my_dynamic_table
动态表资源对象。刷新动态表。
暂停动态表。
恢复动态表。
删除动态表
my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]
my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()