适用于 AWS 的异步远程服务示例

本主题包含一个异步 AWS Lambda 函数(远程服务)示例。您可以按照 第 1 步:在管理控制台中创建远程服务(AWS Lambda 函数) 中所述的相同步骤创建此示例函数。

本主题内容:

代码概述

文档的这一部分提供有关在 AWS 上创建异步外部函数的信息。(在实现第一个异步外部函数之前,可能需要阅读异步外部函数的 概念概述。)

在 AWS 上,异步远程服务必须克服以下限制:

  • 由于 HTTP POST 和 GET 是彼此独立的请求,远程服务必须保留有关 POST 请求启动的工作流的信息,以便稍后可以由 GET 请求查询状态。

    通常,每个 HTTP POST 和 HTTP GET 均在单独的进程或线程中调用处理程序函数的独立实例。彼此独立的实例不共享内存。为了使 GET 处理程序能够读取状态或已处理的数据,GET 处理程序必须访问 AWS 上可用的共享存储资源。

  • POST 处理程序发送初始 HTTP 202 响应代码的唯一方法是通过终止处理程序执行的 return 语句(或等效语句)。因此,在返回 HTTP 202 之前,POST 处理程序必须启动一个 独立的 进程(或线程),来执行远程服务的实际数据处理工作。此独立进程通常需要访问 GET 处理程序可见的存储。

为了克服这些限制,异步远程服务采用的一种方法是使用 3 个进程(或线程)和共享存储:

异步远程服务的流程图示

在此模型中,流程负责:

  • HTTP POST 处理程序:

    • 读取输入数据。在 Lambda 函数中,这是从处理程序函数的 event 输入参数的主体中读取的。

    • 读取批次 ID。在 Lambda 函数中,这是从 event 输入参数的标头中读取的。

    • 启动数据处理过程,并将数据和批次 ID 传递给它。数据通常在调用期间传递,但可以通过将其写入到外部存储来传递。

    • 在数据处理进程和 HTTP GET 处理程序进程都能访问的共享存储中记录批次 ID。

    • 如果需要,记录此批次尚未完成的处理。

    • 如果未检测到错误,则返回 HTTP 202。

  • 数据处理代码:

    • 读取输入数据。

    • 处理数据。

    • 使结果可供 GET 处理程序使用(将结果数据写入共享存储,或提供可用于查询结果的 API)。

    • 通常会更新此批次的状态(例如,从 IN_PROGRESS 更改为 SUCCESS)以指示结果已准备好供读取。

    • 退出。(可选)此过程可以返回错误指示符。Snowflake 不会直接看到此错误指示符(Snowflake 只能看到来自 POST 处理程序和 GET 处理程序的 HTTP 返回代码),但从数据处理过程返回错误指示符可能会在调试期间有所帮助。

  • GET 处理程序:

    • 读取批次 ID。在 Lambda 函数中,这是从 event 输入参数的标头中读取的。

    • 读取存储以获取此批次的当前状态(例如 IN_PROGRESSSUCCESS)。

    • 如果处理仍在进行中,则返回 202。

    • 如果处理成功完成,则:

      • 读取结果。

      • 清理存储。

      • 返回结果和 HTTP 代码 200。

    • 如果存储状态指示错误,则:

      • 清理存储。

      • 返回错误代码。

    请注意,如果处理花费的时间长度足以发送多个 HTTP GET 请求,则可能会为批处理调用多次 GET 处理程序。

此模型有许多可能的变体。例如:

  • 批次 ID 和状态可在数据处理过程开始时写入,而不是在 POST 过程结束时写入。

  • 数据处理可以在单独的函数(例如单独的 Lambda 函数)中完成,甚至可以作为完全独立的服务完成。

  • 数据处理代码不一定需要写入共享存储。相反,处理后的数据可通过另一种方式提供。例如,API 可以接受批次 ID 作为参数并返回数据。

实现代码应考虑到处理时间过长或失败的可能性,因此必须清理任何不完整的结果,以免浪费存储空间。

存储机制必须可跨多个进程(或者线程)共享。可能的存储机制包括:

  • AWS 提供的存储机制,例如:

    • 磁盘空间(例如 Amazon Elastic File System (EFS) (https://aws.amazon.com/efs/))。

    • 通过 AWS 提供的本地数据库服务器(例如 Amazon DynamoDB (https://aws.amazon.com/dynamodb/))。

  • 位于 AWS 外部但可从 AWS 访问的存储。

在上述 3 个进程中,每个进程的代码都可以编写为 3 个单独的 Lambda 函数(一个用于 POST 处理程序、一个用于数据处理函数、一个用于 GET 处理程序),也可编写为能以不同方式调用的单个函数。

下面的示例 Python 代码是一个 Lambda 函数,可以 分别 为 POST、数据处理和 GET 进程调用。

示例代码

此代码显示了一个带输出的示例查询。此示例中的重点是这三个进程及其交互方式,而非共享存储机制 (DynamoDB) 或数据转换(情绪分析)。此代码的结构设置支持轻松将示例存储机制和数据转换替换为不同的机制和数据转换。

为简单起见,以下示例:

  • 对一些重要值(例如 AWS 区域)进行了硬编码。

  • 假设存在某些资源(例如 Dynamo 中的 Jobs 表)。

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response
Copy

示例调用和输出

下面是对异步外部函数的示例调用以及示例输出,包括情绪分析结果:

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}
Copy

有关示例代码的备注

  • 数据处理函数通过调用以下代码来实现调用:

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    
    Copy

    如上所示,InvocationType 应为“Event”,因为第二个进程(或线程)必须是异步的,并且 Event 是通过 invoke() 方法可用的唯一非阻塞调用类型。

  • 数据处理函数返回 HTTP 200 代码。但此 HTTP 200 代码不会直接返回给 Snowflake。Snowflake 看不到任何 HTTP 200,直到 GET 轮询状态,并看到数据处理函数成功完成此批次的处理为止。

语言: 中文