数据管道多地容灾能力

数据管道的多地容灾能力可帮助您保护数据管道,应对可能发生的区域范围云服务商中断。该功能确保在故障转移至辅助位置后,您的数据管道(特别是使用 Snowpipe 和 COPY INTO 的管道)能够恢复加载新数据,且不会出现中断或重复引入。

该功能支持跨云工作,允许您的主存储位置和备份存储位置跨越完全不同的云提供商(例如,从 AWS 故障转移到 Azure),也支持同一云内的跨区域故障转移。

该功能基于责任共担模型:

  • Snowflake 的角色: Snowflake 将您的目标表和加载历史记录(引入状态)原生复制到您的辅助账户。在故障转移期间,Snowflake 使用此状态防止重复,并且仅位置在主位置未处理过的文件。

  • 您的角色: 在发生中断时(或作为双写云设置的一部分),您必须将新传入的文件路由到您的辅助云存储位置。Snowflake 不会复制您的外部云存储文件。

管道容灾能力通过配置最多两种关键资源来实现:

  • 多地存储集成 (MLSI): 将 Snowflake 安全地连接到跨区域或跨云的多个外部云存储位置。当您希望仅为来自外部暂存区的 COPY INTO 或为您的 Snowpipe 管道提供容灾能力时,需要配置 MLSI。

  • 多队列通知集成 (MQNI): 将 Snowflake 连接到多个第三方云消息队列,确保持续接收新文件通知。仅当您希望为 Snowpipe 管道(即持续数据加载)提供容灾能力时,才需要 MQNI。

数据管道多地容灾能力架构

要求和注意事项

在配置此功能之前,请查看以下先决条件和注意事项:

要求

  • 版本: Snowflake Business Critical Edition(或更高版本)。

  • 支持的引入方法: 此功能仅支持通过 Snowpipe(自动引入)和 COPY INTO <table> 进行基于文件的数据加载。不支持 Openflow 或 Snowpipe Streaming。

  • 相同的路径结构: 为了使您的管道在故障转移后能够定位到新文件,您必须使用与主存储位置完全相同的层级结构、文件夹结构和相对路径将文件写入辅助存储位置。

注意事项

  • 计费: 此功能会产生标准的复制费用(数据传输和计算资源),将计入您的目标账户。

  • 暂存区修改停机时间: 更改现有暂存区的 RELATIVE_URL 属性将使依赖对象失效并停止引入。我们建议在设置期间创建新的暂存区以避免停机。

  • 多队列通知集成 (MQNI): 不支持在源账户和目标账户中使用相同的活动队列。这样做可能会导致通知丢失。Snowflake 不会检查是否跨账户使用了相同的队列。

  • 目录表: 当前不支持在使用 MLSI 的暂存区上创建目录表。

复制行为

  • 异步复制: Snowflake 将您的表和管道的加载历史记录在完全相同的快照中一起复制。由于它们是同步的,因此中断不会导致数据重复。如果您的辅助数据库落后四个小时,那么表数据也落后四个小时,处理四个小时的排队通知只是将表更新到最新状态。

  • D双写避免数据丢失: 您的恢复点对象 (RPO) 由您的复制刷新间隔决定。为防止故障转移期间数据丢失,您的辅助云消息队列的消息保留期必须长于您的复制间隔。如果你的队列在计划复制追赶上进度之前丢弃了消息,那么这些文件在故障转移后将不会引入。

  • 单写数据丢失风险: 如果您使用单写路由,则在您上次成功复制之后在主位置处理的任何文件,对于辅助位置来说都是完全未知的。在故障转移时,这些数据将暂时在您的目标账户中缺失。

警告

单写故障恢复的严重警告: 当您执行刷新以故障恢复到原始主账户时,主数据库将被辅助数据库覆盖。如果在同步回主数据库之前,没有手动对主数据库中的这些孤立文件进行对账并将其加载到辅助数据库中,那么这些文件将从主数据库中永久删除。

选择正确的架构

由于 Snowflake 会在同一快照中异步复制目标表和管道的加载历史记录,因此您的管道可以免受数据重复和部分加载的影响。如果在引入过程中发生中断,事务将完全回滚,从而不会出现部分加载的文件。

但是,在中断期间恢复“传输中”文件的方式完全取决于您的外部云存储路由是配置为双双写模式还是单写模式。

2.单写路由

生产者仅写入主云存储。发生中断时,您需要重新路由生产者,使其开始将新文件写入辅助云存储。

  • 故障转移时会发生什么: 辅助账户立即开始处理路由到辅助桶的新文件。但是,滞留在受影响主位置中的传输中文件会暂时遗留在那里。

  • 故障恢复时会发生什么: 当主位置恢复,并且您故障恢复到主 Snowflake 账户时,Snowpipe 会自动处理在中断发生前成功到达队列的所有文件通知。

  • 结果: 无重复数据。但是,因中断而完全未能生成云通知的任何文件(或者中断持续时间超过了队列消息保留策略所允许的范围),都需要手动介入处理。

  • 所需操作: 故障恢复后,将主存储桶与 Snowflake 中的 COPY_HISTORY 视图进行比较,以找出任何缺失的文件。运行 ALTER PIPE ... REFRESH 或手动执行 COPY INTO 命令,以加载这些特定的滞留文件。

第 1 部分:第一部分:一次性配置(设置)

以下步骤只需执行一次,即可配置具有弹性的数据管道。由于在设置过程中同时配置了两个账户的活动位置,在实际发生中断时进行故障转移几乎是瞬时的。

第 1 步:创建多位置存储集成 (MLSI)

要配置多位置存储集成,您需遵循配置存储集成的标准步骤,但本部分指出了一些差异之处。

在您的源账户中,通过为 STORAGE_LOCATIONS 列表中的每个位置提供值来创建 MLSI。您可以混搭不同云提供商,以实现跨云配置。

CREATE STORAGE INTEGRATION my_mlsi
  TYPE = EXTERNAL_STAGE
  STORAGE_LOCATIONS =
  (
    (
      NAME = 'my-s3-us-west-1'
      STORAGE_PROVIDER = 'S3'
      STORAGE_BASE_URL = 's3://myBucketWest'
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::12345:role/myrole'
      STORAGE_AWS_EXTERNAL_ID = 'mlsi-external-id'
      ENCRYPTION = ( TYPE = 'AWS_SSE_S3' )
    ),
    (
      NAME = 'my-s3-us-east-1'
      STORAGE_PROVIDER = 'S3'
      STORAGE_BASE_URL = 's3://myBucketEast'
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::67890:role/myrole'
      STORAGE_AWS_EXTERNAL_ID = 'mlsi-external-id'
      ENCRYPTION = ( TYPE = 'AWS_SSE_S3' )
    )
  )
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('*')
  ACTIVE = 'my-s3-us-west-1';

其中:

  • STORAGE_LOCATIONS: 为存储集成指定一个或多个存储位置(S3 桶、GCS 桶或 Azure 容器)的列表。要查看各云提供商的参数,请参阅 CREATE STORAGE INTEGRATION 参考页面中的 云提供商参数 (cloudProviderParams)

  • NAME: 字符串,用于指定存储位置的标识符(名称)。

  • ENCRYPTION: 指定存储位置的加密方式。您必须在存储集成级别而非暂存级别为存储位置指定加密。仅在从加密文件加载时需要;如果存储位置和文件均未加密,则不需要。要查看各云提供商的加密选项,请参阅 CREATE STAGE 参考页面中的 ENCRYPTION

  • ACTIVE: 指定存储位置的名称,用于将其设置为当前账户中存储集成的活动位置。

    对于源账户中的活动存储位置,您必须设置访问控制并授予 Snowflake 访问您存储的权限。请按照以下主题中的说明进行操作:

第 2 步:第 2 步:将 MLSI 与外部暂存区关联

我们强烈建议创建新暂存区,而不是更改现有暂存区。

警告

WARNING:更改 RELATIVE_URL 会导致停机

如果您使用 ALTER STAGE 更改现有暂存区的 RELATIVE_URL,所有依赖的目录表将重新创建,使用此暂存区的所有外部表或管道将被标记为无效,并将停止数据引入。如果您选择更改现有暂存区,请做好停机准备。

使用 CREATE STAGE 命令,将您创建的多位置存储集成与一个或多个外部暂存区关联:

CREATE STAGE my_ext_stage
  RELATIVE_URL = '/my_folder/my_sub_folder/'
  STORAGE_INTEGRATION = 'my_mlsi';

其中:

  • RELATIVE_URL: 从存储集成中定义的存储位置到外部暂存区位置的相对路径。为了让您的管道在故障转移后能够定位新文件,您必须使用与主位置相同的层级结构、文件夹结构和相对路径,将文件写入辅助存储位置。

备注

该值必须是一个字面量路径。不支持指定模式或通配符。要指定对存储集成中 STORAGE_BASE_URL 下所有位置的访问,请使用空字符串 RELATIVE_URL = ''。

  • STORAGE_INTEGRATION: 您的多位置存储集成的名称。

备注

者,您也可以通过指定 RELATIVE_URL 参数和您的 MLSI 来修改现有的外部暂存区。ALTER STAGE 命令也支持回滚此更改,从而使外部暂存区不再使用多位置存储集成。

例如:

ALTER STAGE my_ext_stage SET
  RELATIVE_URL = '/my_folder/my_sub_folder/'
  STORAGE_INTEGRATION = 'my_mlsi';

第 3 步:配置多队列通知集成 (MQNI)

如果您通过云消息服务使用自动数据加载,并且已为外部暂存区配置了多位置存储集成,则还必须使用多队列通知集成,以实现 Snowpipe 管道的无缝故障转移。

对于您为通知集成定义的每个队列,您必须按照以下主题中的步骤准备消息服务:

备注

如果您不想将 Amazon SNS 与 Snowpipe 一起使用,可以不创建 MQNI,但必须在故障转移期间执行一个额外步骤。如果选择此选项,请将管道与上面创建的暂存区和 MLSI 相关联,然后继续执行第 4 步。

场景 A:创建新的多队列通知集成 (MQNI)

要创建多队列通知集成,请按照创建通知集成的标准步骤操作,但本部分指出了一些差异之处。

在源账户中,通过为 QUEUES列表中的每个队列提供值来创建多队列通知集成:

CREATE NOTIFICATION INTEGRATION my_mqni
  ENABLED = TRUE
  TYPE = MULTI_QUEUE
  DIRECTION = INBOUND
  QUEUES = (
    (
      NAME = 'my-us-west-1'
      NOTIFICATION_PROVIDER = AWS_SNS
      AWS_SNS_TOPIC_ARN = 'arn:aws:sns:us-west-1:12345:my-snowpipe-mlsi-west'
    ),
    (
      NAME = 'my-us-east-1'
      NOTIFICATION_PROVIDER = AWS_SNS
      AWS_SNS_TOPIC_ARN = 'arn:aws:sns:us-west-1:12345:my-snowpipe-mlsi-east'
    )
  )
  ACTIVE = 'my-us-west-1';

其中:

  • TYPE = MULTI_QUEUE: 指定这是一个 Snowflake 与第三方云消息队列服务之间的多队列集成。

  • DIRECTION = INBOUND: 指定 Snowflake 接收云消息服务发送的通知。

  • QUEUES: 为通知集成指定一个或多个队列的列表。

  • NAME: 字符串,用于指定队列的标识符(名称)。

要查看各云提供商的具体队列参数,请参阅:

创建 MQNI 后,您可以使用它通过 CREATE PIPE 命令创建新的管道。以下示例创建了一个管道,使用依赖于多位置存储集成的外部暂存区 (my_ext_stage) 将数据从 Amazon S3 加载到表中:

CREATE PIPE my_pipe
  AUTO_INGEST = TRUE
  INTEGRATION = my_mqni
  AS COPY INTO my_table FROM @my_ext_stage/my_pipe/;

场景 B:将现有通知集成迁移到 MQNI

如果您已有现成的通知集成,希望将其转换为 MQNI 而不是从头开始创建,请使用 SYSTEM$CONVERT_PIPES_TO_MULTI_QUEUE 函数。

该函数会使用您指定的名称创建一个新的多队列通知集成,将源账户的活动队列设置为原始队列,并自动迁移源账户中的所有管道以使用新的 MQNI。

语法:

SYSTEM$CONVERT_PIPES_TO_MULTI_QUEUE(
  '<new_mqni_name>',
  '<original_sns_topic_arn_or_int_name>',
  '<new_sns_topic_arn_or_int_name>'
)

其中:

  • new_mqni_name: 字符串,用于指定要分配给该函数创建的新多队列通知集成的标识符(名称)。

  • original_sns_topic_arn_or_int_name:

    • 对于 AWS,是与一个或多个管道关联的原始 SNS 主题的 Amazon Resource Name (ARN)。

    • 对于 Google Cloud 或 Azure,是指定与一个或多个管道关联的原始单队列通知集成的标识符的字符串。

  • new_sns_topic_arn_or_int_name:

    • 对于 AWS,是要作为队列添加到 MQNI 的新 SNS 主题的 Amazon Resource Name (ARN)。

    • 对于 Google Cloud 或 Azure,是指定要与原始通知集成合并的新单队列通知集成的标识符的字符串。

示例 1:在以下 和 命令中,服务名称不会显式指定数据库和架构名称。添加新 SNS 主题队列

SELECT SYSTEM$CONVERT_PIPES_TO_MULTI_QUEUE(
  'my_mqni',
  'arn:aws:sns:us-west-1:12345:my-snowpipe-mlsi-west',
  'arn:aws:sns:us-east-1:67890:my-snowpipe-mlsi-east'
);

这将生成一个名为 my_mqni 的 MQNI,其中包含以下队列:

  • MY_MQNI-queue1(对应原始的活动 SNS 主题)

  • MY_MQNI-queue2(对应新的 SNS 主题)

示例 2:在以下 和 命令中,服务名称包括数据库和架构名称。将两个通知集成合并为 MQNI

SELECT SYSTEM$CONVERT_PIPES_TO_MULTI_QUEUE(
  'my_azure_mqni',
  'my_azure_ni_1',
  'my_azure_ni_2'
);

这将生成一个名为 my_azure_mqni 的 MQNI,其中包含以下队列:

  • my_azure_ni_1(对应原始的活动队列)

  • my_azure_ni_2(对应新队列)

备注

如果想更改源账户中的活动队列,可以使用 ALTER INTEGRATION ... SET ACTIVE = '<my_queue>' 语句。在更新活动队列之前,必须暂停任何使用该通知集成的管道。

第 4 步:将 MLSI 和 MQNI 复制到目标账户

备注

刷新操作会删除目标帐户中任何不是副本的存储或通知集成,除非这些对象具有全局 IDs。

有关更多信息,请参阅 目标账户中的复制和对象

1.要复制多位置存储集成和多队列通知集成,请更改现有复制组或故障转移组,在 ALLOWED_INTEGRATION_TYPES 列表中包含 STORAGE INTEGRATIONS 和 NOTIFICATION INTEGRATIONS。

例如,使用 ALTER FAILOVER GROUP 命令:

ALTER FAILOVER GROUP my_fg SET
  OBJECT_TYPES = DATABASES, ROLES, INTEGRATIONS
  ALLOWED_INTEGRATION_TYPES = API INTEGRATIONS, STORAGE INTEGRATIONS,
    NOTIFICATION INTEGRATIONS;
  1. 然后,在目标账户中执行刷新操作:

ALTER FAILOVER GROUP my_fg REFRESH;

第 5 步:在目标账户中配置活动状态

执行刷新操作后,为确保在实际中断期间实现无缝故障转移,需要在目标账户中配置活动存储位置和队列(如果使用通知集成)。

在目标账户中:

  1. 对于希望在目标账户中设置为活动位置的存储位置,请按照以下主题中的说明授予 Snowflake 对存储的访问权限:

  2. 激活辅助存储:将 MLSI设置为在目标账户中使用辅助备份存储位置。

    ALTER STORAGE INTEGRATION my_mlsi SET ACTIVE = 'my-s3-us-east-1';
    
  3. 如果使用多队列通知集成,请授予 Snowflake 访问您希望在目标账户中设置为活动的队列的消息服务的权限。按照云提供商的说明进行操作:

  4. 激活辅助队列(如果使用 MQNI):将活动队列设置为目标账户中的辅助位置。

    ALTER INTEGRATION my_mqni
      SET ACTIVE = 'MY_MQNI-queue2';
    

第 2 部分:故障转移步骤

在中断期间执行以下步骤,将数据引入重定向到辅助位置。由于已在设置中预先配置活动队列和存储,此过程只需要很少的命令。

  1. 提升目标账户权限:登录到目标账户,并将其提升为新的主账户。数据加载会自动从辅助云基础设施恢复。

    ALTER FAILOVER GROUP my_fg PRIMARY;
    
  2. 如果未使用 Amazon SNS 与 Snowpipe:如果您未使用 SNS 与 Snowpipe,仅依赖 SQS,则无需创建 MQNI。请改为调用以下系统函数,在故障转移期间重新绑定管道。

    SELECT SYSTEM$INGEST_REBIND_PIPE('my_db.my_schema.my_pipe');
    

第 3 部分:故障恢复步骤

中断问题解决并且主位置恢复正常后,请执行以下步骤将管道移回主位置。

  1. 数据同步回主账户:在提升原账户之前,必须将中断期间发生的所有数据和状态更改拉回到原账户。登录原始主账户(当前作为辅助账户)并启动手动刷新:

    ALTER FAILOVER GROUP my_fg REFRESH;
    

    重要

    在进行下一步之前,请等待此刷新操作完全完成。在同步完成前进行故障转移可能导致数据丢失。

    警告

    单写故障恢复严重警告: 如果您使用单写路由,则在您上次成功复制之后在主位置处理的任何文件,对于辅助位置来说都是未知的。在故障转移时,这些数据暂时在您的目标账户中缺失。您执行刷新以故障恢复到原始主账户时,主数据库将被辅助数据库覆盖。如果在同步回主数据库之前,没有手动对主数据库中的这些孤立文件进行对账并将其加载到辅助数据库中,那么这些文件会从主数据库中永久删除。

  2. 提升原账户权限:刷新完成后,将原源账户提升回主账户。

    ALTER FAILOVER GROUP my_fg PRIMARY;
    
  3. 如果未使用 Amazon SNS 与 Snowpipe:调用系统函数,将管道重新绑定回原源位置。

    SELECT SYSTEM$INGEST_REBIND_PIPE('my_db.my_schema.my_pipe');
    

第 4 部分:监控和验证

启动故障转移或故障恢复后,使用以下命令验证数据管道是否成功重定向并恢复引入。

1.验证活动集成状态

通过检查存储和队列的属性,确认集成指向正确的存储和队列。在输出中查看 ACTIVE 属性:

-- Check the active storage location
DESCRIBE STORAGE INTEGRATION my_mlsi;

-- Check the active message queue
DESCRIBE INTEGRATION my_mqni;

2.检查管道状态(仅限 Snowpipe)

使用 SYSTEM$PIPE_STATUS 函数,确保管道正在运行,并检查它是否正在积极地将来自辅助位置的新文件排队。

SELECT SYSTEM$PIPE_STATUS('my_pipe');

查看“executionState”:“RUNNING”并检查“pendingFileCount”,以确认它正在积极识别放入辅助桶中的新文件。

3.验证引入是否成功(加载历史记录)

要确保加载数据时没有错误或重复,请查询 COPY_HISTORY 视图。这会准确显示引入了哪些文件、源路径以及加载时间。

SELECT file_name, status, row_count, last_load_time
FROM TABLE(information_schema.copy_history(
  table_name => 'my_table',
  start_time => DATEADD(hours, -1, CURRENT_TIMESTAMP())
));

验证 file_name 路径是否反映了活动存储位置,以及状态是否显示为 LOADED。