CREATE STREAM¶
在当前/指定的架构中创建新流或替换现有 流。流记录对视图(包括安全视图)中的表、目录表、动态表、外部表或基础表所进行的数据操作语言 (DML) 变更。其变更被记录的对象称为 源对象。
此外,此命令支持以下变体:
CREATE STREAM ...CLONE (创建现有流的克隆)
语法¶
命令语法因创建流所基于的对象而异:
-- table
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ [ WITH ] TAG ( <tag_name> = '<tag_value>' [ , <tag_name> = '<tag_value>' , ... ] ) ]
[ COPY GRANTS ]
ON TABLE <table_name>
[ { AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> | STREAM => '<name>' } ) ]
[ APPEND_ONLY = TRUE | FALSE ]
[ SHOW_INITIAL_ROWS = TRUE | FALSE ]
[ COMMENT = '<string_literal>' ]
-- Event table
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ [ WITH ] TAG ( <tag_name> = '<tag_value>' [ , <tag_name> = '<tag_value>' , ... ] ) ]
[ COPY GRANTS ]
ON EVENT TABLE <table_name>
[ COMMENT = '<string_literal>' ]
-- External table
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ [ WITH ] TAG ( <tag_name> = '<tag_value>' [ , <tag_name> = '<tag_value>' , ... ] ) ]
[ COPY GRANTS ]
ON EXTERNAL TABLE <external_table_name>
[ { AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> | STREAM => '<name>' } ) ]
[ INSERT_ONLY = TRUE ]
[ COMMENT = '<string_literal>' ]
-- Directory table
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ [ WITH ] TAG ( <tag_name> = '<tag_value>' [ , <tag_name> = '<tag_value>' , ... ] ) ]
[ COPY GRANTS ]
ON STAGE <stage_name>
[ COMMENT = '<string_literal>' ]
-- Dynamic table
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ [ WITH ] TAG ( <tag_name> = '<tag_value>' [ , <tag_name> = '<tag_value>' , ... ] ) ]
[ COPY GRANTS ]
ON DYNAMIC TABLE <table_name>
[ COMMENT = '<string_literal>' ]
-- View
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
<name>
[ [ WITH ] TAG ( <tag_name> = '<tag_value>' [ , <tag_name> = '<tag_value>' , ... ] ) ]
[ COPY GRANTS ]
ON VIEW <view_name>
[ { AT | BEFORE } ( { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> | STREAM => '<name>' } ) ]
[ APPEND_ONLY = TRUE | FALSE ]
[ SHOW_INITIAL_ROWS = TRUE | FALSE ]
[ COMMENT = '<string_literal>' ]
变体语法¶
CREATE STREAM ...CLONE
使用与源流相同的定义创建新流。克隆从源流继承当前 *偏移*(即当前事务性 表版本)。
CREATE [ OR REPLACE ] STREAM <name> CLONE <source_stream> [ COPY GRANTS ] [ ... ]
有关克隆的更多信息,请参阅 CREATE <object> ... CLONE。
必填参数¶
name指定流标识符(即名称)的字符串;对于从中创建流的架构必须是唯一的。
此外,标识符必须以字母字符开头,且不能包含空格或特殊字符,除非整个标识符字符串放在双引号内(例如,
"My object")。放在双引号内的标识符也区分大小写。有关更多详细信息,请参阅 标识符要求。
table_name此字符串指定流(即源表)跟踪其变更的表的标识符(即名称)。
- 访问控制:
要查询流,角色必须具有基础表的 SELECT 权限。
external_table_name此字符串指定流(即源外部表)跟踪其变更的外部表的标识符(即名称)。
- 访问控制:
要查询流,角色必须具有基础外部表的 SELECT 权限。
stage_name此字符串指定流(即源目录表)跟踪其目录表变更的暂存区的标识符(即名称)。
- 访问控制:
要查询流,角色必须具有基础暂存区的 USAGE(外部暂存区)或 READ(内部暂存区)权限。
view_name此字符串指定源视图的标识符(即名称)。流跟踪对视图中基础表的 DML 变更。
有关视图上的流的更多信息,请参阅 Streams on views。
- 访问控制:
要查询流,角色必须具有视图的 SELECT 权限。
可选参数¶
TAG tag_name = 'tag_value' [ , tag_name = 'tag_value' , ... ]指定 标签 名称和标签字符串值。
标签值始终为字符串,标签值的最大字符数为 256。
有关在语句中指定标签的信息,请参阅 Tag quotas。
COPY GRANTS指定该参数后,在使用以下任何 CREATE STREAM 变体创建新流时会保留来自原始流的访问权限:
CREATE OR REPLACE STREAM
CREATE STREAM ...CLONE
该参数将所有权限( OWNERSHIP 除外 )从现有流复制到新流。默认情况下,执行 CREATE STREAM 命令的角色拥有新流。
备注
如果 CREATE STREAM 语句引用多个流(例如
create or replace stream t1 clone t2;),则COPY GRANTS子句优先选择被替换的流。替换流的 SHOW GRANTS 输出会将复制权限的获得者列为执行了 CREATE STREAM 语句的角色,并附带执行该语句时的当前时间戳。
复制授权的操作在 CREATE STREAM 命令中会以原子方式发生(即在同一事务中)。
备注
目前不支持此参数。
{ AT ( { TIMESTAMP => timestamp | OFFSET => time_difference | STATEMENT => id | STREAM => 'name' } ) | BEFORE ( { TIMESTAMP => timestamp | OFFSET => time_difference | STATEMENT => id } ) }在过去的特定时间/点创建流(使用 Time Travel)。AT | BEFORE 子句确定了请求历史数据的过去时间点:
AT关键字指定请求包含时间戳等于指定参数的语句或事务所做的任何变更。STREAM => '<name>'值很特别。如果提供此值,则 CREATE STREAM 语句以与指定流相同的偏移来创建新流。也可以在重新创建现有流(使用OR REPLACE关键字)时提供此值,以便在重新创建流之后保留其当前偏移。'<name>'是现有流的标识符(即名称),该流的偏移被复制到新的或重新创建的流。当在 DML 事务中使用新的或重新创建的流时,该流会像往常一样推进偏移。
BEFORE关键字指定请求引用紧挨着指定参数之前的点。
备注
如果源对象在 AT | BEFORE 子句指定的过去时间点没有可用的变更跟踪数据,则 CREATE STREAM 语句将失败。在记录变更跟踪之前,无法在过去的时间创建流。
APPEND_ONLY = TRUE | FALSE只有标准表上的流或查询标准表的视图上的流才支持此参数。
指定这是否是仅追加流。仅追加流仅跟踪行插入。不会记录更新和删除操作(包括表截断)。例如,如果在表中插入了 10 行,然后在推进仅追加流的偏移之前删除了其中的 5 行,则流会记录 10 行。
与标准流相比,这种类型的流提高了查询性能,对于仅依赖行插入的提取、加载、转换 (ELT) 和类似场景非常有用。
标准流联接变更集中已删除和插入的行,以确定哪些行已删除和哪些行已更新。仅追加流仅返回追加的行,因此比标准流的性能要高得多。例如,源表可在仅追加流中的行被使用后立即截断,并且下次查询或使用该流时,记录删除操作不会增加开销。
- 默认值:
FALSE
INSERT_ONLY = TRUE | FALSE对于外部表和外部管理 Iceberg 表上的流是必需的。其他对象上的流不支持此参数。
指定这是否是仅插入流。仅插入流只会跟踪行插入;它们不会记录从插入集中移除行的删除操作(即无操作)。例如,在任意两个偏移之间,如果从外部表引用的云存储位置中移除
File1并添加File2,则此流仅返回File2中行的记录,无论File1是在请求的更改间隔之前还是之内添加都如此。与跟踪标准表的变更数据获取 (CDC) 不同,对云存储中文件的历史记录的访问不受 Snowflake 的控制,也无法得到其保证。覆盖或追加的文件实际上是作为新文件处理的:旧版本的文件将从云存储中删除,但仅插入流不会记录删除操作。新版本的文件将添加到云存储中,仅插入流会将行记录为插入。流不会记录旧文件版本与新文件版本的差异。请注意,追加操作可能不会触发外部表元数据的自动刷新,例如在使用 Azure AppendBlobs 时。
- 默认值:
FALSE
SHOW_INITIAL_ROWS = TRUE | FALSE指定首次使用流时要返回的记录。
TRUE流 仅 返回创建流时源对象中存在的行。METADATA$ISUPDATE 列显示这些行中的 FALSE 值。随后,流返回自最近的偏移以来对源对象的任何 DML 变更,即正常的流行为。
此参数允许使用流的源对象的内容初始化任何下游过程。
FALSE流返回自最近的偏移以来对源对象的任何 DML 变更。
- 默认值:
FALSE
COMMENT = 'string_literal'此字符串(字面量)指定流的注释。
默认:无值
输出¶
流的输出包含与源对象相同的列,以及以下附加列:
METADATA$ACTION:指定操作(INSERT 或 DELETE)。
METADATA$ISUPDATE:指定记录的操作(INSERT 或 DELETE)是否是应用于源表或视图中行的 UPDATE 的一部分。
请注意,流会记录两个偏移之间的差异。如果在当前偏移中添加了一行,随后进行了更新,则增量变更是新行。METADATA$ISUPDATE 行会记录一个 FALSE 值。
METADATA$ROW_ID:指定行的唯一且不可变 ID,它可用于跟踪一段时间内对特定行的变更。
访问控制要求¶
标准表上的流:
对象
权限
备注
架构
CREATE STREAM
表
SELECT
如果尚未在源表上启用变更跟踪(使用 ALTER TABLE ...SET CHANGE_TRACKING = TRUE),则只有表所有者(即对表具有 OWNERSHIP 权限的角色)可以在表上创建初始流。创建初始流会自动在表上启用变更跟踪。
Operating on an object in a schema requires at least one privilege on the parent database and at least one privilege on the parent schema.
视图上的流:
对象
权限
备注
架构
CREATE STREAM
视图
SELECT
如果尚未在源视图及其基础表上启用变更跟踪,则只有对视图具有 OWNERSHIP 权限的角色和视图基础表的所有者才能在视图上创建初始流。创建初始流会自动在表上启用变更跟踪。有关在视图及其基础表上启用变更跟踪的说明,请参阅 Enabling change tracking on views and underlying tables。 请注意,在启用变更跟踪期间,启用变更跟踪会锁定基础表。锁定基础对象可能会导致这些对象的 DDL/DML 操作出现延迟。有关更多信息,请参阅 资源锁定。
Operating on an object in a schema requires at least one privilege on the parent database and at least one privilege on the parent schema.
目录表上的流:
对象
权限
备注
架构
CREATE STREAM
暂存区
USAGE(外部暂存区)或 READ(内部暂存区)
Operating on an object in a schema requires at least one privilege on the parent database and at least one privilege on the parent schema.
外部表上的流:
对象
权限
备注
架构
CREATE STREAM
外部表
SELECT
Operating on an object in a schema requires at least one privilege on the parent database and at least one privilege on the parent schema.
有关创建具有指定权限集的自定义角色的说明,请参阅 创建自定义角色。
使用说明¶
可以多次查询一个流以更新同一个事务中的多个对象,该流将返回相同的数据。
在 DML 语句中使用流时,流位置(即 偏移)会被推进。在事务结束时,此位置会更新为事务的开始时间戳。流描述了从流的当前位置开始到当前事务性时间戳结束的变更记录。
为确保多个语句访问流中的相同变更记录,请将其置于显式事务语句 (BEGIN ..:doc:
/sql-reference/sql/commit)。显式事务会锁定流,因此在提交事务之前,不会向流报告源对象的 DML 更新。流没有故障安全期或 Time Travel 保留期。如果删除流,则无法恢复这些对象中的元数据。
共享表上的流:
源表的保留期 不会 自动延长,以防止表上的任何流过时。
标准流无法检索地理空间数据的变更数据。我们建议在包含地理空间数据的对象上创建仅追加流。
视图上的流:
如果使用视图所有者角色(即对视图具有 OWNERSHIP 权限的角色)在视图上创建第一个流,则会在视图上启用变更跟踪。如果同一角色也拥有基础表,则还会在表上启用变更跟踪。如果未向角色授予对视图及其基础表的 OWNERSHIP 权限,则必须手动在适用的对象上启用变更跟踪。有关说明,请参阅 Enabling change tracking on views and underlying tables。
根据视图中的联接数量,基础表中的单项变更可能会导致流输出中出现大量变更。
如果删除或重新创建(使用 CREATE OR REPLACE VIEW)源视图或基础表,则给定视图上的任何流都会中断。
安全视图上的任何流都遵守安全视图约束。
如果非安全视图的所有者(即对视图具有 OWNERSHIP 权限的角色)将该视图更改为安全视图(使用 ALTER VIEW ...SET SECURE),则该视图上的任何流都会自动强制执行安全视图约束。
此外,基础表的保留期 不会 自动延长,以防止安全视图上的任何流过时。
基于视图的流(视图使用非确定性函数)可以返回不确定性的结果。
例如,上下文函数 (如 CURRENT_DATE 和 CURRENT_USER)的结果是不确定的。 数据生成函数 (如 RANDOM)的结果也是不确定的。如果视图包含非确定性函数,则该视图上的任何流都不会是此函数的输出的恒定快照。相反,在查询时,流中的值可能会发生变化。
我们建议您确保视图结果中的不确定性不会影响流查询结果的正确性。
有关示例,请参阅 Stream on a view that calls a non-deterministic SQL function。
目录表上的流: 流输出中的 METADATA$ROW_ID 列值为空。
关于元数据:
注意
客户应确保在使用 Snowflake 服务时,不会将个人数据(用户对象除外)、敏感数据、出口管制数据或其他受监管数据作为元数据输入。有关更多信息,请参阅 Snowflake 中的元数据字段。
OR REPLACE 和 IF NOT EXISTS 子句互斥。它们不能同时用于同一条语句中。
CREATE OR REPLACE <object> 语句是原子的。也就是说,当对象被替换时,旧对象将被删除,新对象将在单个事务中创建。
示例¶
创建表流¶
在 mytable 表上创建一个流:
CREATE STREAM mystream ON TABLE mytable;
将 Time Travel 用于源表¶
在 mytable 表上创建一个流,如同它在指定时间戳中的日期和时间之前存在一样:
CREATE STREAM mystream ON TABLE mytable BEFORE (TIMESTAMP => TO_TIMESTAMP(40*365*86400));
在 mytable 表上创建一个流,如同它恰好在指定时间戳的日期和时间存在一样:
CREATE STREAM mystream ON TABLE mytable AT (TIMESTAMP => TO_TIMESTAMP_TZ('02/02/2019 01:02:03', 'mm/dd/yyyy hh24:mi:ss'));
在 mytable 表上创建一个流,如同它在 5 分钟之前存在一样:
CREATE STREAM mystream ON TABLE mytable AT(OFFSET => -60*5);
在 mytable 表上创建一个流,使其偏移与同一源表上的现有流 oldstream 相同:
CREATE STREAM mystream ON TABLE mytable AT(STREAM => 'oldstream');
重新创建现有的 mystream 流,但保留其当前偏移:
CREATE OR REPLACE STREAM mystream ON TABLE mytable AT(STREAM => 'mystream');
在 mytable 表上创建一个流,使其包含一直到指定事务为止的事务,但不包含指定事务所做的任何变更:
CREATE STREAM mystream ON TABLE mytable BEFORE(STATEMENT => '8e5d0ca9-005e-44e6-b858-a8f5b37c5726');
在单表视图上创建流¶
在 myview 视图上创建一个流:
CREATE STREAM mystream ON VIEW myview;
有关其他示例,请参阅 Stream examples。
在外部表上创建仅插入流¶
创建外部表流并查询该流中的变更数据获取记录,这些记录跟踪添加到外部表元数据的记录:
-- Create an external table that points to the MY_EXT_STAGE stage.
-- The external table is partitioned by the date (in YYYY/MM/DD format) in the file path.
CREATE EXTERNAL TABLE my_ext_table (
date_part date as to_date(substr(metadata$filename, 1, 10), 'YYYY/MM/DD'),
ts timestamp AS (value:time::timestamp),
user_id varchar AS (value:userId::varchar),
color varchar AS (value:color::varchar)
) PARTITION BY (date_part)
LOCATION=@my_ext_stage
AUTO_REFRESH = false
FILE_FORMAT=(TYPE=JSON);
-- Create a stream on the external table
CREATE STREAM my_ext_table_stream ON EXTERNAL TABLE my_ext_table INSERT_ONLY = TRUE;
-- Execute SHOW streams
-- The MODE column indicates that the new stream is an INSERT_ONLY stream
SHOW STREAMS;
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+
| created_on | name | database_name | schema_name | owner | comment | table_name | type | stale | mode |
|-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------|
| 2020-08-02 05:13:20.174 -0800 | MY_EXT_TABLE_STREAM | MYDB | PUBLIC | MYROLE | | MYDB.PUBLIC.EXTTABLE_S3_PART | DELTA | false | INSERT_ONLY |
+-------------------------------+------------------------+---------------+-------------+--------------+-----------+------------------------------------+-------+-------+-------------+
-- Add a file named '2020/08/05/1408/log-08051409.json' to the stage using the appropriate tool for the cloud storage service.
-- Manually refresh the external table metadata.
ALTER EXTERNAL TABLE my_ext_table REFRESH;
-- Query the external table stream.
-- The stream indicates that the rows in the added JSON file were recorded in the external table metadata.
SELECT * FROM my_ext_table_stream;
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+
| VALUE | DATE_PART | TS | USER_ID | COLOR | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | METADATA$FILENAME |
|----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------|
| { | 2020-08-05 | 2020-08-05 15:57:01.000 | user25 | green | INSERT | False | | test/logs/2020/08/05/1408/log-08051409.json |
| "color": "green", | | | | | | | | |
| "time": "2020-08-05 15:57:01-07:00", | | | | | | | | |
| "userId": "user25" | | | | | | | | |
| } | | | | | | | | |
| { | 2020-08-05 | 2020-08-05 15:58:02.000 | user56 | brown | INSERT | False | | test/logs/2020/08/05/1408/log-08051409.json |
| "color": "brown", | | | | | | | | |
| "time": "2020-08-05 15:58:02-07:00", | | | | | | | | |
| "userId": "user56" | | | | | | | | |
| } | | | | | | | | |
+----------------------------------------+------------+-------------------------+---------+-------+-----------------+-------------------+-----------------+---------------------------------------------+
在目录表上创建标准流¶
在目录表上为名为 mystage 的暂存区创建流:
CREATE STREAM dirtable_mystage_s ON STAGE mystage;
手动刷新目录表元数据以填充流:
ALTER STAGE mystage REFRESH;
在将一个或多个文件添加到流的最近偏移后面的暂存区之后,查询流:
SELECT * FROM dirtable_mystage_s;
+-------------------+--------+-------------------------------+----------------------------------+----------------------------------+-------------------------------------------------------------------------------------------+-----------------+-------------------+-----------------+
| RELATIVE_PATH | SIZE | LAST_MODIFIED | MD5 | ETAG | FILE_URL | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|-------------------+--------+-------------------------------+----------------------------------+----------------------------------+-------------------------------------------------------------------------------------------+-----------------+-------------------+-----------------|
| file1.csv.gz | 1048 | 2021-05-14 06:09:08.000 -0700 | c98f600c492c39bef249e2fcc7a4b6fe | c98f600c492c39bef249e2fcc7a4b6fe | https://myaccount.snowflakecomputing.cn/api/files/MYDB/MYSCHEMA/MYSTAGE/file1%2ecsv%2egz | INSERT | False | |
| file2.csv.gz | 3495 | 2021-05-14 06:09:09.000 -0700 | 7f1a4f98ef4c7c42a2974504d11b0e20 | 7f1a4f98ef4c7c42a2974504d11b0e20 | https://myaccount.snowflakecomputing.cn/api/files/MYDB/MYSCHEMA/MYSTAGE/file2%2ecsv%2egz | INSERT | False | |
+-------------------+--------+-------------------------------+----------------------------------+----------------------------------+-------------------------------------------------------------------------------------------+-----------------+-------------------+-----------------+