为 Google Pub/Sub 启用 Snowpipe 错误通知

本主题介绍了如何将 Snowpipe 错误通知推送到 `Google Cloud Pub/Sub<https://cloud.google.com/storage/docs/reporting-changes>`_ (以下简称为 Pub/Sub)服务。

该功能可为以下负载类型推送错误通知:

  • 自动引入 Snowpipe。

  • 对 Snowpipe insertFiles REST API 端点的调用。

  • 仅使用 Snowflake Connector for Kafka 和 Snowpipe 引入方法,从 Apache Kafka 进行的加载。

本主题内容:

云平台支持

目前,该功能仅限于 Google Cloud Platform (GCP) 上托管的 Snowflake 账户。Snowpipe 可通过支持的云存储服务中的文件加载数据;但是,只有托管在 GCP 上的 Snowflake 账户才支持向 Pub/Sub 推送通知。

备注

  • Snowflake 保证至少实现一次错误通知的消息传送(即多次尝试传送消息以确保至少一次尝试成功,这可能会导致重复消息)。

  • 使用通知集成对象可实现该功能。通知集成是一个 Snowflake 对象,它在 Snowflake 和第三方云消息队列服务之间提供一个接口。一个通知集成可支持多个管道。

启用错误通知

创建通知集成

请参阅 创建通知集成以向 Google Cloud Pub/Sub 主题发送通知

在管道中启用错误通知

单个通知集成可由多个管道共享。错误消息的正文会标明管道、外部暂存区和路径,以及产生错误的文件等详细信息。

要为管道启用错误通知,请指定 ERROR_INTEGRATION 参数值。

备注

要创建或修改用于引用通知集成的管道,需要具有通知集成 USAGE 权限的角色。此外,该角色必须分别拥有架构的 CREATE PIPE 权限或管道的 OWNERSHIP 权限。

请注意,对架构中的对象进行操作还需要对父数据库和架构具有 USAGE 权限。

有关创建具有指定权限集的自定义角色的说明,请参阅 创建自定义角色

有关对 安全对象 执行 SQL 操作的相应角色和权限授予的一般信息,请参阅 访问控制概述

新建管道

使用 CREATE PIPE 创建新管道。

CREATE PIPE <name>
  AUTO_INGEST = TRUE
  [ INTEGRATION = '<string>' ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>
Copy

其中:

ERROR_INTEGRATION = <integration_name>

您在 第 3 步:在 Snowflake 中创建通知集成 中创建的通知集成的名称。

例如:

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'my_storage_int'
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;
Copy

现有管道

使用 ALTER PIPE 修改现有管道。

备注

如果在创建管道时指定了通知集成,则需要首先 取消设置 ERROR_INTEGRATION 参数(使用 ALTER PIPE ...UNSET ERROR_INTEGRATION),然后再设置参数。

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Copy

其中,<integration_name> 是您在 第 3 步:在 Snowflake 中创建通知集成 中创建的通知集成的名称。

例如:

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Copy

错误通知消息有效负载

错误消息的正文会标明管道和加载过程中遇到的错误。

以下是描述 Snowpipe 错误的消息有效负载示例。有效负载可包括一条或多条错误消息。

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Copy

请注意,必须将字符串解析为 JSON 对象,才能处理有效负载中的值。

语言: 中文