Stage, pipe, and load history replication

This topic provides information about replication support for data pipeline objects and related metadata, including stages, storage integrations, pipes, and load history. You can replicate these objects to configure failover for ingest and ETL pipelines across regions and across cloud platforms.

Before you get started, we recommend that you be familiar with Snowflake support for replication and failover/failback. For more information, see Introduction to replication and failover across multiple accounts.

Requirements

Important

If a database in a target account that you plan to use already contains stages and pipes, we recommend that you contact support before enabling replication. When a replication or failover group in your source account includes that database, any pre-existing stages and pipes are dropped from the database.

To replicate any external stages that use a storage integration, you must configure your replication or failover group to replicate STORAGE INTEGRATIONS. Otherwise, external stages are replicated without the associated storage integration.

You can use an ALTER REPLICATION GROUP or ALTER FAILOVER GROUP statement to modify these properties for an existing group.

If you add INTEGRATIONS to the OBJECT_TYPES list in your ALTER statement, include every object type that is already in the list so that those objects are not dropped in the target account. The same applies when you add STORAGE INTEGRATIONS to the ALLOWED_INTEGRATION_TYPES list.

For example:

ALTER FAILOVER GROUP my_failover_group SET
  OBJECT_TYPES = ROLES, INTEGRATIONS
  ALLOWED_INTEGRATION_TYPES = API INTEGRATIONS, STORAGE INTEGRATIONS;
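Because the ALTER statement replaces the OBJECT_TYPES and ALLOWED_INTEGRATION_TYPES lists, it helps to read the group's current values first. The following is a minimal sketch; it assumes that the SHOW FAILOVER GROUPS output uses the lowercase column names shown, so check the output in your own account before relying on them.

```sql
-- List the group whose lists you plan to change (name from the example above).
SHOW FAILOVER GROUPS LIKE 'my_failover_group';

-- Read selected columns from the SHOW output. RESULT_SCAN must run in the same
-- session, right after the SHOW command.
-- Assumption: the SHOW output exposes these column names.
SELECT "name", "is_primary", "object_types", "allowed_integration_types"
  FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
```

Every type listed in object_types should reappear in the OBJECT_TYPES list of your ALTER statement, alongside INTEGRATIONS.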

Note

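Your cloud storage provider might restrict the replication of data pipeline objects between commercial and government cloud regions. To avoid government cloud data replication restrictions, configure failover resources in a region that is accessible from the government cloud region. For more information about government cloud restrictions, see your cloud storage provider's documentation.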

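Replication and stages

This section describes the level of replication support that Snowflake currently provides for different types of stages.

Replication of internal stages

The following table describes how replication works for each type of internal stage.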

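Type | Description of replication support
Table stage | Empty table stages are created for tables in a replicated database. Files on table stages are not replicated.
User stage | Replication of users and user stages requires Business Critical Edition (or higher). Empty user stages are created for replicated users. Files on user stages are not replicated.
Named stage | Named internal stages are replicated when you replicate a database. The stage must have a directory table enabled for the files on the stage to be replicated.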
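For a named internal stage that was created without a directory table, a minimal sketch of enabling one before replication follows. The stage name my_internal_stage is hypothetical. The final query shows the files that the directory table tracks, which are the files Snowflake replicates.

```sql
-- Hypothetical stage name; substitute your own named internal stage.
-- Enable a directory table so that the files on the stage can be replicated.
ALTER STAGE my_internal_stage SET DIRECTORY = ( ENABLE = TRUE );

-- Synchronize the directory table with the files already on the stage.
ALTER STAGE my_internal_stage REFRESH;

-- Files returned here are tracked by the directory table.
SELECT relative_path, size, last_modified
  FROM DIRECTORY(@my_internal_stage);
```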

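Replication of external stages

Note

Snowflake does not replicate the files on an external stage. For external stages in the primary and secondary databases, the cloud storage URL points to the same location.

The following table describes how replication works for each type of external stage.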

Type | Description of replication support
Named stage with no credentials (public storage location) | Named external stages are replicated when you replicate a database. The files on an external stage are not replicated.
Named stage with credentials (private storage location) | Replicated stages include the cloud provider credentials, such as secret keys or access tokens.
Named stage with storage integration (private storage location) | Replication of storage integrations requires Business Critical Edition (or higher). The replication or failover group must include STORAGE INTEGRATIONS in the ALLOWED_INTEGRATION_TYPES list. For more information, see CREATE FAILOVER GROUP. You must also configure the trust relationships for your cloud storage in the target accounts. For more information, see Configure cloud storage access for secondary storage integrations.
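After a refresh, you can inspect the replicated stages in the target account to see which properties came across. In the sketch below, my_database is a hypothetical database in your replication or failover group, and the column names are assumptions about the SHOW STAGES output. For an external stage, url should match the primary stage, because both point to the same cloud storage location.

```sql
-- Run in the target account after refreshing the group.
-- my_database is a hypothetical database included in the group.
SHOW STAGES IN DATABASE my_database;

-- Assumption: the SHOW STAGES output exposes these column names.
SELECT "name", "type", "url", "has_credentials", "storage_integration", "directory_enabled"
  FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
```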

Note

To associate a secondary stage or pipe with a different cloud storage location than the one associated with the primary object, contact the support team. For example, you might choose a location in another region.

Considerations

The following constraints apply to stage objects:

  • Snowflake currently supports stage replication as part of group-based replication (replication and failover groups). Stage replication is not supported for database replication.

  • You can replicate an external stage. However, the files on an external stage are not replicated.

  • You can replicate an internal stage. To replicate the files on an internal stage, you must enable a directory table on the stage. Snowflake replicates only the files that are mapped by the directory table.

  • When you replicate an internal stage with a directory table, you cannot disable the directory table on the primary or secondary stage. The directory table contains critical information about replicated files and files loaded using a COPY statement.

  • A refresh operation fails if the directory table on an internal stage contains a file that is larger than 5 GB. To work around this limitation, move any files larger than 5 GB to a different stage.

    Because you cannot disable the directory table on a primary or secondary stage, or on any stage that has previously been replicated, follow these steps before you add the database that contains the stage to a replication or failover group:

    1. Disable the directory table on the primary stage.
    2. Move the files that are larger than 5 GB to another stage that does not have a directory table enabled.
    3. After you move the files, re-enable the directory table on the primary stage.
  • Files on user stages and table stages are not replicated.

  • For named external stages that use a storage integration, you must configure the trust relationship for secondary storage integrations in your target accounts prior to failover. For more information, see Configure cloud storage access for secondary storage integrations.

  • If you replicate an external stage with a directory table, and you have configured automated refresh for the source directory table, you must configure automated refresh for the secondary directory table before failover. For more information, see Configure automated refresh for directory tables on secondary stages.

  • A COPY INTO command might take longer than expected if the directory table on a replicated stage is not consistent with the replicated files on the stage. To make a directory table consistent, refresh it with an ALTER STAGE … REFRESH statement. To check the consistency status of a directory table, use the SYSTEM$GET_DIRECTORY_TABLE_STATUS function.
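The workaround for files larger than 5 GB, together with the consistency check, might look like the following sketch. All stage and file names are hypothetical, the threshold assumes that 1 GB means 1024^3 bytes, and COPY FILES followed by REMOVE is only one way to move a staged file. We also assume that SYSTEM$GET_DIRECTORY_TABLE_STATUS accepts the stage name as a string argument; check the function reference for the exact form.

```sql
-- Hypothetical names: my_stage has a directory table and has not been
-- replicated yet; my_large_files_stage has no directory table.

-- Find files tracked by the directory table that exceed 5 GB
-- (assumes 1 GB = 1024^3 bytes).
SELECT relative_path, size
  FROM DIRECTORY(@my_stage)
  WHERE size > 5 * POWER(1024, 3);

-- Step 1: disable the directory table on the stage.
ALTER STAGE my_stage SET DIRECTORY = ( ENABLE = FALSE );

-- Step 2: move each large file by copying it to the other stage and then
-- removing the original. The file path is hypothetical.
COPY FILES INTO @my_large_files_stage
  FROM @my_stage
  FILES = ('folder1/large_file.csv');
REMOVE @my_stage/folder1/large_file.csv;

-- Step 3: re-enable the directory table and resynchronize it with the stage.
ALTER STAGE my_stage SET DIRECTORY = ( ENABLE = TRUE );
ALTER STAGE my_stage REFRESH;

-- Check whether the directory table is consistent with the files on the stage.
-- Assumption: the function takes the stage name as a string.
SELECT SYSTEM$GET_DIRECTORY_TABLE_STATUS('my_stage');
```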

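Replication and pipes

This section describes the level of replication support that Snowflake currently provides for different types of pipes.

Snowflake supports replication of the following:

  • Pipe objects, including auto-ingest pipes and REST endpoint pipes that load data from external stages.
  • Pipe-level parameters.
  • Privileges granted on pipe objects.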

Note

To associate a secondary stage or pipe with a different cloud storage location than the one associated with the primary object, contact the support team. For example, you might choose a location in another region.

Pipes in a secondary database

Pipes in a secondary database are in a READ_ONLY execution state and receive notifications but do not load data until you promote the secondary database to serve as the primary. After you promote a secondary database, the pipes transition to a FAILING_OVER execution state. Once failover is complete, the pipes should be in the RUNNING execution state and begin to load any data that has become available since the last refresh time (that is, the last time that the former primary database was updated).
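To see which of these execution states a pipe is in, you can read the executionState field from the JSON that SYSTEM$PIPE_STATUS returns. The pipe name below is borrowed from Example 3 and is only an assumption here. Before promotion, you would expect READ_ONLY; after promotion, FAILING_OVER and then RUNNING.

```sql
-- Assumption: snowpipe_db.public.mypipe is a pipe in the secondary database.
-- SYSTEM$PIPE_STATUS returns a JSON string; executionState holds the state.
SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('snowpipe_db.public.mypipe')):executionState::STRING
         AS execution_state;
```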

Replication of auto-ingest pipes

In the event of failover, a replicated auto-ingest pipe becomes the new primary pipe and can do the following:

  • Load any data that has not yet been loaded. This includes any data that is new since the newly promoted primary database was last refreshed.

  • Continue to receive notifications when new files are available to load on the stage, and load data from those files.

    Note

    To receive notifications, you must configure a secondary auto-ingest pipe in a target account prior to failover. For more information, see Configure notifications for secondary auto-ingest pipes.

Replication of REST endpoint pipes

For pipes that use the Snowpipe REST API to load data, Snowflake replicates the pipes and their load history metadata to each target account that you specify. There are no additional configuration steps you need to take on the target accounts. For a detailed list of load history metadata, see Load metadata.

To continue loading data in the event of failover, call the REST API from the newly promoted source account.

Considerations

The following constraints apply to pipe objects:

  • Snowflake currently supports pipe replication as part of group-based replication (replication and failover groups). Pipe replication is not supported for database replication.
  • Snowflake replicates the copy history of a pipe only when the pipe belongs to the same replication group as its target table.
  • Replication of notification integrations is not supported.
  • Snowflake replicates only the load history recorded after the most recent truncation of the table.
  • To receive notifications, you must configure a secondary auto-ingest pipe in a target account prior to failover. For more information, see Configure notifications for secondary auto-ingest pipes.
  • Use the SYSTEM$PIPE_STATUS function to resolve any pipes not in their expected execution state after failover.
  • Snowflake doesn’t support replication and failover for Snowpipe with the Kafka connector, but Snowflake does support replication and failover for Snowpipe Streaming with the Kafka connector. For more information, see Snowpipe Streaming and the Kafka connector.
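Because copy history is replicated only when a pipe and its target table are in the same group, it can be worth confirming membership before failover. This sketch reuses names from Example 3 as assumptions. It reads the definition column (the pipe's COPY statement) and the notification_channel column (the SQS queue for auto-ingest pipes) from SHOW PIPES; those column names are also assumptions to verify in your account.

```sql
-- Names reused from Example 3; substitute your own.
-- 1. List the databases that the failover group replicates.
SHOW DATABASES IN FAILOVER GROUP my_pipe_failover_group;

-- 2. List each pipe's COPY statement so that you can confirm its target table
--    lives in one of the databases listed above.
SHOW PIPES IN DATABASE snowpipe_db;

-- Assumption: the SHOW PIPES output exposes these column names.
SELECT "name", "definition", "notification_channel"
  FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
```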

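Example 1: Replicate a named internal stage

This example demonstrates how replication works for internal stages. Specifically, it shows how the directory table serves as the single source of truth for stage metadata before and after replication.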

The first part of the example completes the following tasks in a source account.

  1. Create an internal stage named my_stage with a directory table enabled so that the files on the stage are replicated. Then copy data from a table named my_table into files on the stage.

    Note

    The example refreshes the directory table after writing file1 and file2 to the stage, which synchronizes the directory table metadata with the files on the stage. No refresh occurs after file3 is written, so the directory table does not track file3.

    CREATE OR REPLACE STAGE my_stage
      DIRECTORY = (ENABLE = TRUE);
    
    COPY INTO @my_stage/folder1/file1 from my_table;
    COPY INTO @my_stage/folder2/file2 from my_table;
    ALTER STAGE my_stage REFRESH;
    
    COPY INTO @my_stage/folder3/file3 from my_table;
  2. Create a failover group:

    CREATE FAILOVER GROUP my_stage_failover_group
      OBJECT_TYPES = DATABASES
      ALLOWED_DATABASES = my_database_1
      ALLOWED_ACCOUNTS = myorg.my_account_2;
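Before you refresh the group in the target account, you can compare what is on the stage with what the directory table tracks. This is a sketch run in the source account, assuming that automatic refresh is not enabled on my_stage. LIST shows every file that the COPY INTO statements wrote, including the file under folder3/, while the DIRECTORY query shows only the files that the directory table tracks, and therefore only the files that will be replicated. Unloaded file names may carry suffixes that Snowflake adds, so match on the folder prefix rather than on the exact name.

```sql
-- All files on the stage, including the one under folder3/.
LIST @my_stage;

-- Only the files tracked by the directory table. Because the stage was not
-- refreshed after file3 was written, nothing under folder3/ is expected here.
SELECT relative_path
  FROM DIRECTORY(@my_stage)
  ORDER BY relative_path;
```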

The second part of the example completes the replication and failover process in a target account:

  1. Create a failover group as a replica of the failover group in the source account, refresh the objects in the new failover group, and promote the target account to serve as the source account.

    CREATE FAILOVER GROUP my_stage_failover_group
      AS REPLICA OF myorg.my_account_1.my_stage_failover_group;
    
    ALTER FAILOVER GROUP my_stage_failover_group REFRESH;
    
    ALTER FAILOVER GROUP my_stage_failover_group PRIMARY;
  2. Next, refresh the directory table on the replicated stage and copy all of the files tracked by the directory table on my_stage into a table named my_table.

    Note

    The COPY INTO statement loads file1 and file2 into the table, but not file3. This is because the directory table was not refreshed after adding file3 in the source account.

    ALTER STAGE my_stage REFRESH;
    
    COPY INTO my_table FROM @my_stage;

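Example 2: Replicate an external stage and a storage integration

This example provides a sample workflow for replicating an external stage and a storage integration to a target account.

The example assumes that you have already configured secure access to your Amazon S3 bucket.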

The first part of the example completes the following tasks in a source account.

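  1. Create a storage integration for an Amazon S3 bucket. A storage integration is an account-level object, so it does not belong to database my_database_2; only the stage in the next step is created in that database.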

    CREATE STORAGE INTEGRATION my_storage_int
      TYPE = external_stage
      STORAGE_PROVIDER = 'S3CHINA'
      STORAGE_AWS_ROLE_ARN = '<iam_role_arn>'
      STORAGE_ALLOWED_LOCATIONS = ('s3china://mybucket/path')
      STORAGE_BLOCKED_LOCATIONS = ('s3china://mybucket/blockedpath')
      ENABLED = true;
  2. Create an external stage in database my_database_2 using storage integration my_storage_int.

    CREATE STAGE my_ext_stage
      URL = 's3china://mybucket/path'
      STORAGE_INTEGRATION = my_storage_int;
  3. Create a failover group and include database my_database_2 and storage integration objects.

    CREATE FAILOVER GROUP my_external_stage_fg
      OBJECT_TYPES = databases, integrations
      ALLOWED_INTEGRATION_TYPES = storage integrations
      ALLOWED_DATABASES = my_database_2
      ALLOWED_ACCOUNTS = myorg.my_account_2;

The second part of the example completes the replication and failover process in a target account:

  1. Create a failover group in the target account as a replica of the failover group in the source account, and then refresh it.

    CREATE FAILOVER GROUP my_external_stage_fg
      AS REPLICA OF myorg.my_account_1.my_external_stage_fg;
    
    ALTER FAILOVER GROUP my_external_stage_fg REFRESH;
  2. After you replicate the storage integration to the target account, you must take additional steps to update your cloud provider permissions so that the replicated integration can access your cloud storage. For more information, see Configure cloud storage access for secondary storage integrations.
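A sketch of the target-account side follows. DESC INTEGRATION returns, among other properties, the AWS IAM user ARN and external ID to use in your IAM role's trust policy. The LIST statement then checks that the replicated stage can reach the bucket; it assumes the stage was created in the PUBLIC schema of my_database_2, and depending on your setup you may need to run it after promoting the target account.

```sql
-- Run in the target account after the refresh.
-- Look for STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID in the output.
DESC INTEGRATION my_storage_int;

-- After updating the trust policy in AWS, confirm that the stage can list files.
-- Assumption: the stage lives in the PUBLIC schema of my_database_2.
LIST @my_database_2.public.my_ext_stage;
```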

Example 3: Replicate an auto-ingest pipe

This example provides a sample workflow for replicating a pipe that uses an Amazon Simple Notification Service (SNS) topic with Amazon Simple Queue Service (SQS) to automate Snowpipe.

The example assumes that you have already completed the following tasks:

Start with the following tasks in a source account.

  1. Use the CREATE PIPE command to create a pipe with auto-ingest enabled that loads data from the external stage into a table named mytable.

    CREATE PIPE snowpipe_db.public.mypipe AUTO_INGEST=TRUE
     AWS_SNS_TOPIC='<topic_arn>'
     AS
    COPY INTO snowpipe_db.public.mytable
    FROM @snowpipe_db.public.my_s3_stage
    FILE_FORMAT = (TYPE = 'JSON');
  2. Refresh the pipe with an ALTER PIPE statement to load any data files that were staged within the last 7 days.

    ALTER PIPE snowpipe_db.public.mypipe REFRESH;
  3. Finally, use CREATE FAILOVER GROUP to create a failover group that allows replication of storage integrations.

    CREATE FAILOVER GROUP my_pipe_failover_group
      OBJECT_TYPES = DATABASES, INTEGRATIONS
      ALLOWED_INTEGRATION_TYPES = STORAGE INTEGRATIONS
      ALLOWED_DATABASES = snowpipe_db
      ALLOWED_ACCOUNTS = myorg.my_account_2;

The second part of the example completes the replication and failover process in a target account:

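  1. Create a failover group in the target account as a replica of the failover group in the source account, and then refresh it so that the replicated storage integration and pipe are available in the target account.

    CREATE FAILOVER GROUP my_pipe_failover_group
      AS REPLICA OF myorg.my_account_1.my_pipe_failover_group;
    
    ALTER FAILOVER GROUP my_pipe_failover_group REFRESH;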
  2. Execute a DESCRIBE INTEGRATION statement to retrieve the ARN for the AWS IAM User for your Snowflake account on the secondary deployment.

    Use the ARN to grant the IAM user permissions to access your S3 bucket. See Step 5: Grant the IAM User Permissions to Access Bucket Objects.

    DESC INTEGRATION my_s3_storage_int;
  3. Call the SYSTEM$GET_AWS_SNS_IAM_POLICY system function to generate an IAM policy that grants the new SQS queue permission to subscribe to your SNS topic. Snowflake created the new SQS queue in your target account when you replicated the failover group from your source account.

    SELECT SYSTEM$GET_AWS_SNS_IAM_POLICY('<topic_arn>');

    topic_arn is the Amazon Resource Name (ARN) of the SNS topic that you created for the original pipe in your source account.

    Then, subscribe the new Amazon SQS queue to your SNS topic.

  4. Refresh the objects in the new failover group.

    ALTER FAILOVER GROUP my_pipe_failover_group REFRESH;
  5. Finally, promote the target account to serve as the source account with the ALTER FAILOVER GROUP command.

    ALTER FAILOVER GROUP my_pipe_failover_group PRIMARY;

    The mypipe pipe will begin to load any data that was made available since the last time the failover group was refreshed in the source account.

To verify that the replicated pipe is working, query the target table of the pipe's COPY statement:

SELECT * FROM mytable;
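To see whether the pipe has resumed loading after promotion, you can also query the load history for the table. This sketch assumes the names from this example, a 24-hour look-back window chosen only for illustration, and that the COPY_HISTORY table function exposes the columns shown.

```sql
-- Run in the newly promoted account. Lists recent loads into mytable,
-- including loads performed by the pipe.
SELECT file_name, status, row_count, last_load_time, pipe_name
  FROM TABLE(snowpipe_db.INFORMATION_SCHEMA.COPY_HISTORY(
         TABLE_NAME => 'snowpipe_db.public.mytable',
         START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())))
  ORDER BY last_load_time DESC;
```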

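Migrate to Amazon Simple Notification Service (SNS)

This section describes how to migrate from sending Amazon S3 event notifications directly to an Amazon Simple Queue Service (SQS) queue to using an Amazon Simple Notification Service (SNS) topic. This migration applies in the following situation:

When you replicate a directory table or pipe, Snowflake creates a new SQS queue in the target account to handle automation. You can configure a single SNS topic to deliver event notifications from your S3 bucket to all of the SQS queues across multiple accounts. Broadcasting S3 event notifications to every SQS queue reduces the risk of missed notifications and data loss after failover.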

Note

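If you already use SNS, you do not need to migrate. Otherwise, follow the usual steps to configure automation with SNS for secondary directory tables or auto-ingest pipes before failover.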

Prerequisites

To migrate, you must meet the following prerequisites:

Migrate to an SNS topic

  1. Create an SNS topic in your AWS account. For instructions, see Creating an Amazon SNS topic (https://docs.aws.amazon.com/sns/latest/dg/sns-create-topic.html) in the AWS SNS documentation.

  2. Subscribe your target destinations (for example, other SQS queues or AWS Lambda workloads) for your S3 event notification(s) to your SNS topic. SNS publishes event notifications for your bucket to all subscribers to the topic. For instructions, see the AWS SNS documentation (https://docs.aws.amazon.com/sns/latest/dg/sns-create-subscribe-endpoint-to-topic.html).

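  3. Update the access policy for your topic with the following permissions:

    • Allow the Snowflake IAM user to subscribe the SQS queue that is in your target account to your topic.
    • Allow Amazon S3 to publish event notifications from your bucket to the SNS topic.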

    For instructions, see Step 1: Subscribe the Snowflake SQS Queue to the SNS Topic.

  4. In your target Snowflake account, call the SYSTEM$CONVERT_PIPES_SQS_TO_SNS function. The function subscribes the SQS queue in your target account to your SNS topic without interrupting metadata synchronization or ingestion work.

    Specify your S3 bucket name and SNS topic ARN:

    SELECT SYSTEM$CONVERT_PIPES_SQS_TO_SNS('s3_mybucket', 'arn:aws:sns:us-west-2:001234567890:MySNSTopic');

  5. Update your S3 event notifications to use your SNS topic as a destination. For instructions, see the Amazon S3 User Guide (https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html).
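For the Snowflake permission in step 3, you can generate the policy statement with SYSTEM$GET_AWS_SNS_IAM_POLICY, the same function used in Example 3. This is a sketch: the topic ARN reuses the sample value from step 4, and you would run the query in the account whose SQS queue needs to subscribe. The permission that allows Amazon S3 to publish to the topic is configured in AWS and is not produced by this function.

```sql
-- Returns an IAM policy (JSON) that lets the Snowflake SQS queue subscribe to
-- the topic. Merge its statement into the topic's access policy in AWS.
-- The ARN is the sample value from step 4, not a real topic.
SELECT SYSTEM$GET_AWS_SNS_IAM_POLICY('arn:aws:sns:us-west-2:001234567890:MySNSTopic');
```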

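After you complete these steps, the SQS queue is automatically unsubscribed from your S3 event notifications. All directory tables and pipes that use the specified S3 bucket begin to use SNS as the source of their notifications.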