Enabling Snowpipe error notifications for Google Pub/Sub

This topic provides instructions for pushing Snowpipe error notifications to the Google Cloud Pub/Sub (https://cloud.google.com/storage/docs/reporting-changes) (Pub/Sub) service.

This feature can push error notifications for the following types of loads:

  • Auto-ingest Snowpipe.
  • Calls to the Snowpipe insertFiles REST API endpoint.
  • Loads from Apache Kafka that use the Snowflake Connector for Kafka with the Snowpipe ingestion method only.

Cloud platform support

Currently, this feature is limited to Snowflake accounts hosted on Google Cloud (GC). Snowpipe can load data from files in any supported cloud storage service; however, push notifications to Pub/Sub are only supported in Snowflake accounts hosted on GC.

Notes

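  • Snowflake guarantees at-least-once delivery of error notification messages (that is, it makes multiple delivery attempts to ensure that at least one succeeds, which can result in duplicate messages).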
  • This feature is implemented using the notification integration object. A notification integration is a Snowflake object that provides an interface between Snowflake and third-party cloud message queuing services. A single notification integration can support multiple pipes.
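Because delivery is at-least-once, a subscriber may want to discard duplicates before acting on a notification. The following is a minimal Python sketch, not part of Snowflake or Pub/Sub. It assumes that a redelivered notification carries the same messageId value as the original (the field appears in the sample payload later in this topic, but its stability across redeliveries is an assumption here). The function name dedupe_notifications is hypothetical.

```python
import json


def dedupe_notifications(raw_payloads):
    """Parse raw notification strings and keep the first copy of each messageId.

    Assumption: duplicate deliveries of one notification share a messageId.
    """
    seen_ids = set()
    unique = []
    for raw in raw_payloads:
        notification = json.loads(raw)
        message_id = notification["messageId"]
        if message_id in seen_ids:
            continue  # already handled an earlier copy of this notification
        seen_ids.add(message_id)
        unique.append(notification)
    return unique


if __name__ == "__main__":
    # Two copies of the same notification followed by a different one.
    # The second ID is made up for illustration.
    copy_a = json.dumps({"messageId": "a62e34bc-6141-4e95-92d8-f04fe43b43f5"})
    other = json.dumps({"messageId": "00000000-0000-0000-0000-000000000001"})
    for item in dedupe_notifications([copy_a, copy_a, other]):
        print(item["messageId"])
```

In a long-running subscriber, the in-memory set would likely need to be replaced with a bounded or persistent store; that choice is left open here.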

Enabling error notifications

Creating a notification integration

See Creating a notification integration to send notifications to a Google Cloud Pub/Sub topic.

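Enabling error notifications in pipes

A single notification integration can be shared by multiple pipes. The body of each error message identifies the pipe, the external stage and path, and the file where the error originated, among other details.

To enable error notifications for a pipe, specify a value for the ERROR_INTEGRATION parameter.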

Note

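Creating or modifying a pipe that references a notification integration requires a role with the USAGE privilege on the notification integration. In addition, the role must have the CREATE PIPE privilege on the schema (to create a pipe) or the OWNERSHIP privilege on the pipe (to modify one).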

Operating on an object in a schema requires at least one privilege on the parent database and at least one privilege on the parent schema.

For instructions on creating a custom role with a specified set of privileges, see Creating custom roles.

For general information about roles and privilege grants for performing SQL actions on securable objects, see Overview of Access Control.

New pipe

Create a new pipe using CREATE PIPE:

CREATE PIPE <name>
  AUTO_INGEST = TRUE
  [ INTEGRATION = '<string>' ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>

Where:

ERROR_INTEGRATION = <integration_name>

Name of the notification integration you created in Create a notification integration in Snowflake.

For example:

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'my_storage_int'
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;

Existing pipe

Modify an existing pipe using ALTER PIPE.

Note

If a notification integration was specified when the pipe was created, you must first unset the ERROR_INTEGRATION parameter (using ALTER PIPE … UNSET ERROR_INTEGRATION) and then set it to the new value.

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;

Where <integration_name> is the name of the notification integration you created in Create a notification integration in Snowflake.

For example:

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;

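Error notification message payload

The body of each error message identifies the pipe and the errors encountered during the load.

The following is a sample message payload describing a Snowpipe error. A payload can include one or more error messages.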

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}

Note that you must parse the string into a JSON object before you can process the values in the payload.
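As an illustration of that parsing step, the following Python sketch decodes the sample payload shown above and produces one record per failed file. It uses only the field names that appear in the sample. The function name failed_files and the shape of the returned records are hypothetical, and the sketch assumes the subscriber has already obtained the notification body as a plain JSON string.

```python
import json

# The sample payload from this topic, as a raw (unparsed) JSON string.
RAW_PAYLOAD = (
    '{"version":"1.0","messageId":"a62e34bc-6141-4e95-92d8-f04fe43b43f5",'
    '"messageType":"INGEST_FAILED_FILE","timestamp":"2021-10-22T19:15:29.471Z",'
    '"accountName":"MYACCOUNT","pipeName":"MYDB.MYSCHEMA.MYPIPE",'
    '"tableName":"MYDB.MYSCHEMA.MYTABLE","stageLocation":"gcs://mybucket/mypath",'
    '"messages":[{"fileName":"/file1.csv_0_0_0.csv.gz",'
    '"firstError":"Numeric value \'abc\' is not recognized"}]}'
)


def failed_files(raw):
    """Parse a notification string and return one record per entry in 'messages'."""
    payload = json.loads(raw)
    return [
        {
            "pipe": payload["pipeName"],
            "stage": payload["stageLocation"],
            "file": message["fileName"],
            "error": message["firstError"],
        }
        for message in payload["messages"]
    ]


if __name__ == "__main__":
    for record in failed_files(RAW_PAYLOAD):
        print(f'{record["pipe"]}: {record["file"]}: {record["error"]}')
```

Iterating over the messages array, rather than reading only its first element, accounts for payloads that carry more than one error message.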