Streams 简介

本主题介绍使用流获取变更数据的关键概念。

本主题内容:

偏移存储

在创建流时,流会将时间点(称为 偏移)初始化为对象的当前事务版本,从而通过逻辑方式获取源对象(例如表、外部表或视图的基础表)中每一行的初始快照。随后,流使用的变更跟踪系统会记录有关获取此快照后的 DML 变更的信息。变更记录提供了行在变更前后的状态。变更信息会镜像所跟踪源对象的列结构,并包括描述各变更事件的其他元数据列。

请注意,流本身 包含任何表数据。流仅存储源对象的偏移,并利用源对象的版本控制历史记录返回 CDC 记录。在创建表的第一个流时,源表中会添加多个隐藏列,并开始存储变更跟踪元数据。这些列会占用少量存储空间。在查询流时返回的 CDC 记录依赖于流中存储的 偏移 和表中存储的 变更跟踪元数据 的组合。请注意,对于视图上的流,必须为视图和基础表显式启用变更跟踪,只有这样才能向这些表添加隐藏列。

为了理解流,不妨将它视为书签,用来指示书籍(相当于源对象)页面中的某个时间点。您可以丢弃书签,也可以在书籍中的不同位置插入其他书签。同样,可以删除流,并在相同或不同的时间点创建其他流(方法有两种:在不同时间连续创建流,或者使用 Time Travel),以使用一个对象的偏移相同或不同的变更记录。

CDC 记录使用者的一个示例是 数据管道,其中仅有符合此条件的数据才会转换并复制到其他表中:自上次提取以来发生过变更的暂存表中的数据。

表的版本控制

每当将包含一条或多条 DML 语句的事务提交到该表时,都会创建新的表版本。这适用于以下表类型:

  • 标准表

  • 目录表

  • 外部表

  • 视图的基础表

在表的事务历史记录中,流偏移位于两个表版本之间。查询流将返回由偏移之后、当前时间或之前提交的事务引起的变更。

以下示例在时间线中显示了具有 10 个已提交版本的源表。流 s1 的偏移目前介于表版本 v3v4 之间。查询(或使用)流时,返回的记录包括表版本 v4 (在表时间线中紧接在流偏移之后)与 v10 (时间线中最近提交的表版本)之间的所有事务,包含这两个版本在内。

流偏移示例

流提供从其当前偏移到表的当前版本的最小变更集。

多个查询可以独立使用流中的相同变更数据,而无需更改偏移。 当在 DML 事务中使用时,流才会推进偏移。这种行为同时适用于显式事务和 自动提交 事务。(默认情况下,执行 DML 语句时,将会隐式启动自动提交事务,并在语句完成时提交事务。这种行为由 AUTOCOMMIT 参数控制。仅对流执行查询这项操作本身不会推进其偏移,即使在显式事务中也是如此;流内容必须在 DML 语句中使用。

备注

要将数据流的偏移推进到当前表版本,而不在 DML 操作中使用变更数据,请完成以下任一操作:

  • 重建流(使用 CREATE OR REPLACE STREAM 语法)。

  • 将当前变更数据插入到临时表中。在 INSERT 语句中查询流,但包含一个 WHERE 子句,用于筛选出所有变更数据(例如 WHERE 0 = 1)。

在 SQL 语句查询显式事务中的流时,在事务开始时的流推进点(即时间戳)而非语句运行时查询流。此行为既与 DML 语句有关,也与 CREATE TABLE ...AS SELECT (CTAS) 语句(使用现有流中的行填充新表)有关。

只要事务成功提交,从流中进行选择的 DML 语句就会使用流中的所有变更数据。为确保多个语句访问流中的相同变更记录,请将其置于显式事务语句 (BEGIN ..:doc:/sql-reference/sql/commit) 中。这样做将锁定流。并行事务中对源对象的 DML 更新由变更跟踪系统进行跟踪,但在提交显式事务语句并且消耗现有变更数据之前,不会对流进行更新。

可重复读取隔离

Streams 支持可重复的读取隔离。在可重复读取模式下,事务中的多条 SQL 语句可查看流中的同一组记录。这与表支持的读取提交模式不同,在这种模式下,语句会查看在同一事务中执行的先前语句所做的任何变更,即使这些变更尚未提交。

在事务中,流返回的增量记录是从流的当前位置到事务开始时间的范围。如果事务提交,则流位置推进到事务开始时间;否则会保持在原位不变。

请参考以下示例:

时间

事务 1

事务 2

1

开始事务。

2

在表 t1 上查询流 s1。流返回当前位置到 . 事务 1 开始时间之间的变更数据获取记录。如果在 DML 语句中使用该流,. 则该流将被锁定,以避免并发事务发生变更。

3

更新表 t1 中的行。

4

查询流 s1。返回在 时间 2 使用流时的相同状态。

5

提交事务。 如果流是在事务内的 DML 语句中使用的,则流位置将推进到事务开始时间。

6

开始事务。

7

查询流 s1。结果包括由事务 1 提交的表更改。

在事务 1 中,所有对流 s1 的查询都会看到同一组记录。只有在提交事务时,对表 t1 的 DML 变更才会记录到流中。

在事务 2 中,对流的查询会看到记入事务 1 中表的变更。请注意,如果事务 2 在事务 1 提交 之前 就已经开始,则对流的查询将返回从流位置到事务 2 开始时间的流快照,并且不会看到由事务 1 提交的任何更改。

流列

流存储的是源对象的偏移,而非任何实际的表列或数据。查询时,流会访问并返回与源对象形式相同(即具有相同的列名和排序)的历史数据,并包含如下额外的列:

METADATA$ACTION:

指示记录到的 DML 操作(INSERT、DELETE)。

METADATA$ISUPDATE:

指示操作是否是 UPDATE 语句的一部分。对源对象中行的更新表示为流中的一对 DELETE 和 INSERT 记录,其中元数据列 METADATA$ISUPDATE 的值设置为 TRUE。

请注意,流会记录两个偏移之间的差异。如果在当前偏移中添加了一行,随后进行了更新,则增量变更是新行。METADATA$ISUPDATE 行会记录一个 FALSE 值。

METADATA$ROW_ID:

指定行的唯一且不可变 ID,可用于跟踪特定行随时间推移而发生的变化。

Snowflake 就 METADATA$ROW_ID 提供以下保证:

  1. METADATA$ROW_ID 取决于流的源对象。

    例如,表 table1 中的流 stream1 和表 table1 中的流 stream2 在相同的行中产生了相同的 METADATA$ROW_ID,但是视图 view1 中的流 stream_view 不能保证产生与 stream1 相同的 METADATA$ROW_ID,即使 view 使用 CREATE VIEW view AS SELECT * FROM table1 语句定义。

  2. 源对象上的流和源对象克隆上的流会对克隆时存在的行产生相同的 METADATA$ROW_ID。

  3. 源对象上的流和源对象副本上的流会对复制的行产生相同的 METADATA$ROW_ID。

流的类型

可用的流类型如下,划分依据是每种流类型记录到的元数据:

标准型:

:emph:` 表、目录表或视图上的流支持此类型。`标准(即增量)流跟踪对源对象的所有 DML 变更,包括插入、更新和删除(包括表截断在内)。这种流类型对变更集中插入和删除的行执行联接,以提供行级别增量。例如,最终效果是,在表中的两个事务时间点之间插入且随后删除的行将在增量中删除(即在查询流时不会返回此类行)。

备注

标准流无法检索地理空间数据的变更数据。我们建议在包含地理空间数据的对象上创建仅追加流。

仅追加:

:emph:` 标准表、目录表或视图上的流支持此类型。`仅追加流专门跟踪行插入。仅追加流不会捕获更新、删除和截断操作。例如,如果最初向表中插入了 10 行,然后在仅追加流向前推进偏移量之前删除了其中的 5 行,那么该流将仅记录 10 行插入的记录。

仅追加流专门返回追加的行,因此在提取、加载和转换 (ELT),以及仅依赖于行插入的类似情景,其性能可能比标准流要高得多。例如,源表可在仅追加流中的行被使用后立即截断,并且下次查询或使用流时,该记录删除不会增加开销。

不支持在目标账户中使用辅助对象作为源创建仅追加流。

仅插入:

:emph:` 仅外部表上的流支持此类型。`仅插入流只会跟踪行插入;它们不会记录从插入集中删除行的删除操作(即无操作)。例如,在任意两个偏移之间,如果从外部表引用的云存储位置中删除文件 1 并添加文件 2,则此类流仅返回文件 2 中行的记录。不同于跟踪标准表的 CDC 数据,Snowflake 无法访问云存储中文件的历史记录。

覆盖或追加的文件实际上是作为新文件处理的:旧版本的文件将从云存储中删除,但仅插入流不会记录删除操作。新版本的文件将添加到云存储中,仅插入流会将行记录为插入。流不会记录旧文件版本与新文件版本的差异。请注意,追加操作可能不会触发外部表元数据的自动刷新,例如在使用 Azure AppendBlobs 时。

数据流

下图显示了标准流的内容如何随着源表中行的更新而变化。每当 DML 语句使用流内容时,流位置都会推进,以跟踪对于表的下一组 DML 更改(即 表版本 中的更改):

流示例

数据保留期与过期

在流的偏移超出其源表(或源视图的基础表)的数据保留期时,流将变为过期。过期的流无法再访问源表的历史数据,包括任何未使用的变更记录。要跟踪表的 变更记录,请重新创建流(使用 CREATE STREAM)。为防止流过期,请在表的保留期内使用事务中的流记录。如果流为空,且 SYSTEM$STREAM_HAS_DATA 返回 FALSE,则在数据流上调用 SYSTEM$STREAM_HAS_DATA 函数可防止数据流过时。有关数据保留期的更多信息,请参阅 了解和使用 Time Travel

备注

此限制 适用于没有数据保留期的目录表或外部表上的流,此类流没有数据保留期。

此外,共享表或视图上的流不会分别延长表或基础表的数据保留期。有关更多信息,请参阅 共享对象上的流

如果表的数据保留期 不足 14 天,且流尚未被消耗,Snowflake 会暂时延长此时间段,以防止其过期。该期限将延长至流的偏移,默认情况下最多为 14 天,这与账户的 Snowflake 版本 无关。Snowflake 可以为数据保留期延长的最大天数由 MAX_DATA_EXTENSION_TIME_IN_DAYS 参数值决定。在流被使用时,延长的数据保留期将缩减到表的默认保留期。

下表显示了示例 DATA_RETENTION_TIME_IN_DAYS 和 MAX_DATA_EXTENSION_TIME_IN_DAYS 值,并指示为避免流过期,应该按照怎样的频率使用流内容:

DATA_RETENTION_TIME_IN_DAYS

MAX_DATA_EXTENSION_TIME_IN_DAYS

在 X 天内使用流

14

0

14

1

14

14

0

90

90

要查看流的当前过期状态,请执行 DESCRIBE STREAMSHOW STREAMS 命令。STALE_AFTER 列时间戳指示当前预测流在何时会过期(如果此时间戳是过去,则为其过期时间)。这是源对象的延长数据保留期。此时间戳的计算方式是,将源对象的 DATA_RETENTION_TIME_IN_DAYS 或 MAX_DATA_EXTENSION_TIME_IN_DAYS 参数设置中较大的值与当前时间戳相加。

备注

如果源表的数据保留期在架构或数据库级别设置,则当前角色必须有权访问架构或数据库才能计算 STALE_AFTER 值。

使用流的变更数据会将 STALE_AFTER 时间戳向前移动。请注意,从流读取可能会在 STALE_AFTER 时间戳后的一段时间内成功。但在此期间,流可能会随时过期。该 STALE 列指示流当前是否应已过期,尽管该流实际上可能尚未过期。

为避免流过期,我们强烈建议您定期在其 STALE_AFTER 时间戳之前(即在源对象的延长数据保留期内)使用其变更数据。

在 STALE_AFTER 时间戳过后,即使流没有未使用的记录,流也可能随时过期。请注意,即使源对象有变更数据,查询流时也可能返回 0 条记录。例如,仅追加流仅跟踪行插入,但更新和删除活动也会向源对象写入变更记录。重聚类就是一个不会产生变更数据的表写入示例,只是不那么清晰。

使用流的变更数据时,会将其偏移提前到当前值,而不管中间版本是否包含任何变更数据。

重要

  • 重新创建对象(使用 CREATE OR REPLACE TABLE 语法)会删除其历史记录,这也会导致表或视图上的任何流过期。此外,重新创建或删除视图的任何基础表时,都会导致视图上的任何流过期。

  • 目前,在克隆包含流及其源表(或源视图的基础表)的数据库或架构时,流克隆中任何未使用的记录都无法访问。此行为与表的 Time Travel 一致。如果克隆了表,则表克隆的历史数据将从创建克隆的时间/点开始。

  • 重命名源对象不会中断流,也不会导致其过期。此外,如果删除了源对象并创建了具有相同名称的新对象,则链接到原始对象的任何流都 不会 链接到新对象。

流的多个使用者

我们建议用户为对象变更记录的每个使用者创建一个单独的流。“使用者”是指利用 DML 事务使用对象变更记录的任务、脚本或其他机制。如本主题前面所述,流在 DML 事务中使用时,其偏移会推进。在单个流中,变更数据的不同使用者会检索不同的增量,除非使用了 Time Travel。利用 DML 事务使用从流中的最新偏移获取到的变更数据时,流会推进偏移。变更数据不再可供下一个使用者使用。要为对象使用 相同 的变更数据,请为该对象创建多个流。流仅存储源对象的偏移,而 会存储任何实际的表列数据;因此,您可以为对象创建任意多个流,而不会产生高额的费用。

视图上的流

视图上的流支持本地视图,以及通过 Snowflake Secure Data Sharing 共享的视图,包括安全视图。目前,流无法跟踪物化视图中的变更。

流仅限于满足以下要求的视图:

基础表:
  • 所有基础表都必须是原生表。

  • 该视图只能应用以下操作:

    • 预测

    • 筛选器

    • 内联接或交叉联接

    • UNION ALL

只要完全展开的查询满足此要求表中的其他要求,就支持在 FROM 子句中使用嵌套视图和子查询。

查看查询:

一般要求:

  • 查询可以选择任意多个列。

  • 查询可包含任意多个 WHERE 谓词。

  • 目前还不支持具有以下操作的视图:

    • GROUP BY 子句

    • QUALIFY 子句

    • 不在 FROM 中未的子查询

    • 相关子查询

    • LIMIT 子句

    • DISTINCT 子句

函数:

  • 选择列表中的函数必须是由系统定义的标量函数。

变更跟踪:

必须在基础表中启用变更跟踪。

在视图上创建流之前,必须为视图的基础表启用变更跟踪。有关说明,请参阅 在视图和基础表上启用变更跟踪

联接结果行为

如果一个流跟踪包含联接的视图的变更,在检查其结果时,了解所联接的数据非常重要。自流偏移以来,在左侧表上发生的变更将与右侧表联接,由于流偏移而在右侧表上发生的变更将与左侧表联接,两个表上自流偏移以来发生的变更将相互联接。

请参考以下示例:

创建了两个表:

create or replace table orders (id int, order_name varchar);
create or replace table customers (id int, customer_name varchar);
Copy

创建了一个视图,以基于 id 联接两个表。每个表都有一行与另一行联接:

create or replace view ordersByCustomer as select * from orders natural join customers;
insert into orders values (1, 'order1');
insert into customers values (1, 'customer1');
Copy

将会创建一个流,跟踪对视图的变更:

create or replace stream ordersByCustomerStream on view ordersBycustomer;
Copy

该视图有一个条目,而流没有条目,因为自流当前偏移以来,表没有发生任何变更:

select * from ordersByCustomer;
+----+------------+---------------+
| ID | ORDER_NAME | CUSTOMER_NAME |
|----+------------+---------------|
|  1 | order1     | customer1     |
+----+------------+---------------+

select * exclude metadata$row_id from ordersByCustomerStream;
+----+------------+---------------+-----------------+-------------------+
| ID | ORDER_NAME | CUSTOMER_NAME | METADATA$ACTION | METADATA$ISUPDATE |
|----+------------+---------------+-----------------+-------------------|
+----+------------+---------------+-----------------+-------------------+
Copy

对基础表进行更新后,选择 ordersByCustomerStream 将生成此记录: orders x Δ customers + Δ orders x customers + Δ orders x Δ customers,其中:

  • Δ orders 和 Δ customers 是自流偏移以来,每个表发生的变更。

  • orders 和 customers 是表在当前流偏移处的整体内容。

请注意,由于 Snowflake 中的优化,计算此表达式的费用并非总是与输入大小成线性比例关系。

如果在 orders 中插入另一个联接行,则 ordersByCustomer 将有一个新行:

insert into orders values (1, 'order2');
select * from ordersByCustomer;
+----+------------+---------------+
| ID | ORDER_NAME | CUSTOMER_NAME |
|----+------------+---------------|
|  1 | order1     | customer1     |
|  1 | order2     | customer1     |
+----+------------+---------------+
Copy

ordersByCustomersStream 中进行选择时,会产生一个新行,因为 Δ orders x customers 包含新插入,且 orders x Δ customers + Δ orders x Δ customers 为空:

select * exclude metadata$row_id from ordersByCustomerStream;
+----+------------+---------------+-----------------+-------------------+
| ID | ORDER_NAME | CUSTOMER_NAME | METADATA$ACTION | METADATA$ISUPDATE |
|----+------------+---------------+-----------------+-------------------|
|  1 | order2     | customer1     | INSERT          | False             |
+----+------------+---------------+-----------------+-------------------+
Copy

如果随后将另一个联接行插入到 customers,则 ordersByCustomer 总共将有三个 行:

insert into customers values (1, 'customer2');
select * from ordersByCustomer;
+----+------------+---------------+
| ID | ORDER_NAME | CUSTOMER_NAME |
|----+------------+---------------|
|  1 | order1     | customer1     |
|  1 | order2     | customer1     |
|  1 | order1     | customer2     |
|  1 | order2     | customer2     |
+----+------------+---------------+
Copy

ordersByCustomersStream 中进行选择时,会生成三行,因为 Δ orders x customersorders x Δ customers 和 Δ orders x Δ customers 各会生成一行:

select * exclude metadata$row_id from ordersByCustomerStream;
+----+------------+---------------+-----------------+-------------------+
| ID | ORDER_NAME | CUSTOMER_NAME | METADATA$ACTION | METADATA$ISUPDATE |
|----+------------+---------------+-----------------+-------------------|
|  1 | order1     | customer2     | INSERT          | False             |
|  1 | order2     | customer1     | INSERT          | False             |
|  1 | order2     | customer2     | INSERT          | False             |
+----+------------+---------------+-----------------+-------------------+
Copy

请注意,对于仅追加流,Δ orders 和 Δ customers 将仅包含行插入,而 orderscustomers 将包含表的完整内容,包括流偏移之前发生的任何更新。

CHANGES 子句:流的只读替代方案

作为流的替代方案,Snowflake 支持为 SELECT 语句使用 CHANGES 子句查询表或视图的变更跟踪元数据。CHANGES 子句允许查询两个时间点之间的变更跟踪元数据,而不必创建具有显式事务偏移的流。使用 CHANGES 子句 不会 推进偏移(即使用记录)。多个查询可检索不同事务开始点和结束点之间的变更跟踪元数据。此选项要求使用 AT | BEFORE 子句指定元数据的事务开始点;可使用可选的 END 子句设置变更跟踪间隔的结束点。

流存储当前事务 表版本,在大多数情况下,均为 CDC 记录的适当来源。对于需要管理任意时间段偏移的罕见情况,可使用 CHANGES 子句。

目前,在记录变更跟踪元数据之前,必须满足以下条件:

:

要么在表上启用更改跟踪(使用 ALTER TABLE ... CHANGE_TRACKING = TRUE),要么在表上创建流(使用 CREATE STREAM)。

视图:

为视图及其基础表启用变更跟踪。有关说明,请参阅 在视图和基础表上启用变更跟踪

启用变更跟踪会将多个隐藏列添加到表中,并开始存储变更跟踪元数据。这些隐藏 CDC 数据列中的值为流 元数据列 提供输入。这些列会占用少量存储空间。

在满足这些条件之一之前的时间段内,没有任何对象的变更跟踪元数据可用。

所需访问权限

若要查询流,至少需要具备以下角色权限的角色:

对象

权限

备注

数据库

USAGE

架构

USAGE

SELECT

SELECT

仅表上的流。

视图

SELECT

仅视图上的流。

外部暂存区

USAGE

仅目录表(位于外部暂存区中)上的流

内部暂存区

READ

仅目录表上的流(位于内部暂存区中)

流的计费

如本主题中的 数据保留期和过期 所述,如果流未得到定期使用,Snowflake 会 暂时 延长源表或源视图中基础表的数据保留期。如果表的数据保留期不足 14 天,则在后台,该期限将延长至流事务偏移或 14 天(如果表的数据保留期少于 14 天),以 较小 者为准,而不考虑账户的 Snowflake 版本

延长数据保留期需要占用额外的存储空间,这将体现在您的每月存储费用中。

与流关联的主要费用是虚拟仓库为查询流而使用的处理时间。这些费用在账单上显示为您熟悉的 Snowflake Credit。

语言: 中文