将请求和响应转换器与远程服务的数据结合使用

使用请求和响应转换器,您可以更改外部函数使用的远程服务发送和接收的数据格式。

本主题内容:

目的

当 Snowflake 向远程服务发送数据时,Snowflake 根据 以下规则 格式化数据。同样,当 Snowflake 从远程服务接收数据时,Snowflake 希望根据相同的规则对数据进行格式化。

许多远程服务都希望以不同的格式处理数据。使用请求和响应转换器,您可以方便地执行以下操作:

  • 将数据从 Snowflake 格式转换为远程服务的本地输入格式(请求转换器)。

  • 将数据从远程服务的本地输出格式转换为 Snowflake 的格式(响应转换器)。

SQL 实现

要在 Snowflake 格式和远程服务的本地输入格式之间转换数据,可以使用 JavaScript UDFs (用户定义函数)。您几乎总是编写一对 UDFs:一个转换请求,一个转换响应。

Snowflake 将这些函数作为每个外部函数调用的一部分进行调用。例如,对于对远程服务的请求,Snowflake 调用请求转换器函数,向其传递 Snowflake 格式的数据,然后获取返回的数据并将其发送到远程服务。当远程服务返回数据时,Snowflake 调用响应转换器函数将数据转换回 Snowflake 理解的格式。

从用户的角度来看,在转换器转换时调用外部函数与在没有转换器的情况下调用外部函数相同。在 CREATE EXTERNAL FUNCTION 语句中指定转换器后,会自动调用它们。

一个外部函数一次最多可以有一个请求转换器和一个响应转换器。

请求和响应转换器 UDFs 可以是 安全 UDFs

将转换器功能分配给外部函数

若要指定将哪个用户定义函数用作转换器,请在 CREATE EXTERNAL FUNCTION 语句中包含 REQUEST_TRANSLATORRESPONSE_TRANSLATOR 子句。每个都采用要在运行时使用的转换器函数的名称。

例如:

CREATE EXTERNAL FUNCTION f(...)
    RETURNS OBJECT
    ...
    REQUEST_TRANSLATOR = my_request_translator_udf
    RESPONSE_TRANSLATOR = my_response_translator_udf
    ...
    AS <url_of_proxy_and_resource>;
Copy

CREATE EXTERNAL FUNCTION 语句中指定转换器的语法如下所示:

CREATE EXTERNAL FUNCTION f(...)
    RETURNS OBJECT
    ...
    [ REQUEST_TRANSLATOR = <request_translator_udf_name> ]
    [ RESPONSE_TRANSLATOR = <response_translator_udf_name> ]
    ...
Copy

其中:

request_translator_udf_name

请求转换器函数的名称。

response_translator_udf_name

响应转换器函数的名称。

REQUEST_TRANSLATORRESPONSE_TRANSLATOR 参数各接受一个 OBJECT 类型的参数。

还可以在 ALTER FUNCTION 命令中指定请求或响应转换器。您可以:

  • 在外部函数还没有转换器时添加转换器。

  • 替换现有转换器。

  • 移除转换器。

使用 SET 关键字添加新转换器或替换现有转换器。

要添加或替换转换器,请执行以下操作:

ALTER FUNCTION ...
    SET [REQUEST_TRANSLATOR | RESPONSE_TRANSLATOR] = <udf_name>;
Copy

其中

udf_name

之前创建的 JavaScript UDF 的名称。

要移除转换器,请执行以下操作:

ALTER FUNCTION ...
    UNSET [REQUEST_TRANSLATOR | RESPONSE_TRANSLATOR];
Copy

SQL 的要求

  • CREATE EXTERNAL FUNCTIONALTER FUNCTION 语句中转换器函数的名称应为:

    • 限定名称(例如 MyDatabase.MySchema.MyJavaScriptUDF)。

    • 在与使用它们的外部函数相同的数据库和架构中定义。

  • 当在 CREATE EXTERNAL FUNCTIONALTER FUNCTION 语句中指定了转换器时,转换器 UDF 必须已经存在。不能先指定名称,然后再创建 UDF,即使在创建 UDF 之前没有调用外部函数。

  • 在没有从所有使用它的外部功能中将其弃用之前,不应移除用作转换器的 UDF。(在调用外部函数时,如果转换器不存在,则 Snowflake 将失败并显示错误。)

  • 如果转换器 UDF 被修改(通过 ALTER FUNCTION),必须保持相同的接口要求。如果不保留接口要求,则在运行外部函数之前引发异常。

JavaScript 实现

在运行时,SQL 将一个 OBJECT 传递给转换器 UDF。JavaScript 代码将此作为 JavaScript 对象接收。

实现请求转换器

请求转换器输入属性

转换器 UDF 收到一个名为 event 的 JavaScript 对象。该对象包含以下属性:

  • bodydata 字段的格式与现有 Snowflake 行集批处理相同(即行数组)。

    例如:

    {
      "body": {
              "data": [
                        [0,"cat"],
                        [1,"dog"]
                      ]
              }
    }
    
    Copy

    现有数据嵌套在外部主体下。

  • serviceUrl:要调用的外部函数的定义 URL。

  • contextHeaders:包含所有与上下文相关的标头的对象,其中名称是字段名称。例如,对象可以包含字段名“SF_CONTEXT_CURRENT_DATABASE”,相应的值将是包含当前数据库名称的字符串。

请求转换器输出属性

请求转换器返回一个对象,该对象具有用于与外部服务 API 网关通信的字段。该对象有三个可选字段:

  • body:定义要传递给服务的实际正文。如果不定义,就没有正文。body 值应该是远程服务期望格式的字符串或 JSON 对象。如果值是字符串,则该字符串可以包含内部结构(例如与 JSON 兼容)。如果该值是 JSON 对象,则该对象被转换为一个字符串,以便它可以作为 HTTP POST 命令字符串的一部分。

  • urlSuffix:设置服务 URL 的后缀,该后缀添加到 serviceUrl 值的末尾。此后缀也允许包含查询参数。参数名称和值必须是 URL 编码的。例如,如果要将名为 a 的参数设置为 my param 值,需要对空格字符进行 URL 编码,因此该参数将是 ?a=my%20param

  • translatorData:从请求转换器传递到响应转换器。该字段可以传递上下文信息,如输入正文、服务 URL 或后缀或上下文标头。

这三个字段都是可选的。但是,实际上,大多数请求转换器至少返回正文数据。

实现响应转换器

响应转换器输入属性

响应转换器函数的输入参数是一个对象。以下示例使用包含两个属性的 EVENT

  • body:要从外部服务响应解码的响应。

  • translatorData:如果请求转换器返回此字段,则 Snowflake 会将其传递给响应转换器。

响应转换器输出属性

响应转换器响应作为 body 元素下的对象返回;该格式是现有的外部函数格式(行数组)。例如:

{
  "body": {
          "data": [
                    [0, "Life"],
                    [1, "the universe"],
                    [2, "and everything"]
                  ]
           }
}
Copy

转换器函数的要求

每个转换器 UDF 必须满足以下要求:

  • 必须是 JavaScript UDF

  • 必须恰好接受一个 OBJECT 类型的参数,表示一批行。

  • 必须返回一个 OBJECT 类型的值,该值也表示一批行。

  • 必须是标量 UDF (为传入的每一行 (OBJECT) 返回一行)。

    备注

    虽然转换器是标量的,但传递给转换器的 OBJECT 可以(通常会)在 OBJECT 的 JSON 中嵌入多行。

  • 响应转换器 UDF 返回的行数和顺序(在 OBJECT 内部)必须与传递给请求转换器 UDF 的行数和顺序(在 OBJECT 内部)相同。

请求转换器和响应转换器示例

下面的示例显示了请求转换器和响应转换器,用于将数据转换为进行情感分析的外部服务 Amazon Comprehend BatchDetectSentiment (https://docs.aws.amazon.com/comprehend/latest/dg/API_BatchDetectSentiment.html) 所需的格式。请求转换器对 HTTP 请求进行调整,以匹配后端服务期望的格式。

要使用转换器,需要一个 API 网关。此示例使用了一个 API 网关,该网关已经被配置为与情感分析服务进行对话。有关如何作为后端与 Amazon Web Services (AWS) 服务集成的更多信息,请参阅 AWS 文档中的 使用 API Gateway 控制台设置 API 集成请求 (https://docs.aws.amazon.com/apigateway/latest/developerguide/how-to-method-settings-console.html)。

在添加转换器之前让 API 集成成功运行很有帮助。

设置

设置一个数据库来保存演示数据。

选择有权创建外部函数的角色:

USE ROLE ACCOUNTADMIN;
Copy

指定要使用的仓库、数据库和架构:

USE WAREHOUSE w;
USE DATABASE a;
USE SCHEMA b;
Copy

创建一个表来保存测试句子:

CREATE TABLE demo(vc varchar);
INSERT INTO demo VALUES('Today is a good day'),('I am feeling mopey');
Copy

转换前的请求正文

此外部函数没有请求转换器或响应转换器:

CREATE OR REPLACE EXTERNAL FUNCTION ComprehendSentiment(thought varchar)
RETURNS VARIANT
API_INTEGRATION = aws_comprehend_gateway
AS 'https://<MY_GATEWAY>.execute-api.us-east-1.amazonaws.com/test/comprehend_proxy';
Copy

可以使用演示表中的测试数据调用外部函数:

SELECT ComprehendSentiment(vc), vc FROM demo;
Copy

生成的请求正文使用 Snowflake 外部函数数据格式:

{"body":{"data:" [[0, "Today is a good day"],[1,"I am feeling mopey"]]}}
Copy

但是,外部情绪分析服务需要一种不同的格式,用于指定语言和字符串数组:

{"body": { Language: "en", TextList: [ "Today is a good day", "I am feeling mopey"]}}
Copy

下一节介绍如何添加请求转换器以将请求正文更改为所需格式。

转换请求正文

通过使用请求转换器,可以将上述默认输入(采用 Snowflake 数据格式)转换为外部服务所需的格式。

以下 SQL 创建了一个 awscomprehendrequest_translator 转换器函数。

CREATE OR REPLACE FUNCTION AWSComprehendrequest_translator(EVENT OBJECT)
RETURNS OBJECT
LANGUAGE JAVASCRIPT AS
'
var textlist = []
for(i = 0; i < EVENT.body.data.length; i++) {
   let row = EVENT.body.data[i];
   // row[0] is the row number and row[1] is the input text.
   textlist.push(row[1]); //put text into the textlist
}
// create the request for the service. Also pass the input request as part of the output.
return { "body": { "LanguageCode": "en", "TextList" : textlist }, "translatorData": EVENT.body }
';
Copy

在请求转换器函数中,代码:

  • 循环遍历每个输入行。对于每一行,都将 row[1] 中的字符串添加到 textlist 数组中。row[0] 的值是行号,可以忽略。

  • 返回一个 JSON 正文,该正文具有与外部服务的要求相匹配的语言代码和文本列表。

  • 通过 translatorData 字段返回数据。此数据由响应转换器使用。在此示例中,发送的是原始输入数据。将使用响应转换器中输入数据的长度来了解有多少个输入请求。

可以通过直接调用请求转换器进行测试。

SELECT AWSComprehendrequest_translator(parse_json('{"body":{"data": [[0, "I am so happy we got a sunny day for my birthday."], [1, "$$$$$."], [2, "Today is my last day in the old house."]]}}'));
Copy

请求转换器将正文调整为外部服务预期的格式。

{"body":{
   "LanguageCode": "en",
   "TextList": [
      "I am so happy we got a sunny day for my birthday.",
      "$$$$$.",
      "Today is my last day in the old house."
               ]
         },
   "translatorData": {
      "data": [[0, "I am so happy we got a sunny day for my birthday."],
               [1, "$$$$$."],
               [2, "Today is my last day in the old house."]]
                     }
}
Copy

添加响应转换器之前的响应正文

来自外部服务的响应正文如下所示。

{"body":{
   "ErrorList": [ { "ErrorCode": 57, "ErrorMessage": "Language unknown", "Index": 1} ],
   "ResultList":[ { "Index": 0, "Sentiment": "POSITIVE",
                    "SentimentScore": { "Mixed": 25, "Negative": 5, "Neutral": 1, "Positive": 90 }},
                  { "Index": 2, "Sentiment": "NEGATIVE",
                    "SentimentScore": { "Mixed": 25, "Negative": 75, "Neutral": 30, "Positive": 20 }}
                ]
         }
}
Copy

转换响应正文

响应转换器处理从外部服务返回的结果。结果包含 ErrorList 中的错误和 ResultList 中的结果的组合。

响应转换器代码将这些结果组合在一起,以形成一个完整的集合,该集合与传递给外部服务的行的顺序相匹配。响应转换器以 Snowflake 格式返回结果。

以下 SQL 创建了一个 awscomprehendresponse_translator 转换器函数。

CREATE OR REPLACE FUNCTION AWSComprehendresponse_translator(EVENT OBJECT)
RETURNS OBJECT
LANGUAGE JAVASCRIPT AS
'
// Combine the scored results and the errors into a single list.
var responses = new Array(EVENT.translatorData.data.length);
// output format: array of {
// "Sentiment": (POSITIVE, NEUTRAL, MIXED, NEGATIVE, or ERROR),
// "SentimentScore": <score>, "ErrorMessage": ErrorMessage }.
// If error, set ErrorMessage; otherwise, set SentimentScore.
// Insert good results into proper position.
for(i = 0; i < EVENT.body.ResultList.length; i++) {
   let row = EVENT.body.ResultList[i];
   let result = [row.Index, {"Sentiment": row.Sentiment, "SentimentScore": row.SentimentScore}]
   responses[row.Index] = result
}
// Insert errors.
for(i = 0; i < EVENT.body.ErrorList.length; i++) {
   let row = EVENT.body.ErrorList[i];
   let result = [row.Index, {"Sentiment": "Error", "ErrorMessage": row.ErrorMessage}]
   responses[row.Index] = result
}
return { "body": { "data" : responses } };
';
Copy

在响应转换器函数中,代码:

  • translatorData 数组长度的输入大小初始化一个名为 responses 的数组。您将 translatorData 从请求转换器发送到响应转换器以传递测试字符串的原始列表。

  • 循环访问每个非错误结果,并将其放入结果列表中。

  • 循环显示错误结果,并将其放入结果列表中。结果列表有一个索引位置,告诉您它是什么条目。生成结果的顺序必须与输入顺序匹配。结果列表还包含情绪信息。

收集完所有响应后,它们会以 Snowflake 期望的格式在 JSON 正文中返回。

下面的直接测试将返回具有正确格式的 JSON 正文。

SELECT AWSComprehendresponse_translator(
    parse_json('{
        "translatorData": {
            "data": [[0, "I am so happy we got a sunny day for my birthday."],
                    [1, "$$$$$."],
                    [2, "Today is my last day in the old house."]]
                          }
        "body": {
            "ErrorList":  [ { "ErrorCode": 57,  "ErrorMessage": "Language unknown",  "Index": 1 } ],
            "ResultList": [
                            { "Index": 0,  "Sentiment": "POSITIVE",
                              "SentimentScore": { "Mixed": 25,  "Negative": 5,  "Neutral": 1,  "Positive": 90 }
                            },
                            { "Index": 2, "Sentiment": "NEGATIVE",
                              "SentimentScore": { "Mixed": 25,  "Negative": 75,  "Neutral": 30,  "Positive": 20 }
                            }
                          ]
            },
        }'
    )
);
Copy

将转换器分配给外部函数

通过将函数名作为值分配给 request_translatorresponse_translator 参数,向外部函数添加请求和响应转换器函数。这样,当外部函数运行时,它们将被自动调用。

CREATE OR REPLACE EXTERNAL FUNCTION ComprehendSentiment(thought varchar)
RETURNS VARIANT
API_INTEGRATION = aws_comprehend_gateway
request_translator = db_name.schema_name.AWSComprehendrequest_translator
response_translator = db_name.schema_name.AWSComprehendresponse_translator
AS 'https://<MY_GATEWAY>.execute-api.us-east-1.amazonaws.com/test/comprehend_proxy';
Copy

可以描述函数以获取有关它的信息。

DESCRIBE FUNCTION ComprehendSentiment(VARCHAR);
Copy

调用外部函数

通过使用单个句子调用外部函数来测试外部函数。

SELECT ComprehendSentiment('Today is a good day');
Copy

您将看到情绪分析结果。

{"Sentiment": "POSITIVE",
 "SentimentScore":{"Mixed":0.002436627633869648,
                   "Negative":0.0014803812373429537,
                   "Neutral":0.015923455357551575,
                   "Positive": 0.9801595211029053}}
Copy

通过使用多个句子调用外部函数来测试外部函数。使用您之前创建的同一 demo 表。

SELECT ComprehendSentiment(vc), vc FROM demo;
Copy

将显示情绪分析结果。

显示情绪分析结果的表。

调用外部函数时,请求转换器会自动将数据转换为外部服务所需的格式。然后,响应转换器自动将来自外部服务的响应转换回 Snowflake 所需的格式。

测试请求和响应转换器的提示

  • 测试用例值通常是 OBJECT 值(键值对的集合)。这些文件的格式应符合 这些规则 的要求。

  • 您可以通过传入转换为字符串的示例输入来开始测试请求转换器或响应转换器。例如:

    select my_request_translator_function(parse_json('{"body": {"data": [ [0,"cat",867], [1,"dog",5309] ] } }'));
    
    Copy

    PARSE_JSON() 的输入必须是 JSON 格式的字符串。)

  • 如果合适,用 NULL 值进行测试。

    • 在测试用例中至少包含一个 SQL NULL 值。

    • 在测试用例中至少包含一个 JSON NULL 值。

  • 转换请求和转换响应通常是相反的过程。概念:

    my_response_translator_udf(my_request_translator_udf(x)) = x
    
    Copy

    如果数据格式匹配,可以使用此特性来帮助测试请求转换器和响应转换器。创建一个具有良好测试值的表,然后执行类似于以下内容的命令:

    SELECT test_case_column
        FROM test_table
        WHERE my_response_translator_udf(my_request_translator_udf(x)) != x;
    
    Copy

    查询不应返回任何行。

    请注意,转换请求和转换响应并不总是 完全 相反的。有关它们可能不相反的示例,请参阅 TO_JSON() 函数 文档的“用法说明”部分中关于相反函数的讨论。

语言: 中文