为 Microsoft Azure Blob 存储自动化 Snowpipe¶
本主题提供有关使用 Blob 存储事件的 Microsoft Azure 事件网格 (https://azure.microsoft.com/en-us/services/event-grid/) 消息自动触发 Snowpipe 数据加载的说明。这些说明解释了如何为存储数据文件的 Blob 存储中的目标路径创建事件消息。
Snowflake 支持以下类型的 Blob 存储账户:
Blob 存储
Data Lake Storage Gen2
General-purpose v2
请注意,只有 Microsoft.Storage.BlobCreated
事件才会触发 Snowpipe 加载文件。向 Blob 存储添加新对象会触发这些事件。重命名目录或对象不会触发这些事件。Snowflake 支持以下 Microsoft.Storage.BlobCreated
APIs:
CopyBlob
PutBlob
PutBlockList
FlushWithClose
SftpCommit
Snowflake 建议仅发送 Snowpipe 支持的事件,以降低成本、事件干扰和延迟。
对于 Data Lake Storage Gen2 存储账户,当客户端使用 CreateFile
和 FlushWithClose
操作时会触发 Microsoft.Storage.BlobCreated
事件。如果使用 SSH 文件传输协议 (SFTP),则使用 SftpCreate
和 SftpCommit
操作触发 Microsoft.Storage.BlobCreated
事件。CreateFile
或 SftpCreate
API 不会单独指示存储账户中的文件提交。如果 FlushWithClose
或 SftpCommit
消息未发送到 Snowflake 队列,则 Snowpipe 不会引入该文件。
备注
Snowflake 仅支持 Azure 事件网格事件架构 (https://learn.microsoft.com/en-us/azure/event-grid/event-schema);不支持 Azure 事件网格 CloudEvents 架构 (https://learn.microsoft.com/en-us/azure/event-grid/cloud-event-schema)。
云平台支持¶
所有受支持的云平台 上托管的 Snowflake 账户都支持使用 Azure 事件网格消息触发自动 Snowpipe 数据加载。
流程流¶
Azure 容器的 Microsoft Azure 事件网格 (https://azure.microsoft.com/en-us/services/event-grid/) 通知会自动触发 Snowpipe 数据加载。
下图显示了 Snowpipe 自动引入的流程流:
数据文件在暂存区中加载。
Blob 存储事件消息通过事件网格通知 Snowpipe 文件已做好加载准备。Snowpipe 将文件复制到队列中。
Snowflake 提供的虚拟仓库根据指定管道中定义的参数将数据从排队文件加载到目标表中。
有关信息,请参阅 为 Microsoft Azure Blob 存储自动化 Snowpipe。
配置对 Cloud Storage 的安全访问¶
备注
如果您已配置对存储数据文件的 Azure Blob 存储容器的安全访问,则可以跳过此部分。
本部分介绍如何配置 Snowflake 存储集成对象,以将云存储的身份验证责任委托给 Snowflake Identity and Access Management (IAM) 实体。
备注
我们强烈建议使用此选项,这样可以避免在访问云存储时提供 IAM 凭据。有关其他存储访问选项,请参阅 配置 Azure 容器以加载数据。
本部分将描述如何使用存储集成来允许 Snowflake 从外部 (Azure) 暂存区中引用的 Azure 容器读取数据,并向其中写入数据。集成是已命名的第一类 Snowflake 对象,无需传递显式云提供商凭据(如密钥或访问令牌)。集成对象会存储称为 应用程序注册 的 Azure Identity and Access Management (IAM) 用户 ID。您组织中的管理员在 Azure 账户中授予此应用程序必要的权限。
集成还必须指定容器(和可选路径),以限制用户在创建使用集成的外部暂存区时可以指定的位置。
备注
需要具有在 Azure 中拥有管理存储账户的权限,才能完成本部分中的说明。如果您不是 Azure 管理员,请让 Azure 管理员执行这些任务。
本部分内容:
第 1 步:在 Snowflake 中创建云存储集成¶
使用 CREATE STORAGE INTEGRATION 命令创建存储集成。存储集成是一个 Snowflake 对象,它存储为 Azure 云存储生成的服务主体,以及一组可选的允许或阻止的存储位置(即容器)。组织中的云提供商管理员会将存储位置的权限授予生成的服务主体。此选项可让用户在创建暂存区或加载数据时无需提供凭据。
单个存储集成可以支持多个外部(如 Azure)暂存区。暂存区定义中的 URL 必须与为 STORAGE_ALLOWED_LOCATIONS 参数指定的 Azure 容器(和可选路径)一致。
备注
只有账户管理员(具有 ACCOUNTADMIN 角色的用户)或具有全局 CREATE INTEGRATION 权限的角色才能执行此 SQL 命令。
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'AZURE'
ENABLED = TRUE
AZURE_TENANT_ID = '<tenant_id>'
STORAGE_ALLOWED_LOCATIONS = ('azure://<account>.blob.core.windows.net/<container>/<path>/', 'azure://<account>.blob.core.windows.net/<container>/<path>/')
[ STORAGE_BLOCKED_LOCATIONS = ('azure://<account>.blob.core.windows.net/<container>/<path>/', 'azure://<account>.blob.core.windows.net/<container>/<path>/') ]
其中:
integration_name
是新集成的名称。tenant_id
是允许和阻止的存储账户所属的 Office 365 租户的 ID。存储集成只能对一个租户进行身份验证,因此允许和阻止的存储位置必须引用同属于此租户的存储账户。要查找租户 ID,请登录 Azure 门户并点击 Azure Active Directory » Properties。租户 ID 将显示在 Tenant ID 字段中。
container
是存储数据文件的 Azure 容器的名称(例如mycontainer
)。STORAGE_ALLOWED_LOCATIONS 和 STORAGE_BLOCKED_LOCATIONS 参数分别允许或阻止在创建或修改引用此集成的暂存区时访问这些容器。path
是一个可选路径,可用于提供对容器中逻辑目录的精细控制。
以下示例创建一个集成,该集成明确限制使用该集成的外部暂存区只能引用两个容器和路径中的任意一个。在后面的步骤中,我们将创建一个外部暂存区,并引用这些容器和路径之一。使用此集成的多个外部暂存区可以引用受允许的容器和路径:
CREATE STORAGE INTEGRATION azure_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'AZURE' ENABLED = TRUE AZURE_TENANT_ID = 'a123b4c5-1234-123a-a12b-1a23b45678c9' STORAGE_ALLOWED_LOCATIONS = ('azure://myaccount.blob.core.windows.net/mycontainer1/mypath1/', 'azure://myaccount.blob.core.windows.net/mycontainer2/mypath2/') STORAGE_BLOCKED_LOCATIONS = ('azure://myaccount.blob.core.windows.net/mycontainer1/mypath1/sensitivedata/', 'azure://myaccount.blob.core.windows.net/mycontainer2/mypath2/sensitivedata/');
第 2 步:授予 Snowflake 对存储位置的访问权限¶
执行 DESCRIBE INTEGRATION 命令以检索许可 URL:
DESC STORAGE INTEGRATION <integration_name>;
其中:
integration_name
是您在 第 1 步:在 Snowflake 中创建云存储集成 中创建的集成的名称。
请注意以下列中的值:
- AZURE_CONSENT_URL:
Microsoft 权限请求页面的 URL。
- AZURE_MULTI_TENANT_APP_NAME:
为账户创建的 Snowflake 客户端应用程序的名称。在本部分后面的步骤中,需要授予此应用程序所需的权限,使其在受允许的存储位置上获取访问令牌。
在 Web 浏览器中,导航到 AZURE_CONSENT_URL 列中的 URL。该页面显示 Microsoft 权限请求页面。
点击 Accept 按钮。此操作允许向为 Snowflake 账户创建的 Azure 服务主体授予对租户内指定资源的访问令牌。仅当您授予服务主体对容器的适当权限时,才能成功获取访问令牌(请参阅下一步)。
Microsoft 权限请求页面会重定向到 Snowflake 公司站点 (snowflake.com)。
登录 Microsoft Azure 门户。
导航到 Azure Services » Storage Accounts。点击要授予 Snowflake 服务主体访问权限的存储账户的名称。
点击 Access Control (IAM) » Add role assignment。
选择要授予 Snowflake 服务主体的所需角色:
Storage Blob Data Reader
仅授予读取访问权限。此权限允许从存储账户中暂存的文件加载数据。Storage Blob Data Contributor
授予读取和写入访问权限。此权限允许加载存储账户中暂存的文件内的数据,或是将数据卸载到存储账户中暂存的文件。该角色还允许执行 REMOVE 命令来移除存储账户中暂存的文件。
搜索 Snowflake 服务主体。这是 DESC STORAGE INTEGRATION 输出(第 1 步中)中 AZURE_MULTI_TENANT_APP_NAME 属性的身份。在 AZURE_MULTI_TENANT_APP_NAME 属性中搜索下划线 之前 的字符串。
重要
Azure 可能需要一个小时或更长时间才能创建通过此部分中的 Microsoft 请求页面请求的 Snowflake 服务主体。如果服务主体不能立即使用,我们建议等待一两个小时,然后再次搜索。
如果删除服务主体,存储集成将停止工作。
点击 Review + assign 按钮。
备注
根据 Microsoft Azure 文档,角色分配传播至多可能需要五分钟。
Snowflake 会将临时凭据缓存一段时间,但不能超过 60 分钟的过期时间。如果您撤消 Snowflake 的访问权限,用户或许也能够从云存储位置列出文件并加载数据,直到缓存过期。
使用 Azure 事件网格配置自动化¶
第 1 步:配置事件网格订阅¶
本部分介绍如何使用 Azure CLI 为 Azure 存储事件设置事件网格订阅。有关本部分中描述的步骤的更多信息,请参阅 Azure 文档中的以下文章:
https://docs.microsoft.com/en-us/azure/event-grid/custom-event-to-queue-storage (https://docs.microsoft.com/en-us/azure/event-grid/custom-event-to-queue-storage)
https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-event-quickstart (https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-event-quickstart)
创建资源组¶
事件网格 主题 提供了一个端点,来源(即 Azure 存储)会将事件发送到该端点。主题用作相关事件的集合。事件网格主题是 Azure 资源,必须放在 Azure 资源组中。
执行以下命令以创建资源组:
az group create --name <resource_group_name> --location <location>
其中:
resource_group_name
是新资源组的名称。location
指的是 Azure 存储账户的位置,即 Snowflake 术语中的 区域。
启用事件网格资源提供商¶
执行以下命令以注册事件网格资源提供商。请注意,仅当您以前未将事件网格与 Azure 账户搭配使用时,才需要执行此步骤:
az provider register --namespace Microsoft.EventGrid
az provider show --namespace Microsoft.EventGrid --query "registrationState"
为数据文件创建存储账户¶
执行以下命令以创建存储账户来存储数据文件。此账户必须是 Blob 存储(即 BlobStorage
类别)或 GPv2(即 StorageV2
类别)账户,因为只有这两种账户类型支持事件消息。
备注
如果您已经有 Blob 存储或 GPv2 账户,则可以使用该账户。
例如,创建 Blob 存储账户:
az storage account create --resource-group <resource_group_name> --name <storage_account_name> --sku Standard_LRS --location <location> --kind BlobStorage --access-tier Hot
其中:
resource_group_name
是在 创建资源组 中创建的资源组的名称。storage_account_name
是新存储账户的名称。location
是您的 Azure 存储账户的位置。
为存储队列创建存储账户¶
执行以下命令以创建存储账户来托管存储队列。此账户必须是 GPv2 账户,因为只有此类账户才支持将事件消息发送到存储队列。
备注
如果已经有一个 GPv2 账户,则可以使用该账户托管数据文件和存储队列。
例如,创建 GPv2 账户:
az storage account create --resource-group <resource_group_name> --name <storage_account_name> --sku Standard_LRS --location <location> --kind StorageV2
其中:
resource_group_name
是在 创建资源组 中创建的资源组的名称。storage_account_name
是新存储账户的名称。location
是您的 Azure 存储账户的位置。
创建存储队列¶
单个 Azure 队列存储队列可以收集许多事件网格订阅的事件消息。为获得最佳性能,Snowflake 建议创建单个存储队列以容纳与 Snowflake 相关的所有订阅。
执行以下命令以创建存储队列。存储队列会存储一组消息,在本例中是来自事件网格的事件消息:
az storage queue create --name <storage_queue_name> --account-name <storage_account_name>
其中:
storage_queue_name
是新存储队列的名称。storage_account_name
是在 为存储队列创建存储账户 中创建的存储账户的名称。
导出存储账户和队列 IDs 以供参考¶
执行以下命令以设置存储账户和队列 IDs 的环境变量,这些指令稍后将请求这些变量:
Linux 或 macOS:
export storageid=$(az storage account show --name <data_storage_account_name> --resource-group <resource_group_name> --query id --output tsv) export queuestorageid=$(az storage account show --name <queue_storage_account_name> --resource-group <resource_group_name> --query id --output tsv) export queueid="$queuestorageid/queueservices/default/queues/<storage_queue_name>"
Windows:
set storageid=$(az storage account show --name <data_storage_account_name> --resource-group <resource_group_name> --query id --output tsv) set queuestorageid=$(az storage account show --name <queue_storage_account_name> --resource-group <resource_group_name> --query id --output tsv) set queueid="%queuestorageid%/queueservices/default/queues/<storage_queue_name>"
其中:
data_storage_account_name
是在 为数据文件创建存储账户 中创建的存储账户的名称。queue_storage_account_name
是在 为存储队列创建存储账户 中创建的存储账户的名称。resource_group_name
是在 创建资源组 中创建的资源组的名称。storage_queue_name
是在 创建存储队列 中创建的存储队列的名称。
安装事件网格扩展程序¶
执行以下命令以安装 Azure CLI 的事件网格扩展程序:
az extension add --name eventgrid
创建事件网格订阅¶
执行以下命令以创建事件网格订阅。订阅主题可告知事件网格要跟踪哪些事件:
Linux 或 macOS:
az eventgrid event-subscription create \ --source-resource-id $storageid \ --name <subscription_name> --endpoint-type storagequeue \ --endpoint $queueid \ --advanced-filter data.api stringin CopyBlob PutBlob PutBlockList FlushWithClose SftpCommit
Windows:
az eventgrid event-subscription create \ --source-resource-id %storageid% \ --name <subscription_name> --endpoint-type storagequeue \ --endpoint %queueid% \ -advanced-filter data.api stringin CopyBlob PutBlob PutBlockList FlushWithClose SftpCommit
其中:
storageid
和queueid
是在 导出存储账户和队列 IDs 以供参考 中设置的存储账户和队列 ID 环境变量。subscription_name
是新事件网格订阅的名称。
第 2 步:创建通知集成¶
通知集成是一个 Snowflake 对象,在 Snowflake 和第三方云消息队列服务(如 Azure 事件网格)之间提供一个接口。
备注
单个通知集成支持单个 Azure 存储队列。在多个通知集成中引用同一存储队列可能会导致目标表中缺少数据,因为事件通知会在多个通知集成之间拆分。因此,如果管道引用了与现有管道相同的存储队列,则会阻止管道创建。
检索存储队列 URL 和租户 ID¶
登录 Microsoft Azure 门户。
导航到 Storage account » Queue service » Queues。记录在 创建存储队列 中创建的队列的 URL,以供以后参考。URL 具有以下格式:
https://<storage_account_name>.queue.core.windows.net/<storage_queue_name>
导航到 Azure Active Directory » Properties。记录 Tenant ID 值以供以后参考。需要目录 ID 或 租户 ID 来生成授予 Snowflake 访问事件网格订阅的许可 URL。
创建通知集成¶
使用 CREATE NOTIFICATION INTEGRATION 命令创建通知集成。
备注
只有账户管理员(具有 ACCOUNTADMIN 角色的用户)或具有全局 CREATE INTEGRATION 权限的角色才能执行此 SQL 命令。
通知集成的 Azure 服务主体与为存储集成创建的服务主体不同。
CREATE NOTIFICATION INTEGRATION <integration_name>
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
AZURE_STORAGE_QUEUE_PRIMARY_URI = '<queue_URL>'
AZURE_TENANT_ID = '<directory_ID>';
其中:
integration_name
是新集成的名称。queue_URL
和directory_ID
是在 检索存储队列 URL 和租户 ID 中记录的队列 URL 和租户 ID。
例如:
CREATE NOTIFICATION INTEGRATION my_notification_int
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://myqueue.queue.core.windows.net/mystoragequeue'
AZURE_TENANT_ID = 'a123bcde-1234-5678-abc1-9abc12345678';
授予 Snowflake 对存储队列的访问权限¶
请注意,本部分中的特定步骤需要在本地安装 Azure CLI。
执行 DESCRIBE INTEGRATION 命令以检索许可 URL:
DESC NOTIFICATION INTEGRATION <integration_name>;
其中:
integration_name
是您在 创建通知集成 中创建的集成的名称。
请注意以下列中的值:
- AZURE_CONSENT_URL:
Microsoft 权限请求页面的 URL。
- AZURE_MULTI_TENANT_APP_NAME:
为账户创建的 Snowflake 客户端应用程序的名称。在本部分后面的步骤中,需要授予此应用程序必要的权限,使其获取允许主题的访问令牌。
在 Web 浏览器中,导航到 AZURE_CONSENT_URL 列中的 URL。该页面显示 Microsoft 权限请求页面。
点击 Accept 按钮。此操作允许为 Snowflake 账户创建的 Azure 服务主体获取租户内任何资源的访问令牌。仅当您授予服务主体对容器的适当权限时,才能成功获取访问令牌(请参阅下一步)。
Microsoft 权限请求页面会重定向到 Snowflake 公司站点 (snowflake.com)。
登录 Microsoft Azure 门户。
导航到 Azure Active Directory » Enterprise applications。验证是否列出了在本部分第 2 步中记录的 Snowflake 应用程序标识符。
重要
如果以后在 Azure AD 中删除 Snowflake 应用程序,通知集成将会停止工作。
导航到 Queues »
storage_queue_name
,其中storage_queue_name
是在 创建存储队列 中创建的存储队列的名称。点击 Access Control (IAM) » Add role assignment。
搜索 Snowflake 服务主体。这是 DESC NOTIFICATION INTEGRATION 输出(第 1 步中)中 AZURE_MULTI_TENANT_APP_NAME 属性的身份。在 AZURE_MULTI_TENANT_APP_NAME 属性中搜索下划线 之前 的字符串。
重要
Azure 可能需要一个小时或更长时间才能创建通过此部分中的 Microsoft 请求页面请求的 Snowflake 服务主体。如果服务主体不能立即使用,我们建议等待一两个小时,然后再次搜索。
如果删除服务主体,通知集成将停止工作。
授予 Snowflake 应用程序以下权限:
Role: 存储队列数据贡献者
Assign access to: Azure AD 用户、组或服务主体
Select:
appDisplayName
值。
Snowflake 应用程序标识符现在应该列于 Storage Queue Data Contributor 下方(同一对话框上)。
第 3 步:创建暂存区(如果需要)¶
使用 CREATE STAGE 命令创建引用 Azure 容器的外部暂存区。Snowpipe 会从暂存区中获取数据文件,并在加载到目标表之前将它们临时加入队列。
或者,您可以使用现有的外部暂存区。
备注
要配置对云存储位置的安全访问,请参阅 配置对 Cloud Storage 的安全访问 (本主题内容)。
要在 CREATE STAGE 语句中引用存储集成,角色必须具有对存储集成对象的 USAGE 权限。
以下示例会在活动架构中为用户会话创建一个名为 mystage
的暂存区。云存储 URL 包括路径 load/files
。该暂存区引用名为 my_storage_int
的存储集成。
USE SCHEMA snowpipe_db.public; CREATE STAGE mystage URL = 'azure://myaccount.blob.core.windows.net/mycontainer/load/files/' STORAGE_INTEGRATION = my_storage_int;
备注
将 blob.core.windows.net
端点用于所有受支持类型的 Azure Blob 存储账户,包括 Data Lake Storage Gen2。
第 4 步:创建已启用自动引入的管道¶
使用 CREATE PIPE 命令创建管道。管道定义了 Snowpipe 用来将数据从引入队列加载到目标表中的 COPY INTO <table> 语句。
例如,在 snowpipe_db.public
架构中创建管道,将数据从暂存在 mystage
暂存区中的文件加载到 mytable
表中:
create pipe snowpipe_db.public.mypipe auto_ingest = true integration = 'MY_NOTIFICATION_INT' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');
其中:
notification_integration_name
是在 第 2 步:创建通知集成 中创建的通知集成的名称。
重要
集成名称必须全部使用大写输入。
验证 COPY INTO 中的存储位置引用,<table> statement does not overlap with the reference in existing pipes in the account. Otherwise, multiple pipes could load the same set of data files into the target tables. For example, this situation can occur when multiple pipe definitions reference the same storage location with different levels of granularity, such as
<storage_location>/path1/
and<storage_location>/path1/path2/
. In this example, if files are staged in<storage_location>/path1/path2/
,两个管道都将加载这些文件的副本。执行 SHOW PIPES、查询 Account Usage 中的 PIPES 视图,或查询 Information Schema 中的 PIPES 视图以查看账户内所有管道定义中的 COPY INTO <table> 语句,
现已配置附带自动引入功能的 Snowpipe!
当新数据文件添加到 Azure 容器时,事件消息会告知 Snowpipe,将它们加载到管道中定义的目标表中。
第 5 步:加载历史文件¶
要加载在配置事件网格消息 之前 存在于外部暂存区中的任何数据文件积压,请执行 ALTER PIPE ...REFRESH 语句。
第 6 步:删除暂存文件¶
在成功加载数据并且不再需要这些文件后删除暂存文件。有关信息,请参阅 在 Snowpipe 加载数据后删除暂存文件。
SYSTEM$PIPE_STATUS 输出¶
SYSTEM$PIPE_STATUS 函数检索管道当前状态的 JSON 表示形式。
对于将 AUTO_INGEST 设置为 TRUE 的管道,该函数返回包含以下名称/值对的 JSON 对象(如果适用于当前管道状态):
{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}
有关输出值的说明,请参阅 SQL 函数的引用主题。