选择 2:使用 AWS Lambda 实现 Snowpipe 自动化¶
AWS Lambda 是一种计算服务,在被事件触发时运行,并执行已加载到系统中的代码。您可以修改本主题中提供的 Python 示例代码,并创建一个调用 Snowpipe REST API 的 Lambda 函数,以从外部暂存区(即 S3 桶;不支持 Azure 容器)加载数据。此函数将部署到您的 AWS 账户,并托管在该处。在 Lambda 中定义的事件(例如,当 S3 桶中的文件被更新时)调用此 Lambda 函数,并运行 Python 代码。
本主题介绍配置 Lambda 函数的必要步骤,让该函数自动使用 Snowpipe 微批量地连续加载数据。
备注
本主题假定您已按照 使用 Snowpipe REST API 做好加载数据的准备 中的说明配置了 Snowpipe。
本主题内容:
第 1 步:编写用于调用 Snowpipe REST API 的 Python 代码¶
Python 示例代码
from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend
import os
with open("./rsa_key.p8", 'rb') as pem_in:
pemlines = pem_in.read()
private_key_obj = load_pem_private_key(pemlines,
os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
default_backend())
private_key_text = private_key_obj.private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format
# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
host='<account_identifier>.snowflakecomputing.cn',
user='<user_login_name>',
pipe='<db_name>.<schema_name>.<pipe_name>',
private_key=private_key_text)
def handler(event, context):
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print("Bucket: " + bucket + " Key: " + key)
# List of files in the stage specified in the pipe definition
# wrapped into a class
staged_file_list = []
staged_file_list.append(StagedFile(key, None))
print('Pushing file list to ingest REST API')
resp = ingest_manager.ingest_files(staged_file_list)
备注
示例代码不考虑错误处理。例如,它不会重试失败的 ingest_manager
调用。
在使用示例代码之前,请进行以下更改:
更新安全参数:
private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """
指定您在 使用密钥对身份验证和密钥轮换 (请参阅 使用 Snowpipe REST API 做好加载数据的准备)中创建的私钥文件的内容。
使用
PRIVATE_KEY_PASSPHRASE
环境变量指定用于解密私钥文件的密码:Linux 或 macOS:
export PRIVATE_KEY_PASSPHRASE='<passphrase>'
Windows:
set PRIVATE_KEY_PASSPHRASE='<passphrase>'
更新会话参数:
account='<account_identifier>'
指定账户的唯一标识符(由 Snowflake 提供)。请参阅
host
描述。host='<account_identifier>.snowflakecomputing.cn'
为 Snowflake 账户指定唯一的主机名。
账户标识符的首选格式如下:
organization_name-account_name
Snowflake 组织和账户的名称。有关详细信息,请参阅 格式 1(首选):您所在组织的账户名称。
如果需要,还可以指定 账户定位器,以及托管该账户的 区域 和 云平台。有关详细信息,请参阅 格式 2:区域中的账户定位器。
user='<user_login_name>'
指定将运行 Snowpipe 代码的 Snowflake 用户的登录名。
pipe='<db_name>.<schema_name>.<pipe_name>'
指定用于加载数据的管道的完全限定名称,格式为
<db_name>.<schema_name>.<pipe_name>
。
在文件对象列表中指定要导入的文件的路径:
staged_file_list = []
指定的路径必须 相对于 文件所在的暂存区。包括每个文件的完整名称,包括文件扩展名。例如,gzip 压缩的 CSV 文件的扩展名可能是
.csv.gz
。
将文件保存在方便的位置。
本主题中的其余说明假定文件名为 SnowpipeLamdbaCode.py
。
第 2 步:创建 Lambda 函数部署包¶
按照以下说明为 Lambda 构建 Python 运行时环境,并添加您在 第 1 步:编写用于调用 Snowpipe REST API 的 Python 代码 (本主题内容)中修改的 Snowpipe 代码。有关这些步骤的更多信息,请参阅 AWS Lambda 部署包文档 (http://docs.aws.amazon.com/lambda/latest/dg/with-s3-example-deployment-pkg.html) (请参阅 Python 说明)。
重要
以下步骤中的脚本是一个代表性示例,并假定您正根据使用 YUM 包管理器(依赖于 RPM)的 Amazon Machine Instance (AMI) 来创建 AWS EC2 Linux 实例。如果您选择基于 Debian 的 Linux AMI,请相应地更新脚本。
按照 AWS EC2 说明 (http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EC2_GetStarted.html#ec2-launch-instance) 创建一个 AWS EC2 Linux 实例。此实例将提供计算资源来运行 Snowpipe 代码。
使用 SCP (安全复制)将 Snowpipe 代码文件复制到新的 AWS EC2 实例:
scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
其中:
<path>
是本地SnowpipeLambdaCode.py
文件的路径。<machine>.<region_id>
是 EC2 实例的 DNS 名称(例如ec2-54-244-54-199.us-west-2.compute.amazonaws.com
)。DNS 名称显示在 Amazon EC2 控制台的 Instances 屏幕上。
使用 SSH (安全 SHell)连接到 EC2 实例:
ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
在 EC2 实例上安装 Python 和相关库:
sudo yum install -y gcc zlib zlib-devel openssl openssl-devel wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz tar -xzvf Python-3.6.1.tgz cd Python-3.6.1 && ./configure && make sudo make install sudo /usr/local/bin/pip3 install virtualenv /usr/local/bin/virtualenv ~/shrink_venv source ~/shrink_venv/bin/activate pip install Pillow pip install boto3 pip install requests pip install snowflake-ingest
创建 .zip 部署包 (
Snowpipe.zip
):cd $VIRTUAL_ENV/lib/python3.6/site-packages zip -r9 ~/Snowpipe.zip . cd ~ zip -g Snowpipe.zip SnowpipeLambdaCode.py
第 3 步:为 Lambda 创建一个 AWS IAM 角色¶
按照 AWS Lambda 文档 (http://docs.aws.amazon.com/lambda/latest/dg/with-s3-example-create-iam-role.html) 所述,创建一个 IAM 角色来执行 Lambda 函数。
记录该角色的 IAM Amazon 资源名称 (ARN) (http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns)。您将在下一步中使用它。
第 4 步:创建 Lambda 函数¶
上传您在 第 2 步:创建 Lambda 函数部署包 (本主题内容)中创建的 .zip
部署包,以创建 Lambda 函数:
aws lambda create-function \ --region us-west-2 \ --function-name IngestFile \ --zip-file fileb://~/Snowpipe.zip \ --role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \ --handler SnowpipeLambdaCode.handler \ --runtime python3.6 \ --profile adminuser \ --timeout 10 \ --memory-size 1024
对于 --role
,请指定您在 第 3 步:为 Lambda 创建一个 AWS IAM 角色 (本主题内容)中记录的角色 ARN。
记录输出中新函数的 ARN。您将在下一步中使用它。
第 5 步:允许调用 Lambda 函数¶
授予 S3 调用函数所需的权限。
对于 --source-arn
,指定您在 第 4 步:创建 Lambda 函数 (本主题内容)中记录的函数 ARN。
aws lambda add-permission \ --function-name IngestFile \ --region us-west-2 \ --statement-id enable-ingest-calls \ --action "lambda:InvokeFunction" \ --principal s3.amazonaws.com \ --source-arn arn:aws:s3:::<SourceBucket> \ --source-account <aws_account_id> \ --profile adminuser
第 6 步:注册 Lambda 通知事件¶
按照 Amazon S3 事件通知 (http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html) 中的说明,注册 Lambda 通知事件。在输入字段中,指定您在 第 4 步:创建 Lambda 函数 (本主题内容)中记录的函数 ARN。