Option 2: Automate Snowpipe with AWS Lambda¶
AWS Lambda 是一种计算服务,在被事件触发时运行,并执行已加载到系统中的代码。您可以修改本主题中提供的 Python 示例代码,并创建一个调用 Snowpipe REST API 的 Lambda 函数,以从外部暂存区(即 S3 桶;不支持 Azure 容器)加载数据。此函数将部署到您的 AWS 账户,并托管在该处。在 Lambda 中定义的事件(例如,当 S3 桶中的文件被更新时)调用此 Lambda 函数,并运行 Python 代码。
本主题介绍配置 Lambda 函数的必要步骤,让该函数自动使用 Snowpipe 微批量地连续加载数据。
备注
本主题假定您已按照 Data loading preparation using the Snowpipe REST API 中的说明配置了 Snowpipe。
第 1 步:编写用于调用 Snowpipe REST API 的 Python 代码¶
Python 示例代码
备注
示例代码不考虑错误处理。例如,它不会重试失败的 ingest_manager 调用。
在使用示例代码之前,请进行以下更改:
更新安全参数:
private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """指定您在 Use key pair authentication & key rotation (请参阅 Data loading preparation using the Snowpipe REST API)中创建的私钥文件的内容。
使用
PRIVATE_KEY_PASSPHRASE环境变量指定用于解密私钥文件的密码:Linux 或 macOS:
Windows:
更新会话参数:
account='<account_identifier>'指定账户的唯一标识符(由 Snowflake 提供)。请参阅
host描述。host='<account_identifier>.snowflakecomputing.cn'为 Snowflake 账户指定唯一的主机名。
账户标识符的首选格式如下:
organization_name-account_nameSnowflake 组织和账户的名称。有关详细信息,请参阅 格式 1(首选):您所在组织的账户名称。
如果需要,还可以指定 账户定位器,以及托管该账户的 区域 和 云平台。有关详细信息,请参阅 格式 2:区域中的账户定位器。
user='<user_login_name>'指定将运行 Snowpipe 代码的 Snowflake 用户的登录名。
pipe='<db_name>.<schema_name>.<pipe_name>'指定用于加载数据的管道的完全限定名称,格式为
<db_name>.<schema_name>.<pipe_name>。
在文件对象列表中指定要导入的文件的路径:
staged_file_list = []指定的路径必须 相对于 文件所在的暂存区。包括每个文件的完整名称,包括文件扩展名。例如,gzip 压缩的 CSV 文件的扩展名可能是
.csv.gz。
将文件保存在方便的位置。
本主题中的其余说明假定文件名为 SnowpipeLambdaCode.py。
第 2 步:创建 Lambda 函数部署包¶
按照以下说明为 Lambda 构建 Python 运行时环境,并添加您在 第 1 步:编写用于调用 Snowpipe REST API 的 Python 代码 (本主题内容)中修改的 Snowpipe 代码。有关这些步骤的更多信息,请参阅 AWS Lambda 部署包文档 (http://docs.aws.amazon.com/lambda/latest/dg/with-s3-example-deployment-pkg.html) (请参阅 Python 说明)。
重要
以下步骤中的脚本是一个代表性示例,并假定您正根据使用 YUM 包管理器(依赖于 RPM)的 Amazon Machine Instance (AMI) 来创建 AWS EC2 Linux 实例。如果您选择基于 Debian 的 Linux AMI,请相应地更新脚本。
按照 AWS EC2 说明 (http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EC2_GetStarted.html#ec2-launch-instance) 创建一个 AWS EC2 Linux 实例。此实例将提供计算资源来运行 Snowpipe 代码。
使用 SCP (安全复制)将 Snowpipe 代码文件复制到新的 AWS EC2 实例:
其中:
<path>是本地SnowpipeLambdaCode.py文件的路径。<machine>.<region_id>是 EC2 实例的 DNS 名称(例如ec2-54-244-54-199.us-west-2.compute.amazonaws.com)。DNS 名称显示在 Amazon EC2 控制台的 Instances 屏幕上。
使用 SSH (安全 SHell)连接到 EC2 实例:
在 EC2 实例上安装 Python 和相关库:
创建 .zip 部署包 (
Snowpipe.zip):
第 3 步:为 Lambda 创建一个 AWS IAM 角色¶
按照 AWS Lambda 文档 (http://docs.aws.amazon.com/lambda/latest/dg/with-s3-example-create-iam-role.html) 所述,创建一个 IAM 角色来执行 Lambda 函数。
记录该角色的 IAM Amazon 资源名称 (ARN) (http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns)。您将在下一步中使用它。
第 4 步:创建 Lambda 函数¶
上传您在 第 2 步:创建 Lambda 函数部署包 (本主题内容)中创建的 .zip 部署包,以创建 Lambda 函数:
对于 --role,请指定您在 第 3 步:为 Lambda 创建一个 AWS IAM 角色 (本主题内容)中记录的角色 ARN。
记录输出中新函数的 ARN。您将在下一步中使用它。
第 5 步:允许调用 Lambda 函数¶
授予 S3 调用函数所需的权限。
对于 --source-arn,指定您在 第 4 步:创建 Lambda 函数 (本主题内容)中记录的函数 ARN。
第 6 步:注册 Lambda 通知事件¶
按照 Amazon S3 事件通知 (http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html) 中的说明,注册 Lambda 通知事件。在输入字段中,指定您在 第 4 步:创建 Lambda 函数 (本主题内容)中记录的函数 ARN。