CREATE PIPE¶
在系统中创建一个新管道,用于定义 COPY INTO <table> 语句,Snowpipe 可使用该语句将数据从引入队列加载到表中。
语法¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ ERROR_INTEGRATION = <integration_name> ]
[ AWS_SNS_TOPIC = '<string>' ]
[ INTEGRATION = '<string>' ]
[ COMMENT = '<string_literal>' ]
AS <copy_statement>
必填参数¶
name
管道标识符对于创建管道的架构必须是唯一的。
标识符必须以字母字符开头,且不能包含空格或特殊字符,除非整个标识符字符串放在双引号内(例如
"My object"
)。放在双引号内的标识符也区分大小写。有关更多详细信息,请参阅 标识符要求。
copy_statement
COPY INTO <table> 语句用于将排队文件中的数据加载到 Snowflake 表中。该语句用作管道的文本/定义,并会在 SHOW PIPES 输出中显示。
备注
我们目前不建议在 Snowpipe 的 copy_statement
中使用以下函数:
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
GETDATE
LOCALTIME
LOCALTIMESTAMP
SYSDATE
SYSTIMESTAMP
一个已知的问题是,使用这些函数插入的时间值可能比 COPY_HISTORY 函数 或 COPY_HISTORY 视图 返回的 LOAD_TIME 值早几个小时。
建议改为查询 METADATA$START_SCAN_TIME,这样会更准确地表示记录加载。
可选参数¶
AUTO_INGEST = TRUE | FALSE
指定当从配置的消息服务收到事件通知时,是否自动从指定的外部暂存区和可选路径加载数据文件:
设置为
TRUE
会启用自动数据加载。Snowpipe 支持从外部暂存区(Amazon S3、Google Cloud Storage 或 Microsoft Azure)加载数据。
设置为
FALSE
会禁用自动数据加载。必须调用 Snowpipe REST API 端点才能加载数据文件。Snowpipe 支持从内部暂存区(即 Snowflake 命名暂存区或表暂存区,但 不是 用户暂存区)或外部暂存区(Amazon S3、Google Cloud Storage 或 Microsoft Azure)加载数据。
ERROR_INTEGRATION = 'integration_name'
仅当配置 Snowpipe 将错误通知发送到云消息传递服务时才需要。
指定用于与消息传递服务通信的通知集成的名称。有关更多信息,请参阅 Snowpipe 错误通知。
AWS_SNS_TOPIC = 'string'
仅当使用 SNS 为 Amazon S3 暂存区配置 AUTO_INGEST 时才需要。
指定 S3 桶中 SNS 主题的 Amazon Resource Name (ARN)。CREATE PIPE 语句将 Amazon Simple Queue Service (SQS) 队列订阅至指定的 SNS 主题。管道将文件复制到引入队列(由事件通知通过 SNS 主题触发)。有关更多信息,请参阅 针对 Amazon S3 自动执行 Snowpipe。
INTEGRATION = 'string'
仅在为 Google Cloud Storage 或 Microsoft Azure 暂存区配置 AUTO_INGEST 时才需要。
指定用于访问存储队列的现有通知集成。有关更多信息,请参阅:
集成名称必须全部使用大写输入。
COMMENT = 'string_literal'
指定管道的注释。
默认:无值
使用说明¶
此 SQL 命令需要以下最低权限:
权限
对象
备注
CREATE PIPE
架构
USAGE
管道定义中的暂存区
仅限外部暂存区
READ
管道定义中的暂存区
仅限内部暂存区
SELECT、INSERT
管道定义中的表
对架构对象进行 SQL 操作还需要对包含该对象的数据库和架构具有 USAGE 权限。
除以下选项 之外,支持所有 COPY INTO <table> 复制选项:
FILES = ( 'file_name1' [ , 'file_name2', ... ] )
ON_ERROR = ABORT_STATEMENT
SIZE_LIMIT = num
:samp:`PURGE = TRUE | FALSE`(即在加载时自动清除)
FORCE = TRUE | FALSE
请注意,在加载文件后,可以使用 REMOVE 命令从内部(即 Snowflake)暂存区手动移除文件。
RETURN_FAILED_ONLY = TRUE | FALSE
VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
PATTERN = 'regex_pattern'
复制选项使用正则表达式筛选要加载的文件集。模式匹配的行为如下,具体取决于 AUTO_INGEST 参数值:AUTO_INGEST = TRUE
:正则表达式筛选 COPY INTO <table> 语句中的暂存区和可选路径(即云存储位置)上的文件列表。:AUTO_INGEST = FALSE
:正则表达式筛选在调用 Snowpipe REST APIinsertFiles
端点时提交的文件列表。
请注意,Snowpipe 从存储位置修剪暂存区定义中的任何路径段,并将正则表达式应用于任何剩余的路径段和文件名。要查看暂存区定义,请执行针对暂存区的 DESCRIBE STAGE 命令。URL 属性由桶或容器名称以及零个或多个路径段组成。例如,如果 FROM 位置位于 COPY INTO <table> statement is
@s/path1/path2/
and the URL value for stage@s
iss3://mybucket/path1/
, then Snowpipe trims/path1/
from the storage location in the FROM clause and applies the regular expression topath2/
中,加上路径中的文件名。重要
Snowflake 建议您为 Snowpipe 启用云事件筛选,以降低成本、事件噪音和延迟。仅当云提供商的事件筛选功能不足时,才使用 PATTERN 选项。有关为每个云提供商配置事件筛选的更多信息,请参阅以下页面:
Amazon S3: 使用对象键名称筛选功能配置事件通知 (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-filtering.html)
Microsoft Azure 事件网格: 了解适用于事件网格订阅的事件筛选 (https://docs.microsoft.com/en-us/azure/event-grid/event-filtering)
Google Cloud Pub/Sub: 筛选消息 (https://cloud.google.com/pubsub/docs/filtering)
支持将查询用作源,以便使用 COPY 语句进行列重新排序、列省略和类型转换(即在加载期间转换数据)。有关用法示例,请参阅 在加载期间转换数据。请注意,仅支持简单的 SELECT 语句。不支持使用 WHERE 子句进行筛选。
管道定义不是动态的(即,如果基础暂存区或表发生变更,例如重命名或删除了暂存区/表,管道并不会自动更新)。在这种情况下,必须创建一个新管道,并在将来的 Snowpipe REST API 调用中提交此管道名称。
关于元数据:
注意
客户应确保在使用 Snowflake 服务时,不会将个人数据(用户对象除外)、敏感数据、出口管制数据或其他受监管数据作为元数据输入。有关更多信息,请参阅 Snowflake 中的元数据字段。
CREATE OR REPLACE <object> 语句是原子的。也就是说,当对象被替换时,旧对象将被删除,新对象将在单个事务中创建。
重要
如果重新创建管道(使用 CREATE OR REPLACE PIPE 语法),请参阅 重建管道,以查看相关注意事项和最佳实践。
示例¶
在当前架构中创建管道,以便将暂存在 mystage
暂存区的文件中的所有数据加载到 mytable
中:
CREATE PIPE mypipe AS COPY INTO mytable FROM @mystage FILE_FORMAT = (TYPE = 'JSON');
与前面的示例相同,但进行了数据转换。按照相反的顺序,仅从暂存文件中的第 4 列和第 5 列加载数据:
CREATE PIPE mypipe2 AS COPY INTO mytable(C1, C2) FROM (SELECT $5, $4 FROM @mystage) FILE_FORMAT = (TYPE = 'JSON');
创建一个管道,将所有数据加载到目标表中与数据中表示的相应列相匹配的列中。列名称不区分大小写。
此外,将 METADATA$START_SCAN_TIME 和 METADATA$FILENAME 元数据列 中的元数据加载到名为 c1
和 c2
的列中。
CREATE PIPE mypipe3 AS (COPY INTO mytable FROM @mystage MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE INCLUDE_METADATA = (c1= METADATA$START_SCAN_TIME, c2=METADATA$FILENAME) FILE_FORMAT = (TYPE = 'JSON'));
在当前架构中创建管道,以便使用从消息传递服务接收的事件通知自动加载数据:
Amazon S3
CREATE PIPE mypipe_s3 AUTO_INGEST = TRUE AWS_SNS_TOPIC = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket' AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');Google Cloud Storage
CREATE PIPE mypipe_gcs AUTO_INGEST = TRUE INTEGRATION = 'MYINT' AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');Microsoft Azure
CREATE PIPE mypipe_azure AUTO_INGEST = TRUE INTEGRATION = 'MYINT' AS COPY INTO snowpipe_db.public.mytable FROM @snowpipe_db.public.mystage FILE_FORMAT = (TYPE = 'JSON');