CREATE PIPE¶
Creates a new pipe in the system for defining the COPY INTO <table> statement used by Snowpipe to load data from an ingestion queue, or by Snowpipe Streaming with high-performance architecture to load data from a streaming source directly into tables.
语法¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ ERROR_INTEGRATION = <integration_name> ]
[ AWS_SNS_TOPIC = '<string>' ]
[ INTEGRATION = '<string>' ]
[ COMMENT = '<string_literal>' ]
AS <copy_statement>
备注
您可以使用具有两种不同类型的数据源的 <copy_statement>:
暂存位置:
COPY INTO mytable FROM @mystage ...流式传输源:
COPY INTO mytable FROM (SELECT ... FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')))
必填参数¶
name管道标识符对于创建管道的架构必须是唯一的。
标识符必须以字母字符开头,且不能包含空格或特殊字符,除非整个标识符字符串放在双引号内(例如
"My object")。放在双引号内的标识符也区分大小写。有关更多详细信息,请参阅 标识符要求。
copy_statementCOPY INTO <table> 语句用于将排队文件中的数据加载到 Snowflake 表中。该语句用作管道的文本/定义,并会在 SHOW PIPES 输出中显示。
备注
我们目前不建议在 Snowpipe 的 copy_statement 中使用以下函数:
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
GETDATE
LOCALTIME
LOCALTIMESTAMP
SYSDATE
SYSTIMESTAMP
一个已知的问题是,使用这些函数插入的时间值可能比 COPY_HISTORY 函数 或 COPY_HISTORY 视图 返回的 LOAD_TIME 值早几个小时。
建议改为查询 METADATA$START_SCAN_TIME,这样会更准确地表示记录加载。
可选参数¶
AUTO_INGEST = TRUE | FALSE指定是从内部还是外部暂存区自动加载数据文件:
设置为
TRUE会启用自动数据加载。Snowpipe 支持从外部暂存区(Amazon S3、Google Cloud Storage 或 Microsoft Azure)加载数据。
设置为
FALSE会禁用自动数据加载。必须调用 Snowpipe REST API 端点才能加载数据文件。Snowpipe 支持从内部暂存区(即 Snowflake 命名暂存区或表暂存区,但 不是 用户暂存区)或外部暂存区(Amazon S3、Google Cloud Storage 或 Microsoft Azure)加载数据。
ERROR_INTEGRATION = 'integration_name'仅当配置 Snowpipe 将错误通知发送到云消息传递服务时才需要。
指定用于与消息传递服务通信的通知集成的名称。有关更多信息,请参阅 Snowpipe 错误通知。
AWS_SNS_TOPIC = 'string'仅当使用 SNS 为 Amazon S3 外部暂存区配置 AUTO_INGEST 时才需要。
指定 S3 桶中 SNS 主题的 Amazon Resource Name (ARN)。CREATE PIPE 语句将 Amazon Simple Queue Service (SQS) 队列订阅至指定的 SNS 主题。管道将文件复制到引入队列(由事件通知通过 SNS 主题触发)。有关更多信息,请参阅 针对 Amazon S3 自动执行 Snowpipe。
INTEGRATION = 'string'仅当为 Google Cloud Storage 或 Microsoft Azure 外部暂存区配置 AUTO_INGEST 时才需要。
指定用于访问存储队列的现有通知集成。有关更多信息,请参阅:
集成名称必须全部使用大写输入。
COMMENT = 'string_literal'指定管道的注释。
默认:无值
适用于采用高性能架构的 Snowpark Streaming 的管道¶
您可以定义适用于 Snowpipe Streaming 的管道,直接从 Snowpipe Streaming API 加载数据,而不需要暂存文件位置。此方法专为低延迟、基于行的引入而设计。
用于流式传输管道的 COPY INTO 语句必须在 FROM 子句中使用 DATA_SOURCE 表函数,并附带 TYPE => 'STREAMING' 实参。
For example:
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE
FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')))
备注
为流式传输创建的管道不需要
AUTO_INGEST参数或FROM @stage子句。流式传输管道中的
copy_statement用于转换和加载从 API 接收的数据。
使用说明¶
此 SQL 命令需要以下最低权限:
权限
对象
备注
CREATE PIPE
架构
USAGE
管道定义中的暂存区
仅限外部暂存区
USAGE
集成
接收 Snowpipe 错误通知时需要
READ
管道定义中的暂存区
仅限内部暂存区
SELECT、INSERT
管道定义中的表
对架构对象进行 SQL 操作还需要对包含该对象的数据库和架构具有 USAGE 权限。
除以下选项 之外,支持所有 COPY INTO <table> 复制选项:
FILES = ( 'file_name1' [ , 'file_name2', ... ] )ON_ERROR = ABORT_STATEMENTSIZE_LIMIT = num:samp:`PURGE = TRUE | FALSE`(即在加载时自动清除)
FORCE = TRUE | FALSE请注意,在加载文件后,可以使用 REMOVE 命令从内部(即 Snowflake)暂存区手动移除文件。
RETURN_FAILED_ONLY = TRUE | FALSEVALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
PATTERN = 'regex_pattern'复制选项使用正则表达式筛选要加载的文件集。模式匹配的行为如下,具体取决于 AUTO_INGEST 参数值:AUTO_INGEST = TRUE:正则表达式筛选 COPY INTO <table> 语句中的暂存区和可选路径(即云存储位置)上的文件列表。:AUTO_INGEST = FALSE:正则表达式筛选在调用 Snowpipe REST APIinsertFiles端点时提交的文件列表。
Snowpipe trims any path segments in the stage definition from the storage location and applies the regular expression to any remaining path segments and filenames. To view the stage definition, execute the DESCRIBE STAGE command for the stage. The URL property consists of the bucket or container name and zero or more path segments. For example, if the FROM location in a COPY INTO <table> statement is
@s/path1/path2/and the URL value for stage@siss3://mybucket/path1/, then Snowpipe trimss3://mybucket/path1/path2/from the storage location in the FROM clause and applies the regular expression to the remaining filenames in the path.重要
Snowflake 建议您为 Snowpipe 启用云事件筛选,以降低成本、事件噪音和延迟。仅当云提供商的事件筛选功能不足时,才使用 PATTERN 选项。有关为每个云提供商配置事件筛选的更多信息,请参阅以下页面:
Amazon S3: 使用对象键名称筛选功能配置事件通知 (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-filtering.html)
Microsoft Azure 事件网格: 了解适用于事件网格订阅的事件筛选 (https://docs.microsoft.com/en-us/azure/event-grid/event-filtering)
Google Cloud Pub/Sub: 筛选消息 (https://cloud.google.com/pubsub/docs/filtering)
支持将查询用作源,以便使用 COPY 语句进行列重新排序、列省略和类型转换(即在加载期间转换数据)。有关用法示例,请参阅 在加载期间转换数据。请注意,仅支持简单的 SELECT 语句。不支持使用 WHERE 子句进行筛选。
管道定义不是动态的(即,如果基础暂存区或表发生变更,例如重命名或删除了暂存区/表,管道并不会自动更新)。在这种情况下,必须创建一个新管道,并在将来的 Snowpipe REST API 调用中提交此管道名称。
关于元数据:
注意
客户应确保在使用 Snowflake 服务时,不会将个人数据(用户对象除外)、敏感数据、出口管制数据或其他受监管数据作为元数据输入。有关更多信息,请参阅 Snowflake 中的元数据字段。
The OR REPLACE and IF NOT EXISTS clauses are mutually exclusive. They can't both be used in the same statement.
CREATE OR REPLACE <object> 语句是原子的。也就是说,当对象被替换时,旧对象将被删除,新对象将在单个事务中创建。
重要
如果重新创建管道(使用 CREATE OR REPLACE PIPE 语法),请参阅 重建管道,以查看相关注意事项和最佳实践。
示例¶
在当前架构中创建管道,以便将暂存在 mystage 暂存区的文件中的所有数据加载到 mytable 中:
CREATE PIPE mypipe
AS
COPY INTO mytable
FROM @mystage
FILE_FORMAT = (TYPE = 'JSON');
与前面的示例相同,但进行了数据转换。按照相反的顺序,仅从暂存文件中的第 4 列和第 5 列加载数据:
CREATE PIPE mypipe2
AS
COPY INTO mytable(C1, C2)
FROM (SELECT $5, $4 FROM @mystage)
FILE_FORMAT = (TYPE = 'JSON');
创建一个管道,将所有数据加载到目标表中与数据中表示的相应列相匹配的列中。列名称不区分大小写。
此外,将 METADATA$START_SCAN_TIME 和 METADATA$FILENAME 元数据列 中的元数据加载到名为 c1 和 c2 的列中。
CREATE PIPE mypipe3
AS
(COPY INTO mytable
FROM @mystage
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
INCLUDE_METADATA = (c1= METADATA$START_SCAN_TIME, c2=METADATA$FILENAME)
FILE_FORMAT = (TYPE = 'JSON'));
在当前架构中创建管道,以便使用从消息传递服务接收的事件通知自动加载数据:
Amazon S3
CREATE PIPE mypipe_s3
AUTO_INGEST = TRUE
AWS_SNS_TOPIC = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
Google Cloud Storage
CREATE PIPE mypipe_gcs
AUTO_INGEST = TRUE
INTEGRATION = 'MYINT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
Microsoft Azure
CREATE PIPE mypipe_azure
AUTO_INGEST = TRUE
INTEGRATION = 'MYINT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage
FILE_FORMAT = (TYPE = 'JSON');
内部命名暂存区
在当前架构中创建一个管道,以自动加载名为 mystage 的内部暂存区上的所有数据文件。
CREATE PIPE mypipe_aws AUTO_INGEST = TRUE AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');