针对 Amazon S3 自动执行 Snowpipe

本主题说明如何使用针对 S3 桶的 Amazon SQS (Simple Queue Service) (https://aws.amazon.com/sqs/) 通知自动触发 Snowpipe 数据加载。

Snowflake 建议仅发送 Snowpipe 支持的事件,以降低成本、事件干扰和延迟。

本主题内容:

云平台支持

所有受支持云平台 上托管的 Snowflake 账户支持使用 S3 事件消息触发自动 Snowpipe 数据加载。

网络流量

Virtual Private Snowflake (VPS)AWS PrivateLink 客户的注意事项:

使用 Amazon SQS 通知自动执行 Snowpipe 的效果很好。但是,尽管 VPC(包括 VPS)内的 AWS 云存储可以与其自己的消息传递服务(Amazon SQS、Amazon Simple Notification Service)进行通信,但是此流量在 VPC 之外 Amazon 安全网络上的服务器之间流动;因此,此流量不受 VPC 保护。

配置对 Cloud Storage 的安全访问

备注

如果您已配置对存储数据文件的 S3 桶的安全访问,则可以跳过此部分。

本部分介绍如何使用存储集成来允许 Snowflake 从外部(如 S3)暂存区中引用的 Amazon S3 桶读取数据,并向其中写入数据。集成是已命名的第一类 Snowflake 对象,无需传递显式云提供商凭据(如密钥或访问令牌)。集成对象存储 AWS Identity and Access Management (IAM) 用户 ID。您组织中的管理员在 AWS 账户向集成授予 IAM 用户权限。

集成还可以列出桶(和可选路径),以限制用户在创建使用集成的外部暂存区时可以指定的位置。

备注

  • 需要在 AWS 中拥有创建和管理 IAM 策略和角色的权限,才能完成本部分中的说明。如果您不是 AWS 管理员,请让 AWS 管理员来执行这些任务。

  • 请注意,目前只能通过在同一政府区域的 AWS 上托管的 Snowflake 账户,使用存储集成访问 政府区域 中的 S3 存储。支持使用直接凭据从政府区域之外托管的账户访问 S3 存储。

下图显示了 S3 暂存区的集成流程:

Amazon S3 暂存区集成流程
  1. 外部(即 S3)暂存区在其定义中引用了存储集成对象。

  2. Snowflake 会自动将存储集成与为您的账户创建的 S3 IAM 用户相关联。Snowflake 会创建一个 IAM 用户,供 Snowflake 账户中的所有 S3 存储集成引用。

  3. 您组织中的 AWS 管理员向 IAM 用户授予权限,以访问暂存区定义中引用的桶。请注意,许多外部暂存区对象可以引用不同的桶和路径,并使用相同的存储集成进行身份验证。

当用户从暂存区加载或卸载数据时,Snowflake 会在允许或拒绝访问之前验证桶上授予 IAM 用户的权限。

备注

我们强烈建议使用此选项,这样可以避免在访问云存储时提供 IAM 凭据。有关其他存储访问选项,请参阅 配置对 Amazon S3 的安全访问

本部分内容:

第 1 步:配置 S3 桶的访问权限

AWS 访问控制要求

Snowflake 需要对 S3 桶和文件夹拥有以下权限才能访问该文件夹(以及子文件夹)中的文件:

  • s3:GetBucketLocation

  • s3:GetObject

  • s3:GetObjectVersion

  • s3:ListBucket

作为最佳实践,Snowflake 建议创建一个供 Snowflake 访问 S3 桶的 IAM 策略。然后,您可以将策略附加到角色,并使用由 AWS 为角色生成的安全凭据来访问桶中的文件。

创建 IAM 策略

以下分步说明介绍如何在 AWS 管理控制台中配置 Snowflake 的访问权限以访问您的 S3 桶。

  1. 登录 AWS 管理控制台。

  2. 从主页仪表板搜索并选择 IAM

  3. 从左侧导航窗格中选择 Account settings

  4. Endpoints 列表中的 Security Token Service (STS) 下,找到账户所在的 Snowflake 区域。如果 STS status 不活动,请将切换按钮移至 Active

  5. 从左侧导航窗格中选择 Policies

  6. 选择 Create Policy

  7. 对于 Policy editor,选择 JSON

  8. 添加允许 Snowflake 访问 S3 桶和文件夹的策略文档。

    以下策略(采用 JSON 格式)为 Snowflake 提供使用单个桶和文件夹路径加载或卸载数据所需的权限。

    将文本复制并粘贴到策略编辑器中:

    备注

    • 确保用实际桶名称和文件夹路径前缀替换 bucketprefix

    • 政府区域 中的桶的 Amazon 资源名称 (ARN) 有一个 arn:aws-us-gov:s3::: 前缀。

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                  "s3:GetObject",
                  "s3:GetObjectVersion"
                ],
                "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation"
                ],
                "Resource": "arn:aws:s3:::<bucket>",
                "Condition": {
                    "StringLike": {
                        "s3:prefix": [
                            "<prefix>/*"
                        ]
                    }
                }
            }
        ]
    }
    
    Copy

    备注

    "s3:prefix": 条件设置为 ["*"] 或者 ["<path>/*"],以便分别向指定桶中的所有前缀或桶中的路径分别授予访问权限。

    注意 AWS 策略支持各种不同的安全用例。

  9. 选择 Next

  10. 输入 Policy name (例如 snowflake_access)和可选的 Description

  11. 选择 Create policy

第 2 步:在 AWS 中创建 IAM 角色

要在 AWS 管理控制台中为 Snowflake 配置访问权限,请执行以下操作:

  1. 在 Identity and Access Management (IAM) 控制面板的左侧导航窗格中,选择 Roles

  2. 选择 Create role

  3. 选择 AWS account 作为信任实体类型。

  4. Account ID 字段中,暂时输入自己的 AWS 账户 ID。在稍后的步骤中,您将修改信任关系并授予对 Snowflake 的访问权限。

  5. 选择 Require external ID 选项。外部 ID 用于向第三方(如 Snowflake)授予对 AWS 资源(例如 S3 桶)的访问权限。

    输入占位符 ID,例如 0000。在后续步骤中,您将修改 IAM 角色的信任关系并指定外部 ID 用于存储集成。

  6. 选择 Next

  7. 选择您在 第 1 步:配置 S3 桶访问权限 (本主题内容)中创建的策略。

  8. 选择 Next

    AWS 管理控制台中的 Review 页面
  9. 输入角色的名称和描述,然后选择 Create role

    您现在已经为一个桶创建了 IAM 策略,创建了一个 IAM 角色,并将策略附加到了该角色。

  10. 在角色摘要页面上,找到 Role ARN 值并将其记录下来。在下一步中,您将创建引用此角色的 Snowflake 集成。

备注

Snowflake 会将临时凭据缓存一段时间,但不能超过 60 分钟的过期时间。如果您撤消 Snowflake 的访问权限,用户或许也能够从云存储位置列出文件并访问数据,直到缓存过期。

第 3 步:在 Snowflake 中创建云存储集成

使用 CREATE STORAGE INTEGRATION 命令创建存储集成。存储集成是一个 Snowflake 对象,用于存储生成的身份和 S3 云存储访问管理 (IAM) 用户,以及一组可选的允许或阻止的存储位置(如桶)。组织中的云提供商管理员会将存储位置的权限授予生成的用户。此选项可让用户在创建暂存区或加载数据时无需提供凭据。

单个存储集成可以支持多个外部(如 S3)暂存区。暂存区定义中的 URL 必须与为 STORAGE_ALLOWED_LOCATIONS 参数指定的 S3 桶(和可选路径)一致。

备注

只有账户管理员(具有 ACCOUNTADMIN 角色的用户)或具有全局 CREATE INTEGRATION 权限的角色才能执行此 SQL 命令。

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
Copy

其中:

  • integration_name 是新集成的名称。

  • iam_role 是您在 第 2 步:在 AWS 中创建 IAM 角色 (本主题内容)中创建的角色的 Amazon 资源名称 (ARN)。

  • bucket 是存储数据文件的 S3 桶的名称(例如 mybucket)。在创建或修改引用此集成的暂存区时,必需的 STORAGE_ALLOWED_LOCATIONS 参数和可选的 STORAGE_BLOCKED_LOCATIONS 参数分别限制或阻止对这些桶的访问。

  • path 是一个可选路径,可用于提供对桶中对象的精细控制。

下面的示例创建了一个集成,允许访问账户中的所有桶,但阻止访问定义的 sensitivedata 文件夹。

也使用此集成的其他外部暂存区可以引用允许的桶和路径:

CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/myrole'
  STORAGE_ALLOWED_LOCATIONS = ('*')
  STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');
Copy

备注

(可选)使用 STORAGE_AWS_EXTERNAL_ID 参数来指定您自己的外部 ID。您可以选择此选项,在多个外部卷和/或存储集成中使用相同的外部 ID。

第 4 步:检索您的 Snowflake 账户的 AWSIAM 用户

  1. 要检索自动为您的 Snowflake 账户创建的 IAM 用户的 ARN,请使用 DESCRIBE INTEGRATION

    DESC INTEGRATION <integration_name>;
    
    Copy

    其中:

    例如:

    DESC INTEGRATION s3_int;
    
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    | property                  | property_type | property_value                                                                 | property_default |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------|
    | ENABLED                   | Boolean       | true                                                                           | false            |
    | STORAGE_ALLOWED_LOCATIONS | List          | s3://mybucket1/mypath1/,s3://mybucket2/mypath2/                                | []               |
    | STORAGE_BLOCKED_LOCATIONS | List          | s3://mybucket1/mypath1/sensitivedata/,s3://mybucket2/mypath2/sensitivedata/    | []               |
    | STORAGE_AWS_IAM_USER_ARN  | String        | arn:aws:iam::123456789001:user/abc1-b-self1234                                 |                  |
    | STORAGE_AWS_ROLE_ARN      | String        | arn:aws:iam::001234567890:role/myrole                                          |                  |
    | STORAGE_AWS_EXTERNAL_ID   | String        | MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=                               |                  |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    
    Copy
  2. 记录以下属性的值:

    属性

    描述

    STORAGE_AWS_IAM_USER_ARN

    为您的 Snowflake 账户创建的 AWS IAM 用户,例如 arn:aws:iam::123456789001:user/abc1-b-self1234。Snowflake 会为整个 Snowflake 账户配置单一 IAM 用户。您账户中的所有 S3 存储集成都使用该 IAM 用户。

    STORAGE_AWS_EXTERNAL_ID

    Snowflake 与 AWS 建立信任关系所使用的外部 ID。如果创建存储集成时未指定外部 ID (STORAGE_AWS_EXTERNAL_ID),Snowflake 会生成一个 ID 供您使用。

    在下一部分中需要提供这些值。

第 5 步:授予访问桶对象的 IAM 用户权限

以下分步说明描述了如何在 AWS 管理控制台中配置 Snowflake 的 IAM 访问权限,以便您可以使用 S3 桶来加载和卸载数据:

  1. 登录 AWS 管理控制台。

  2. 选择 IAM

  3. 从左侧导航窗格中选择 Roles

  4. 选择在 第 2 步:在 AWS 中创建 IAM 角色 (本主题内容)中创建的角色。

  5. 选择 Trust relationships 选项卡。

  6. 选择 Edit trust policy

  7. 使用您在 第 4 步:检索 Snowflake 账户 AWS IAM 用户 (本主题内容)中记录的 DESC STORAGE INTEGRATION 输出值修改策略文档。

    IAM 角色的策略文档

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "AWS": "<snowflake_user_arn>"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "<snowflake_external_id>"
            }
          }
        }
      ]
    }
    
    Copy

    其中:

    • snowflake_user_arn 是记录的 STORAGE_AWS_IAM_USER_ARN 值。

    • snowflake_external_id 是记录的 STORAGE_AWS_EXTERNAL_ID 值。

      在此示例中,snowflake_external_id 值为 MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=

      备注

      出于安全原因,如果您在不指定外部 ID 的情况下创建新的存储集成(或使用 CREATE OR REPLACE STORAGE INTEGRATION 语法重新创建一个现有的存储集成),新的集成具有 不同 的外部 ID,所以它不能解析信任关系,除非更新信任策略。

  8. 选择 Update policy 保存更改。

备注

Snowflake 会将临时凭据缓存一段时间,但不能超过 60 分钟的过期时间。如果您撤消 Snowflake 的访问权限,用户或许也能够从云存储位置列出文件并加载数据,直到缓存过期。

确定正确的选项

在继续操作之前,请确定数据文件所在的 S3 桶中的目标路径(或 AWS 术语中的“前缀”)是否存在 S3 事件通知。AWS 规则禁止为同一路径创建冲突的通知。

支持通过以下选项,使用 Amazon SQS 自动执行 Snowpipe:

  • 选项 1.新的 S3 事件通知: 在 S3 桶中为目标路径创建事件通知。当文件准备好加载时,事件通知会通过 SQS 队列通知 Snowpipe。

    重要

    如果您的 S3 桶存在冲突事件通知,请改用选项 2。

  • 选项 2.现有事件通知: 将 Amazon Simple Notification Service (SNS) (https://aws.amazon.com/sns/) 配置为广播器,以便与多个端点(或“订阅者”,如 SQS 队列或 AWS Lambda 工作负载)共享给定路径的通知,包括用于 Snowpipe 自动化的 Snowflake SQS 队列。当文件准备好加载时,由 SNS 发布的 S3 事件通知通过 SQS 队列通知 Snowpipe。

  • 选项 3。设置 Amazon EventBridge 以自动执行 Snowpipe: 与选项 2 类似,您还可以为 S3 桶启用 Amazon EventBridge (https://aws.amazon.com/eventbridge/),并创建规则以向 SNS 主题发送通知。

选项 1:创建新的 S3 事件通知以自动执行 Snowpipe

本部分介绍比较常见的选项,用于使用针对 S3 桶的 Amazon SQS (Simple Queue Service) (https://aws.amazon.com/sqs/) 通知,自动触发 Snowpipe 数据加载。这些步骤说明了如何为已存储数据文件的 S3 桶中的目标路径(或 AWS 术语中的“前缀”)创建事件通知。

重要

如果 S3 桶存在冲突的事件通知,请改用(本主题内容) 选项 2:配置 Amazon SNS 以使用 SQS 通知自动执行 Snowpipe。AWS 规则禁止为同一目标路径创建冲突的通知。

下图显示了 Snowpipe 自动引入的流程流:

Snowpipe 自动引入流程流
  1. 数据文件在暂存区中加载。

  2. S3 事件通知会通过 SQS 队列通知 Snowpipe:文件可供加载。Snowpipe 将文件复制到队列中。

  3. Snowflake 提供的虚拟仓库根据指定管道中定义的参数将数据从排队文件加载到目标表中。

备注

本主题中的说明假定将加载数据的 Snowflake 数据库中已存在目标表。

第 1 步:创建暂存区(如果需要)

使用 CREATE STAGE 命令创建引用您的 S3 桶的外部暂存区。Snowpipe 会从暂存区中获取数据文件,并在加载到目标表之前将它们临时加入队列。或者,您可以使用现有的外部暂存区。

备注

  • 要配置对云存储位置的安全访问,请参阅 配置对 Cloud Storage 的安全访问 (本主题内容)。

  • 要在 CREATE STAGE 语句中引用存储集成,角色必须具有对存储集成对象的 USAGE 权限。

以下示例会在活动架构中为用户会话创建一个名为 mystage 的暂存区。云存储 URL 包括路径 files。该暂存区引用名为 my_storage_int 的存储集成:

USE SCHEMA snowpipe_db.public;

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = my_storage_int;
Copy

第 2 步:创建已启用自动引入的管道

使用 CREATE PIPE 命令创建管道。管道定义了 Snowpipe 用来将数据从引入队列加载到目标表中的 COPY INTO <table> 语句。

以下示例会在活动架构中为用户会话创建名为 mypipe 的管道。管道将数据从 mystage 暂存区中暂存的文件加载到 mytable 表:

create pipe snowpipe_db.public.mypipe auto_ingest=true as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

AUTO_INGEST=true 参数指定,当新数据可供加载时,将从 S3 桶发送的事件通知读取到 SQS 队列。

重要

将管道定义中的暂存区引用与现有管道进行比较。确认相同 S3 桶的目录路径不重叠;否则,多个管道可能会多次将同一组数据文件加载到一个或多个目标表中。例如,当多个暂存区引用具有不同粒度级别的相同 S3 桶(如 s3://mybucket/path1s3://mybucket/path1/path2)时,可能会发生这种情况。在此用例中,如果文件暂存在 s3://mybucket/path1/path2 中,则两个暂存区的管道将加载文件的副本。

这与手动 Snowpipe 设置(已禁用 自动引入)不同,手动设置要求用户将一组命名文件提交到 REST API,将文件加入队列以进行加载。启用自动引入后,每个管道都会通过 S3 事件通知接收生成的文件列表。需要格外小心以避免数据重复。

第 3 步:配置安全性

对于将使用 Snowpipe 执行连续数据加载的每个用户,请授予针对数据加载对象(即目标数据库、架构和表;暂存区对象和管道)的足够访问控制权限。

备注

为了遵循“最小权限”的一般原则,我们建议使用管道创建单独的用户和角色以用于引入文件。创建用户时,应将此角色作为其默认角色。

要使用 Snowpipe,需要具有以下权限的角色:

对象

权限

备注

命名管道

OWNERSHIP

命名暂存区

USAGE、READ

命名文件格式

USAGE

可选;仅当 第 1 步:创建暂存区(如果需要) 中创建的暂存区引用命名文件格式时才需要。

目标数据库

USAGE

目标架构

USAGE

目标表

INSERT、SELECT

使用 GRANT <privileges> 命令为角色授予权限。

备注

只有安全管理员(即具有 SECURITYADMIN 角色的用户)或更高级别的用户,或者同时具有账户中 CREATE ROLE 的权限和全局 MANAGE GRANTS 权限的其他角色,才能创建角色和授予权限。

例如,创建名为 snowpipe_role 的角色,该角色可以访问一组 snowpipe_db.public 数据库对象以及一个名为 mypipe 的管道,然后将该角色授予用户:

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe_role;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe_role;

grant usage on schema snowpipe_db.public to role snowpipe_role;

grant insert, select on snowpipe_db.public.mytable to role snowpipe_role;

grant usage on stage snowpipe_db.public.mystage to role snowpipe_role;

-- Pause the pipe for OWNERSHIP transfer
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = true;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe_role;

-- Grant the role to a user
grant role snowpipe_role to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe_role;

-- Resume the pipe
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = false;
Copy

第 4 步:配置事件通知

为 S3 桶配置事件通知,以便在有新数据可供加载时通知 Snowpipe。自动引入功能依赖 SQS 队列将事件通知从 S3 传递到 Snowpipe。

为便于使用,Snowpipe SQS 队列由 Snowflake 创建和管理。SHOW PIPES 命令输出显示 SQS 队列的 Amazon 资源名称 (ARN)。

  1. 执行 SHOW PIPES 命令:

    SHOW PIPES;
    
    Copy

    记住针对 notification_channel 列中暂存区的 SQS 队列的 ARN。将 ARN 复制到方便的位置。

    备注

    根据 AWS 准则,Snowflake 为每个 S3 桶指定的 SQS 队列不超过一个。SQS 队列可由同一 AWS 账户的同一区域中的多个桶共享。SQS 队列会协调所有管道的通知,将 S3 桶的外部暂存区连接到目标表。将数据文件上传到桶时,与暂存区目录路径匹配的所有管道都会将文件一次性加载到其相应的目标表中。

  2. 登录到 Amazon S3 控制台。

  3. 使用 Amazon S3 文档 (https://docs.aws.amazon.com/AmazonS3/latest/user-guide/enable-event-notifications.html) 中提供的说明为您的 S3 桶配置事件通知。填写如下字段:

    • Name:事件通知的名称(例如 Auto-ingest Snowflake)。

    • Events:选择 ObjectCreate (All) 选项。

    • Send to:从下拉列表中选择 SQS Queue

    • SQS:从下拉列表中选择 Add SQS queue ARN

    • SQS queue ARN:粘贴 SHOW PIPES 输出中的 SQS 队列名称。

备注

这些说明旨在创建单一事件通知,来监控整个 S3 桶的活动。这是最简单的方法。此通知处理 S3 桶目录中更精细级别配置的所有管道。Snowpipe 仅加载管道定义中指定的数据文件。但请注意,管道定义之外活动的大量通知可能会负面影响 Snowpipe 筛选通知并采取措施的速率。

或者,在上述步骤中,配置一个或多个路径和/或文件扩展名(或 AWS 术语中的 前缀后缀)来筛选事件活动。有关信息,请参阅相关 AWS 文档主题 (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-filtering.html) 的对象键名筛选信息。对您希望通知监控的每个其他路径或文件扩展名重复这些步骤。

请注意, AWS 将这些通知 队列配置 的数量限制为每个 S3 桶最多 100 个。

另请注意, AWS 不允许同一 S3 桶的重叠队列配置(跨事件通知)。例如,如果为 s3://mybucket/load/path1 配置现有通知,那么您就无法在更高级别创建另一个通知,例如 s3://mybucket/load,反之亦然。

现已配置附带自动引入功能的 Snowpipe!

当新数据文件添加到 S3 桶时,事件通知会告知 Snowpipe,将数据文件加载到管道中定义的目标表中。

第 5 步:加载历史文件

要加载在配置 SQS 通知 之前 外部暂存区中已存在的数据文件积压,请参阅 正在加载历史数据

第 6 步:删除暂存文件

在成功加载数据并且不再需要这些文件后删除暂存文件。有关信息,请参阅 在 Snowpipe 加载数据后删除暂存文件

选项 2:配置 Amazon SNS 以使用 SQS 通知自动执行 Snowpipe

本部分介绍如何使用针对 S3 桶的 Amazon SQS (Simple Queue Service) (https://aws.amazon.com/sqs/) 通知自动触发 Snowpipe 数据加载。这些步骤解释了如何将 Amazon Simple Notification Service (SNS) (https://aws.amazon.com/sns/) 配置为广播器,以便向多个订阅者发布 S3 桶的事件通知(如 SQS 队列或 AWS Lambda 工作负载),包括用于 Snowpipe 自动化的 Snowflake SQS 队列。

备注

这些说明假设数据文件所在的 S3 桶中的目标路径存在事件通知。若不存在事件通知,则:

  • 改为按照(本主题内容) 选项 1:创建新的 S3 事件通知以自动执行 Snowpipe 操作。

  • 为您的 S3 桶创建事件通知,然后继续按照本主题中的说明操作。有关信息,请参阅 Amazon S3 文档 (https://docs.aws.amazon.com/AmazonS3/latest/user-guide/enable-event-notifications.html)。

下图显示了 Snowpipe Auto-Ingest with Amazon SNS 的流程流:

通过 Amazon SNS 执行的 Snowpipe 自动引入流程流
  1. 数据文件在暂存区中加载。

  2. SNS 发布的 S3 事件通知会通过 SQS 队列通知 Snowpipe:文件可供加载。Snowpipe 将文件复制到队列中。

  3. Snowflake 提供的虚拟仓库根据指定管道中定义的参数将数据从排队文件加载到目标表中。

备注

这些说明假定将加载数据的 Snowflake 数据库中已存在目标表。

Snowpipe 自动引入支持 AWS KMS 加密的 SNS 主题。有关更多信息,请参阅 静态加密 (https://docs.aws.amazon.com/sns/latest/dg/sns-server-side-encryption.html)。

先决条件:创建 Amazon SNS 主题和订阅

  1. 在您的 SNS 账户中创建一个 AWS 主题来处理 S3 桶上 Snowflake 暂存区位置的所有消息。

  2. 将 S3 事件通知的目标目的地(如其他 SQS 队列或 AWS Lambda 工作负载)订阅到此主题。SNS 将您的桶的事件通知发布给该主题的所有订阅者。

有关信息,请参阅 SNS 文档 (https://aws.amazon.com/documentation/sns/)。

第 1 步:将 Snowflake SQS 队列订阅至 SNS 主题

  1. 登录 AWS 管理控制台。

  2. 从主页仪表板中选择 Simple Notification Service (SNS)。

  3. 从左侧导航窗格中选择 Topics

  4. 找到您的 S3 桶主题。注意主题 ARN。

  5. 使用 Snowflake 客户端,用您的 SNS 主题 ARN 查询 SYSTEM$GET_AWS_SNS_IAM_POLICY 系统函数:

    select system$get_aws_sns_iam_policy('<sns_topic_arn>');
    
    Copy

    该函数返回一个授予 Snowflake SQS 队列权限以订阅 SNS 主题的 IAM 策略。

    例如:

    select system$get_aws_sns_iam_policy('arn:aws:sns:us-west-2:001234567890:s3_mybucket');
    
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | SYSTEM$GET_AWS_SNS_IAM_POLICY('ARN:AWS:SNS:US-WEST-2:001234567890:S3_MYBUCKET')                                                                                                                                                                   |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | {"Version":"2012-10-17","Statement":[{"Sid":"1","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"},"Action":["sns:Subscribe"],"Resource":["arn:aws:sns:us-west-2:001234567890:s3_mybucket"]}]}                 |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
    Copy
  6. 返回 AWS 管理控制台。从左侧导航窗格中选择 Topics

  7. 选择您的 S3 桶的主题,然后点击 Edit 按钮。Edit 页面随即打开。

  8. 点击 Access policy - Optional 以展开页面的此区域。

  9. 将 SYSTEM$GET_AWS_SNS_IAM_POLICY 函数结果中添加的 IAM 策略合并到 JSON 文档中。

    例如:

    原始 IAM 策略(缩写):

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         }
       ]
     }
    
    Copy

    合并的 IAM 策略:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         }
       ]
     }
    
    Copy
  10. 添加额外的策略授权以允许 S3 将桶的事件通知发布到 SNS 主题。

    例如(使用 SNS 主题 ARN 以及这些说明中使用的 S3 桶):

    {
        "Sid":"s3-event-notifier",
        "Effect":"Allow",
        "Principal":{
           "Service":"s3.amazonaws.com"
        },
        "Action":"SNS:Publish",
        "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
        "Condition":{
           "ArnLike":{
              "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
           }
        }
     }
    
    Copy

    合并的 IAM 策略:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         },
         {
            "Sid":"s3-event-notifier",
            "Effect":"Allow",
            "Principal":{
               "Service":"s3.amazonaws.com"
            },
            "Action":"SNS:Publish",
            "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
            "Condition":{
               "ArnLike":{
                  "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
               }
            }
          }
       ]
     }
    
    Copy
  11. 点击 Save changes 按钮。

第 2 步:创建暂存区(如果需要)

使用 CREATE STAGE 命令创建引用您的 S3 桶的外部暂存区。Snowpipe 会从暂存区中获取数据文件,并在加载到目标表之前将它们临时加入队列。

或者,您可以使用现有的外部暂存区。

备注

要配置对云存储位置的安全访问,请参阅 配置对 Cloud Storage 的安全访问 (本主题内容)。

以下示例会在活动架构中为用户会话创建一个名为 mystage 的暂存区。云存储 URL 包括路径 files。该暂存区引用名为 my_storage_int 的存储集成:

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = my_storage_int;
Copy

第 3 步:创建已启用自动引入的管道

使用 CREATE PIPE 命令创建管道。管道定义了 Snowpipe 用来将数据从引入队列加载到目标表中的 COPY INTO <table> 语句。在 COPY 语句中,从 先决条件:创建 Amazon SNS 主题和订阅 中识别 SNS 主题 ARN。

以下示例会在活动架构中为用户会话创建名为 mypipe 的管道。管道将数据从 mystage 暂存区中暂存的文件加载到 mytable 表:

create pipe snowpipe_db.public.mypipe
  auto_ingest=true
  aws_sns_topic='<sns_topic_arn>'
  as
    copy into snowpipe_db.public.mytable
    from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

其中:

AUTO_INGEST = true

指定在新数据可供加载时将从 S3 桶发送的事件通知读取到 SQS 队列。

AWS_SNS_TOPIC = '<sns_topic_arn>'

为 S3 桶的 SNS 主题指定 ARN,例如,当前示例中为 arn:aws:sns:us-west-2:001234567890:s3_mybucket。CREATE PIPE 语句将 Snowflake SQS 队列订阅至指定的 SNS 主题。请注意,管道只会将文件复制到事件通知通过 SNS 主题触发的引入队列。

要从管道中移除任一参数,当前需要使用语法 CREATE OR REPLACE PIPE 重新创建管道。

重要

验证 COPY INTO 中的存储位置引用,<table> statement does not overlap with the reference in existing pipes in the account. Otherwise, multiple pipes could load the same set of data files into the target tables. For example, this situation can occur when multiple pipe definitions reference the same storage location with different levels of granularity, such as <storage_location>/path1/ and <storage_location>/path1/path2/. In this example, if files are staged in <storage_location>/path1/path2/,两个管道都将加载这些文件的副本。

执行 SHOW PIPES、查询 Account Usage 中的 PIPES 视图,或查询 Information Schema 中的 PIPES 视图以查看账户内所有管道定义中的 COPY INTO <table> 语句。

第 4 步:配置安全性

对于将使用 Snowpipe 执行连续数据加载的每个用户,请授予针对数据加载对象(即目标数据库、架构和表;暂存区对象和管道)的足够访问控制权限。

备注

为了遵循“最小权限”的一般原则,我们建议使用管道创建单独的用户和角色以用于引入文件。创建用户时,应将此角色作为其默认角色。

要使用 Snowpipe,需要具有以下权限的角色:

对象

权限

备注

命名管道

OWNERSHIP

命名存储集成

USAGE

仅当 第 2 步:创建暂存区(如果需要) 中创建的暂存区引用存储集成时才需要。

命名暂存区

USAGE、READ

命名文件格式

USAGE

可选;仅当 _第 2 步:创建暂存区(如果需要) 中创建的暂存区引用命名文件格式时才需要。

目标数据库

USAGE

目标架构

USAGE

目标表

INSERT、SELECT

使用 GRANT <privileges> 命令为角色授予权限。

备注

只有安全管理员(即具有 SECURITYADMIN 角色的用户)或更高级别的用户才能创建角色。

例如,创建名为 snowpipe_role 的角色,该角色可以访问一组 snowpipe_db.public 数据库对象以及一个名为 mypipe 的管道,然后将该角色授予用户:

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe_role;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe_role;

grant usage on schema snowpipe_db.public to role snowpipe_role;

grant insert, select on snowpipe_db.public.mytable to role snowpipe_role;

grant usage, read on stage snowpipe_db.public.mystage to role snowpipe_role;

-- Pause the pipe for OWNERSHIP transfer
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = true;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe_role;

-- Grant the role to a user
grant role snowpipe_role to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe_role;

-- Resume the pipe
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = false;
Copy

现已配置附带自动引入功能的 Snowpipe!

当新数据文件添加到 S3 桶时,事件通知会告知 Snowpipe,将数据文件加载到管道中定义的目标表中。

第 5 步:加载历史文件

要加载在配置 SQS 通知 之前 外部暂存区中已存在的数据文件积压,请参阅 正在加载历史数据

第 6 步:删除暂存文件

在成功加载数据并且不再需要这些文件后删除暂存文件。有关信息,请参阅 在 Snowpipe 加载数据后删除暂存文件

选项 3:设置 Amazon EventBridge 以自动执行 Snowpipe

与选项 2 类似,您还可以设置 Amazon EventBridge 以自动执行 Snowpipe。

第 1 步:创建 Amazon SNS 主题

按照(本主题内容) 先决条件:创建 Amazon SNS 主题和订阅 操作。

第 2 步:创建 EventBridge 规则以订阅 S3 桶并向 SNS 主题发送通知

  • 为 S3 桶 启用 Amazon EventBridge (https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications-eventbridge.html)。

  • 创建 EventBridge 规则,将通知发送 (https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-s3-object-created-tutorial.html) 至第 1 步中创建的 SNS 主题。

第 3 步:配置 Amazon SNS 以使用 SQS 通知自动执行 Snowpipe

按照 选项 2:配置 Amazon SNS 以使用 SQS 通知自动执行 Snowpipe (本主题内容)操作。

SYSTEM$PIPE_STATUS 输出

SYSTEM$PIPE_STATUS 函数检索管道当前状态的 JSON 表示形式。

对于将 AUTO_INGEST 设置为 TRUE 的管道,该函数返回包含以下名称/值对的 JSON 对象(如果适用于当前管道状态):

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}

有关输出值的说明,请参阅 SQL 函数的引用主题。

语言: 中文