选择 2:使用 AWS Lambda 实现 Snowpipe 自动化

AWS Lambda 是一种计算服务,在被事件触发时运行,并执行已加载到系统中的代码。您可以修改本主题中提供的 Python 示例代码,并创建一个调用 Snowpipe REST API 的 Lambda 函数,以从外部暂存区(即 S3 桶;不支持 Azure 容器)加载数据。此函数将部署到您的 AWS 账户,并托管在该处。在 Lambda 中定义的事件(例如,当 S3 桶中的文件被更新时)调用此 Lambda 函数,并运行 Python 代码。

本主题介绍配置 Lambda 函数的必要步骤,让该函数自动使用 Snowpipe 微批量地连续加载数据。

备注

本主题假定您已按照 使用 Snowpipe REST API 做好加载数据的准备 中的说明配置了 Snowpipe。

本主题内容:

第 1 步:编写用于调用 Snowpipe REST API 的 Python 代码

Python 示例代码

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend

import os

with open("./rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
                   host='<account_identifier>.snowflakecomputing.cn',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key_text)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)
Copy

备注

示例代码不考虑错误处理。例如,它不会重试失败的 ingest_manager 调用。

在使用示例代码之前,请进行以下更改:

  1. 更新安全参数:

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    指定您在 使用密钥对身份验证和密钥轮换 (请参阅 使用 Snowpipe REST API 做好加载数据的准备)中创建的私钥文件的内容。

    使用 PRIVATE_KEY_PASSPHRASE 环境变量指定用于解密私钥文件的密码:

    • Linux 或 macOS:

      export PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
    • Windows:

      set PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
  2. 更新会话参数:

    account='<account_identifier>'

    指定账户的唯一标识符(由 Snowflake 提供)。请参阅 host 描述。

    host='<account_identifier>.snowflakecomputing.cn'

    为 Snowflake 账户指定唯一的主机名。

    账户标识符的首选格式如下:

    organization_name-account_name

    Snowflake 组织和账户的名称。有关详细信息,请参阅 格式 1(首选):您所在组织的账户名称

    如果需要,还可以指定 账户定位器,以及托管该账户的 区域云平台。有关详细信息,请参阅 格式 2:区域中的账户定位器

    user='<user_login_name>'

    指定将运行 Snowpipe 代码的 Snowflake 用户的登录名。

    pipe='<db_name>.<schema_name>.<pipe_name>'

    指定用于加载数据的管道的完全限定名称,格式为 <db_name>.<schema_name>.<pipe_name>

  3. 在文件对象列表中指定要导入的文件的路径:

    staged_file_list = []

    指定的路径必须 相对于 文件所在的暂存区。包括每个文件的完整名称,包括文件扩展名。例如,gzip 压缩的 CSV 文件的扩展名可能是 .csv.gz

  4. 将文件保存在方便的位置。

本主题中的其余说明假定文件名为 SnowpipeLamdbaCode.py

第 2 步:创建 Lambda 函数部署包

按照以下说明为 Lambda 构建 Python 运行时环境,并添加您在 第 1 步:编写用于调用 Snowpipe REST API 的 Python 代码 (本主题内容)中修改的 Snowpipe 代码。有关这些步骤的更多信息,请参阅 AWS Lambda 部署包文档 (http://docs.aws.amazon.com/lambda/latest/dg/with-s3-example-deployment-pkg.html) (请参阅 Python 说明)。

重要

以下步骤中的脚本是一个代表性示例,并假定您正根据使用 YUM 包管理器(依赖于 RPM)的 Amazon Machine Instance (AMI) 来创建 AWS EC2 Linux 实例。如果您选择基于 Debian 的 Linux AMI,请相应地更新脚本。

  1. 按照 AWS EC2 说明 (http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EC2_GetStarted.html#ec2-launch-instance) 创建一个 AWS EC2 Linux 实例。此实例将提供计算资源来运行 Snowpipe 代码。

  2. 使用 SCP (安全复制)将 Snowpipe 代码文件复制到新的 AWS EC2 实例:

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    
    Copy

    其中:

    • <path> 是本地 SnowpipeLambdaCode.py 文件的路径。

    • <machine>.<region_id> 是 EC2 实例的 DNS 名称(例如 ec2-54-244-54-199.us-west-2.compute.amazonaws.com)。

      DNS 名称显示在 Amazon EC2 控制台的 Instances 屏幕上。

  3. 使用 SSH (安全 SHell)连接到 EC2 实例:

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
    Copy
  4. 在 EC2 实例上安装 Python 和相关库:

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
    Copy
  5. 创建 .zip 部署包 (Snowpipe.zip):

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    
    Copy

第 3 步:为 Lambda 创建一个 AWS IAM 角色

按照 AWS Lambda 文档 (http://docs.aws.amazon.com/lambda/latest/dg/with-s3-example-create-iam-role.html) 所述,创建一个 IAM 角色来执行 Lambda 函数。

记录该角色的 IAM Amazon 资源名称 (ARN) (http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns)。您将在下一步中使用它。

第 4 步:创建 Lambda 函数

上传您在 第 2 步:创建 Lambda 函数部署包 (本主题内容)中创建的 .zip 部署包,以创建 Lambda 函数:

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024
Copy

对于 --role,请指定您在 第 3 步:为 Lambda 创建一个 AWS IAM 角色 (本主题内容)中记录的角色 ARN。

记录输出中新函数的 ARN。您将在下一步中使用它。

第 5 步:允许调用 Lambda 函数

授予 S3 调用函数所需的权限。

对于 --source-arn,指定您在 第 4 步:创建 Lambda 函数 (本主题内容)中记录的函数 ARN。

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser
Copy

第 6 步:注册 Lambda 通知事件

按照 Amazon S3 事件通知 (http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html) 中的说明,注册 Lambda 通知事件。在输入字段中,指定您在 第 4 步:创建 Lambda 函数 (本主题内容)中记录的函数 ARN。

语言: 中文