CREATE PIPE

在系统中创建一个新管道,用于定义 COPY INTO <table> 语句,Snowpipe 可使用该语句将数据从引入队列加载到表中。

另请参阅:

ALTER PIPEDROP PIPESHOW PIPESDESCRIBE PIPE

语法

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ ERROR_INTEGRATION = <integration_name> ]
  [ AWS_SNS_TOPIC = '<string>' ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>
Copy

必填参数

name

管道标识符对于创建管道的架构必须是唯一的。

标识符必须以字母字符开头,且不能包含空格或特殊字符,除非整个标识符字符串放在双引号内(例如 "My object")。放在双引号内的标识符也区分大小写。

有关更多详细信息,请参阅 标识符要求

copy_statement

COPY INTO <table> 语句用于将排队文件中的数据加载到 Snowflake 表中。该语句用作管道的文本/定义,并会在 SHOW PIPES 输出中显示。

备注

我们目前不建议在 Snowpipe 的 copy_statement 中使用以下函数:

  • CURRENT_DATE

  • CURRENT_TIME

  • CURRENT_TIMESTAMP

  • GETDATE

  • LOCALTIME

  • LOCALTIMESTAMP

  • SYSDATE

  • SYSTIMESTAMP

一个已知的问题是,使用这些函数插入的时间值可能比 COPY_HISTORY 函数COPY_HISTORY 视图 返回的 LOAD_TIME 值早几个小时。

建议改为查询 METADATA$START_SCAN_TIME,这样会更准确地表示记录加载。

可选参数

AUTO_INGEST = TRUE | FALSE

指定当从配置的消息服务收到事件通知时,是否自动从指定的外部暂存区和可选路径加载数据文件:

  • 设置为 TRUE 会启用自动数据加载。

    Snowpipe 支持从外部暂存区(Amazon S3、Google Cloud Storage 或 Microsoft Azure)加载数据。

  • 设置为 FALSE 会禁用自动数据加载。必须调用 Snowpipe REST API 端点才能加载数据文件。

    Snowpipe 支持从内部暂存区(即 Snowflake 命名暂存区或表暂存区,但 不是 用户暂存区)或外部暂存区(Amazon S3、Google Cloud Storage 或 Microsoft Azure)加载数据。

ERROR_INTEGRATION = 'integration_name'

仅当配置 Snowpipe 将错误通知发送到云消息传递服务时才需要。

指定用于与消息传递服务通信的通知集成的名称。有关更多信息,请参阅 Snowpipe 错误通知

AWS_SNS_TOPIC = 'string'

仅当使用 SNS 为 Amazon S3 暂存区配置 AUTO_INGEST 时才需要。

指定 S3 桶中 SNS 主题的 Amazon Resource Name (ARN)。CREATE PIPE 语句将 Amazon Simple Queue Service (SQS) 队列订阅至指定的 SNS 主题。管道将文件复制到引入队列(由事件通知通过 SNS 主题触发)。有关更多信息,请参阅 针对 Amazon S3 自动执行 Snowpipe

INTEGRATION = 'string'

仅在为 Google Cloud Storage 或 Microsoft Azure 暂存区配置 AUTO_INGEST 时才需要。

指定用于访问存储队列的现有通知集成。有关更多信息,请参阅:

集成名称必须全部使用大写输入。

COMMENT = 'string_literal'

指定管道的注释。

默认:无值

使用说明

  • 此 SQL 命令需要以下最低权限:

    权限

    对象

    备注

    CREATE PIPE

    架构

    USAGE

    管道定义中的暂存区

    仅限外部暂存区

    READ

    管道定义中的暂存区

    仅限内部暂存区

    SELECT、INSERT

    管道定义中的表

    对架构对象进行 SQL 操作还需要对包含该对象的数据库和架构具有 USAGE 权限。

  • 除以下选项 之外,支持所有 COPY INTO <table> 复制选项:

    • FILES = ( 'file_name1' [ , 'file_name2', ... ] )

    • ON_ERROR = ABORT_STATEMENT

    • SIZE_LIMIT = num

    • :samp:`PURGE = TRUE | FALSE`(即在加载时自动清除)

    • FORCE = TRUE | FALSE

      请注意,在加载文件后,可以使用 REMOVE 命令从内部(即 Snowflake)暂存区手动移除文件。

    • RETURN_FAILED_ONLY = TRUE | FALSE

    • VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS

  • PATTERN = 'regex_pattern' 复制选项使用正则表达式筛选要加载的文件集。模式匹配的行为如下,具体取决于 AUTO_INGEST 参数值:

    • AUTO_INGEST = TRUE:正则表达式筛选 COPY INTO <table> 语句中的暂存区和可选路径(即云存储位置)上的文件列表。

    • :AUTO_INGEST = FALSE:正则表达式筛选在调用 Snowpipe REST API insertFiles 端点时提交的文件列表。

    请注意,Snowpipe 从存储位置修剪暂存区定义中的任何路径段,并将正则表达式应用于任何剩余的路径段和文件名。要查看暂存区定义,请执行针对暂存区的 DESCRIBE STAGE 命令。URL 属性由桶或容器名称以及零个或多个路径段组成。例如,如果 FROM 位置位于 COPY INTO <table> statement is @s/path1/path2/ and the URL value for stage @s is s3://mybucket/path1/, then Snowpipe trims /path1/ from the storage location in the FROM clause and applies the regular expression to path2/ 中,加上路径中的文件名。

    重要

    Snowflake 建议您为 Snowpipe 启用云事件筛选,以降低成本、事件噪音和延迟。仅当云提供商的事件筛选功能不足时,才使用 PATTERN 选项。有关为每个云提供商配置事件筛选的更多信息,请参阅以下页面:

    • Amazon S3: 使用对象键名称筛选功能配置事件通知 (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-filtering.html)

    • Microsoft Azure 事件网格: 了解适用于事件网格订阅的事件筛选 (https://docs.microsoft.com/en-us/azure/event-grid/event-filtering)

    • Google Cloud Pub/Sub: 筛选消息 (https://cloud.google.com/pubsub/docs/filtering)

  • 支持将查询用作源,以便使用 COPY 语句进行列重新排序、列省略和类型转换(即在加载期间转换数据)。有关用法示例,请参阅 在加载期间转换数据。请注意,仅支持简单的 SELECT 语句。不支持使用 WHERE 子句进行筛选。

  • 管道定义不是动态的(即,如果基础暂存区或表发生变更,例如重命名或删除了暂存区/表,管道并不会自动更新)。在这种情况下,必须创建一个新管道,并在将来的 Snowpipe REST API 调用中提交此管道名称。

  • 关于元数据:

    注意

    客户应确保在使用 Snowflake 服务时,不会将个人数据(用户对象除外)、敏感数据、出口管制数据或其他受监管数据作为元数据输入。有关更多信息,请参阅 Snowflake 中的元数据字段

  • CREATE OR REPLACE <object> 语句是原子的。也就是说,当对象被替换时,旧对象将被删除,新对象将在单个事务中创建。

重要

如果重新创建管道(使用 CREATE OR REPLACE PIPE 语法),请参阅 重建管道,以查看相关注意事项和最佳实践。

示例

在当前架构中创建管道,以便将暂存在 mystage 暂存区的文件中的所有数据加载到 mytable 中:

CREATE PIPE mypipe
  AS
  COPY INTO mytable
  FROM @mystage
  FILE_FORMAT = (TYPE = 'JSON');
Copy

与前面的示例相同,但进行了数据转换。按照相反的顺序,仅从暂存文件中的第 4 列和第 5 列加载数据:

CREATE PIPE mypipe2
  AS
  COPY INTO mytable(C1, C2)
  FROM (SELECT $5, $4 FROM @mystage)
  FILE_FORMAT = (TYPE = 'JSON');
Copy

创建一个管道,将所有数据加载到目标表中与数据中表示的相应列相匹配的列中。列名称不区分大小写。

此外,将 METADATA$START_SCAN_TIME 和 METADATA$FILENAME 元数据列 中的元数据加载到名为 c1c2 的列中。

CREATE PIPE mypipe3
  AS
  (COPY INTO mytable
   FROM @mystage
   MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
   INCLUDE_METADATA = (c1= METADATA$START_SCAN_TIME, c2=METADATA$FILENAME)
   FILE_FORMAT = (TYPE = 'JSON'));
Copy

在当前架构中创建管道,以便使用从消息传递服务接收的事件通知自动加载数据:

Amazon S3

CREATE PIPE mypipe_s3
  AUTO_INGEST = TRUE
  AWS_SNS_TOPIC = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
  AS
  COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage
  FILE_FORMAT = (TYPE = 'JSON');
Copy

Google Cloud Storage

CREATE PIPE mypipe_gcs
  AUTO_INGEST = TRUE
  INTEGRATION = 'MYINT'
  AS
  COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage
  FILE_FORMAT = (TYPE = 'JSON');
Copy

Microsoft Azure

CREATE PIPE mypipe_azure
  AUTO_INGEST = TRUE
  INTEGRATION = 'MYINT'
  AS
  COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage
  FILE_FORMAT = (TYPE = 'JSON');
Copy
语言: 中文