为 Google Pub/Sub 启用 Snowpipe 错误通知¶
本主题介绍了如何将 Snowpipe 错误通知推送到 `Google Cloud Pub/Sub<https://cloud.google.com/storage/docs/reporting-changes>`_ (以下简称为 Pub/Sub)服务。
该功能可为以下负载类型推送错误通知:
自动引入 Snowpipe。
对 Snowpipe
insertFiles
REST API 端点的调用。仅使用 Snowflake Connector for Kafka 和 Snowpipe 引入方法,从 Apache Kafka 进行的加载。
本主题内容:
云平台支持¶
目前,该功能仅限于 Google Cloud Platform (GCP) 上托管的 Snowflake 账户。Snowpipe 可通过支持的云存储服务中的文件加载数据;但是,只有托管在 GCP 上的 Snowflake 账户才支持向 Pub/Sub 推送通知。
备注¶
Snowflake 保证至少实现一次错误通知的消息传送(即多次尝试传送消息以确保至少一次尝试成功,这可能会导致重复消息)。
使用通知集成对象可实现该功能。通知集成是一个 Snowflake 对象,它在 Snowflake 和第三方云消息队列服务之间提供一个接口。一个通知集成可支持多个管道。
启用错误通知¶
创建通知集成¶
在管道中启用错误通知¶
单个通知集成可由多个管道共享。错误消息的正文会标明管道、外部暂存区和路径,以及产生错误的文件等详细信息。
要为管道启用错误通知,请指定 ERROR_INTEGRATION 参数值。
备注
要创建或修改用于引用通知集成的管道,需要具有通知集成 USAGE 权限的角色。此外,该角色必须分别拥有架构的 CREATE PIPE 权限或管道的 OWNERSHIP 权限。
请注意,对架构中的对象进行操作还需要对父数据库和架构具有 USAGE 权限。
有关创建具有指定权限集的自定义角色的说明,请参阅 创建自定义角色。
有关对 安全对象 执行 SQL 操作的相应角色和权限授予的一般信息,请参阅 访问控制概述。
新建管道¶
使用 CREATE PIPE 创建新管道。
CREATE PIPE <name>
AUTO_INGEST = TRUE
[ INTEGRATION = '<string>' ]
ERROR_INTEGRATION = <integration_name>
AS <copy_statement>
其中:
ERROR_INTEGRATION = <integration_name>
您在 第 3 步:在 Snowflake 中创建通知集成 中创建的通知集成的名称。
例如:
CREATE PIPE mypipe
AUTO_INGEST = TRUE
INTEGRATION = 'my_storage_int'
ERROR_INTEGRATION = my_notification_int
AS
COPY INTO mydb.public.mytable
FROM @mydb.public.mystage;
现有管道¶
使用 ALTER PIPE 修改现有管道。
备注
如果在创建管道时指定了通知集成,则需要首先 取消设置 ERROR_INTEGRATION 参数(使用 ALTER PIPE ...UNSET ERROR_INTEGRATION),然后再设置参数。
ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
其中,<integration_name>
是您在 第 3 步:在 Snowflake 中创建通知集成 中创建的通知集成的名称。
例如:
ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
错误通知消息有效负载¶
错误消息的正文会标明管道和加载过程中遇到的错误。
以下是描述 Snowpipe 错误的消息有效负载示例。有效负载可包括一条或多条错误消息。
{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
请注意,必须将字符串解析为 JSON 对象,才能处理有效负载中的值。