表格 JavaScript UDFs (UDTFs)¶
可以使用 JavaScript 为用户定义的 表函数 (UDTF) 编写处理程序。
处理程序代码处理在 UDTF 调用中收到的行,并返回表格结果。收到的行由 Snowflake 隐式分区,或在函数调用的语法中显式分区。可以使用编写的回调函数来处理单个行以及将它们分组到的分区。
JavaScript 代码必须满足以下要求才能使 UDTF 有效:
代码必须定义单个字面量 JavaScript 对象。
定义的对象必须包含名为
processRow()
的回调函数。有关更多信息,请参阅 对象回调函数。
重要
如果 JavaScript 代码不满足这些要求,则仍将创建 UDTF,但在查询中调用该函数时将会失败。
备注
表格函数 (UDTFs) 的输入实参限制为 500 个,输出列限制为 500 列。
对象回调函数¶
通过 JavaScript 代码,Snowflake 在执行查询期间调用回调函数来与 UDTF 进行交互。以下框架概述了所有可用的回调函数及其预期签名:
{
processRow: function (row, rowWriter, context) {/*...*/},
finalize: function (rowWriter, context) {/*...*/},
initialize: function (argumentInfo, context) {/*...*/},
}
请注意,只有 processRow()
是必需的,其他函数都是可选的。
processRow()
¶
对于输入关系中的每一行,都会调用一次此回调函数。在 row
对象中传递 processRow()
的实参。对于在用于创建 UDTF 的 CREATE FUNCTION 语句中定义的每个实参,row
对象上都有一个同名且全大写的属性。此属性的值是当前行的实参的值。(该值将转换为 JavaScript 值。)
用户提供的代码使用 rowWriter
实参来生成输出行。rowWriter
对象定义单个函数 writeRow()
。writeRow()
函数接受一个实参,即*行对象*,它是输出表中表示为 JavaScript 对象的单行。对于在 CREATE FUNCTION 命令的 RETURNS 子句中定义的每一列,可以在行对象上定义相应的属性。行对象上该属性的值将成为输出关系中相应列的值。行对象上没有相应属性的任何输出列在结果表中都将具有 NULL 值。
finalize()
¶
在所有行都传递给 processRow()
之后,会调用一次 finalize()
回调函数。(如果将数据分组到 分区 中,则在将该分区中的所有行传递给 processRow()
后,会为每个分区调用一次 finalize()
。)
此回调函数可用于输出任何此类状态:可能已使用传递给 processRow()
的相同行 rowWriter
在 processRow()
中进行了聚合。
备注
虽然 Snowflake 支持大型分区,会调整超时以成功处理分区,但特别大的分区可能导致处理超时(例如 finalize
需要太长时间才能完成)。如果您需要针对特定使用场景调整超时阈值,请联系 Snowflake 支持部门 (https://community.snowflake.com/s/article/How-To-Submit-a-Support-Case-in-Snowflake-Lodge)。
initialize()
¶
在调用 processRow()
之前,会为每个分区调用一次此回调函数。
使用 initialize()
设置结果计算期间所需的任何状态。
initialize()
函数的 argumentInfo
参数包含有关用户定义函数的实参的元数据。例如,如果将 UDF 定义为:
CREATE FUNCTION f(argument_1 INTEGER, argument_2 VARCHAR) ...
则 argumentInfo
包含有关 argument_1
和 argument_2
的信息。
argumentInfo
包含这些实参中每一个的属性。每个属性都是一个具有以下值的对象:
type
:字符串。此实参的类型。isConst
:布尔。如果为 true,则此实参的值是常量(即对于每一行都相同)。constValue
:如果isConst`(如上面定义)为 true,则此条目包含实参的常量值;否则,此字段为 :code:`undefined
。
initialize()
函数无法生成输出行。
回调函数的一般使用说明¶
所有三个回调函数都接受一个
context
对象;该对象保留供将来使用,目前为空。小心
修改
context
对象可能会产生未定义的行为。可以根据需要在该对象上定义其他函数和属性,以便在 UDTF 中使用
回调函数的实参是位置性的,可以命名为任何名称;但是,出于本主题的目的,上述名称用于其余的讨论和示例。
分区¶
在许多情况下,您可能希望将行分组到*分区*中。分区有两个主要优势:
它允许您根据共同特征对行进行分组。这可让您一起处理组中的所有行,并独立处理每个组。
它允许 Snowflake 划分工作负载,以增加并行化机会,从而提高性能。
例如,可以按每只股票一组的形式对股票价格数据进行分区。单个公司的所有股票价格可以一起处理,而不同公司的组则独立处理。
以下语句在各个分区上调用命名为 js_udtf()
的 UDTF。每个分区都包含 PARTITION BY
表达式计算结果为相同值(例如,相同的股票代码)的所有行。
SELECT * FROM tab1, TABLE(js_udtf(tab1.c1, tab1.c2) OVER (PARTITION BY <expression>)) ...;
指定要用于 UDTF 的分区表达式时,Snowflake 会:
为每个分区调用一次
initialize()
。为该分区中的每一行调用一次
processRow()
。为每个分区调用一次 :code:`finalize()`(在处理该分区中的最后一行之后)。
您可能还希望按指定的顺序处理每个分区的行。例如,如果要计算股票价格随时间变化的移动平均值,则按时间戳对股票价格进行排序(以及按股票或公司进行分区)。以下示例演示如何执行此操作:
SELECT * FROM tab1, TABLE(js_udtf(tab1.c1, tab1.c2) OVER (PARTITION BY <expression> ORDER BY <expression>)) ...;
指定 ORDER BY
子句时,将按 ORDER BY
表达式定义的顺序处理行。具体而言,行将按照 ORDER BY
表达式定义的顺序传递给 processRow()
。
在大多数情况下,对数据进行分区几乎会自动增加并行化的机会,从而提高性能。Snowflake 通常并行执行多个 UDTF 实例。(对于此讨论,JavaScript UDTF 的实例定义为用于在 Snowflake 中表示函数的 JavaScript 对象的一个实例。) 行的每个分区都会传递给 UDTF 的一个实例。
但请注意,分区和 UDTF 实例之间不一定存在一对一的关系。尽管每个分区仅由一个 UDTF 处理,但反之不一定如此 – 一个 UDTF 实例可以处理多个分区。因此,使用 initialize()
和 finalize()
专门设置和清除每个分区非常重要,这样做能避免将累积值从一个分区的处理“结转”到另一个分区的处理等问题。
结果列¶
本部分介绍 JavaScript UDTF 返回的列:
在 CREATE FUNCTION 命令的 RETURNS 子句中定义的所有列都在输出关系中返回。
传入 UDTF 的所有列也会返回。
在 processRow()
回调中生成的行和 finalize()
生成的行存在以下区别:
在
processRow()
中生成行时,Snowflake 可以将其与输入相关联,即作为row
实参传入函数中的输入。请注意,如果给定的processRow()
调用生成多行,则输入属性将复制到每个输出行中。对于在
processRow()
中生成的行,所有输入列都将在输出关系中重复。在
finalize()
回调中,Snowflake 无法将其与任何单行相关联,因为没有要关联到的“当前行”。对于在
finalize()
回调中生成的行,只有在 PARTITION BY 子句中使用的列是重复的(因为这些列对于当前分区中的任何行都是相同的);所有其他属性都是 NULL。如果未指定 PARTITION BY 子句,则所有输入属性均为 NULL。
在查询中调用 JavaScript UDTFs¶
在查询的 FROM 子句中调用 UDTF 时,请在 TABLE 关键字后面的括号内指定 UDTF 的名称和实参。
换言之,在调用 UDTF 时,对 TABLE 关键字使用如下形式:
SELECT ...
FROM TABLE ( udtf_name (udtf_arguments) )
备注
有关调用 UDFs 和 UDTFs 的更多信息,请参阅 调用 UDF。
无分区¶
此简单示例演示如何调用 UDTF。此示例传递字面量值。UDTF 只按照与参数传递顺序相反的顺序返回参数。此示例不使用分区。
SELECT * FROM TABLE(js_udtf(10.0::FLOAT, 20.0::FLOAT));
+----+----+
| Y | X |
|----+----|
| 20 | 10 |
+----+----+
此示例调用 UDTF,并从另一个表向其传递值。在此示例中,为名为 tab1
的表中的每一行都调用一次名为 js_udtf
的 UDTF。每次调用该函数时,都从当前行的 c1
和 c2
列向其传递值。如上所述,在没有 PARTITION BY
子句的情况下调用 UDTF。
SELECT * FROM tab1, TABLE(js_udtf(tab1.c1, tab1.c2)) ;
当不使用分区时,Snowflake 执行引擎会根据多个因素对输入本身进行分区,这些因素包括处理函数的仓库的大小、输入关系的基数等。在此模式下运行时,用户代码不能对分区进行任何假设。这在以下情况下最有用:函数只需要单独查看行即可生成其输出,并且没有跨行聚合状态。
显式分区¶
也可以使用分区来调用 JavaScript UDTFs。例如:
SELECT * FROM tab1, TABLE(js_udtf(tab1.c1, tab1.c2) OVER (PARTITION BY tab1.c3 ORDER BY tab1.c1));
使用空 OVER
子句的显式分区¶
SELECT * FROM tab1, TABLE(js_udtf(tab1.c1, tab1.c2) OVER ());
空 OVER
子句表示每一行都属于同一个分区(即整个输入关系是一个分区)。
备注
使用空 OVER
子句调用 JavaScript UDTF 时应谨慎行事,因为这会限制 Snowflake 只能创建函数的一个实例,从而使 Snowflake 无法并行化计算。
示例 JavaScript UDTFs¶
本部分包含多个示例 JavaScript UDTFs。
基本 Hello World
示例¶
以下 JavaScript UDTF 未接受任何参数,并且始终返回相同的值。它主要用于说明目的:
CREATE OR REPLACE FUNCTION HelloWorld0()
RETURNS TABLE (OUTPUT_COL VARCHAR)
LANGUAGE JAVASCRIPT
AS '{
processRow: function f(row, rowWriter, context){
rowWriter.writeRow({OUTPUT_COL: "Hello"});
rowWriter.writeRow({OUTPUT_COL: "World"});
}
}';
SELECT output_col FROM TABLE(HelloWorld0());
输出:
+------------+
| OUTPUT_COL |
+============+
| Hello |
+------------+
| World |
+------------+
以下 JavaScript UDTF 也用于说明目的,但使用了一个输入参数。请注意,JavaScript 区分大小写,但 SQL 强制标识符为大写,因此,当 JavaScript 代码引用 SQL 参数名称时,JavaScript 代码必须使用大写。
另请注意,函数参数是通过 get_params()
函数中名为 row
的参数访问的:
CREATE OR REPLACE FUNCTION HelloHuman(First_Name VARCHAR, Last_Name VARCHAR)
RETURNS TABLE (V VARCHAR)
LANGUAGE JAVASCRIPT
AS '{
processRow: function get_params(row, rowWriter, context){
rowWriter.writeRow({V: "Hello"});
rowWriter.writeRow({V: row.FIRST_NAME}); // Note the capitalization and the use of "row."!
rowWriter.writeRow({V: row.LAST_NAME}); // Note the capitalization and the use of "row."!
}
}';
SELECT V AS Greeting FROM TABLE(HelloHuman('James', 'Kirk'));
输出:
+------------+
| GREETING |
+============+
| Hello |
+------------+
| James |
+------------+
| Kirk |
+------------+
说明回调函数的基本示例¶
以下 JavaScript UDTF 说明了所有 API 回调函数和各种输出列。它只是按原样返回所有行,并提供在每个分区中看到的字符数的计数。它还说明了如何使用 THIS
引用在整个分区中共享状态。请注意,该示例使用 initialize()
回调将计数器初始化为零;这是必需的,因为给定的函数实例可用于处理多个分区:
-- set up for the sample
CREATE TABLE parts (p FLOAT, s STRING);
INSERT INTO parts VALUES (1, 'michael'), (1, 'kelly'), (1, 'brian');
INSERT INTO parts VALUES (2, 'clara'), (2, 'maggie'), (2, 'reagan');
-- creation of the UDTF
CREATE OR REPLACE FUNCTION "CHAR_SUM"(INS STRING)
RETURNS TABLE (NUM FLOAT)
LANGUAGE JAVASCRIPT
AS '{
processRow: function (row, rowWriter, context) {
this.ccount = this.ccount + 1;
this.csum = this.csum + row.INS.length;
rowWriter.writeRow({NUM: row.INS.length});
},
finalize: function (rowWriter, context) {
rowWriter.writeRow({NUM: this.csum});
},
initialize: function(argumentInfo, context) {
this.ccount = 0;
this.csum = 0;
}}';
以下查询说明了如何在无分区的情况下在 parts
表上调用 CHAR_SUM
UDTF :
SELECT * FROM parts, TABLE(char_sum(s));
输出:
+--------+---------+-----+
| P | S | NUM |
+--------+---------+-----+
| 1 | michael | 7 |
| 1 | kelly | 5 |
| 1 | brian | 5 |
| 2 | clara | 5 |
| 2 | maggie | 6 |
| 2 | reagan | 6 |
| [NULL] | [NULL] | 34 |
+--------+---------+-----+
如果未指定分区,Snowflake 会自动定义分区。在此示例中,由于行数较少,因此仅创建一个分区(即仅执行一次 finalize()
调用)。请注意,最后一行在输入列中具有 NULL 值。
相同的查询,但具有显式分区:
SELECT * FROM parts, TABLE(char_sum(s) OVER (PARTITION BY p));
输出:
+--------+---------+-----+
| P | S | NUM |
+--------+---------+-----+
| 1 | michael | 7 |
| 1 | kelly | 5 |
| 1 | brian | 5 |
| 1 | [NULL] | 17 |
| 2 | clara | 5 |
| 2 | maggie | 6 |
| 2 | reagan | 6 |
| 2 | [NULL] | 17 |
+--------+---------+-----+
此示例在 p
列上进行分区,并生成两个分区。对于每个分区,在 finalize()
回调中返回一行,且总共生成两行,由 s
列中的 NULL 值区分。因为 p
是 PARTITION BY 列,所以,在 finalize()
中创建的行具有定义当前分区的 p
的值。
使用表值和其他 UDTFs 作为输入的扩展示例¶
此基本 UDTF 将 IP 地址的“范围”转换为完整的 IP 地址列表。输入的组成部分如下:IP 地址的前 3 段(例如 '192.168.1'
),然后是用于生成最后一段的范围起始和结束数字(例如 42
和 45
):
CREATE OR REPLACE FUNCTION range_to_values(PREFIX VARCHAR, RANGE_START FLOAT, RANGE_END FLOAT)
RETURNS TABLE (IP_ADDRESS VARCHAR)
LANGUAGE JAVASCRIPT
AS $$
{
processRow: function f(row, rowWriter, context) {
var suffix = row.RANGE_START;
while (suffix <= row.RANGE_END) {
rowWriter.writeRow( {IP_ADDRESS: row.PREFIX + "." + suffix} );
suffix = suffix + 1;
}
}
}
$$;
SELECT * FROM TABLE(range_to_values('192.168.1', 42::FLOAT, 45::FLOAT));
输出:
+--------------+
| IP_ADDRESS |
+==============+
| 192.168.1.42 |
+--------------+
| 192.168.1.43 |
+--------------+
| 192.168.1.44 |
+--------------+
| 192.168.1.45 |
+--------------+
在前面示例的基础上,您可能希望计算多个范围的单个 IP 地址。接下来的这条语句创建一个范围表,可用于扩展到各个 IP 地址。然后,查询将表中的行输入到 range_to_values()
UDTF,以返回各个 IP 地址:
CREATE TABLE ip_address_ranges(prefix VARCHAR, range_start INTEGER, range_end INTEGER);
INSERT INTO ip_address_ranges (prefix, range_start, range_end) VALUES
('192.168.1', 42, 44),
('192.168.2', 10, 12),
('192.168.2', 40, 40)
;
SELECT rtv.ip_address
FROM ip_address_ranges AS r, TABLE(range_to_values(r.prefix, r.range_start::FLOAT, r.range_end::FLOAT)) AS rtv;
输出:
+--------------+
| IP_ADDRESS |
+==============+
| 192.168.1.42 |
+--------------+
| 192.168.1.43 |
+--------------+
| 192.168.1.44 |
+--------------+
| 192.168.2.10 |
+--------------+
| 192.168.2.11 |
+--------------+
| 192.168.2.12 |
+--------------+
| 192.168.2.40 |
+--------------+
注意
在此示例中,FROM 子句中所用的语法与内部联接(即 FROM t1, t2
)的语法相同,但执行的操作不是真正的内部联接。实际行为是使用 ip_address changes
表中每一行的值调用 range_to_values()
函数。换句话说,相当于编写了以下代码:
for input_row in ip_address_ranges: output_row = range_to_values(input_row.prefix, input_row.range_start, input_row.range_end)
将值传递给 UDTF 的概念可以扩展到多个 UDTFs。下一个示例创建一个名为 UDTF 的 fake_ipv4_to_ipv6()
,该函数将 IPV4 地址“转换”为 IPV6 地址。然后,查询在涉及另一个 UDTF 的更复杂语句中调用该函数:
-- Example UDTF that "converts" an IPV4 address to a range of IPV6 addresses.
-- (for illustration purposes only and is not intended for actual use)
CREATE OR REPLACE FUNCTION fake_ipv4_to_ipv6(ipv4 VARCHAR)
RETURNS TABLE (IPV6 VARCHAR)
LANGUAGE JAVASCRIPT
AS $$
{
processRow: function f(row, rowWriter, context) {
rowWriter.writeRow( {IPV6: row.IPV4 + "." + "000.000.000.000"} );
rowWriter.writeRow( {IPV6: row.IPV4 + "." + "..."} );
rowWriter.writeRow( {IPV6: row.IPV4 + "." + "FFF.FFF.FFF.FFF"} );
}
}
$$;
SELECT ipv6 FROM TABLE(fake_ipv4_to_ipv6('192.168.3.100'));
输出:
+-------------------------------+
| IPV6 |
+===============================+
| 192.168.3.100.000.000.000.000 |
+-------------------------------+
| 192.168.3.100.... |
+-------------------------------+
| 192.168.3.100.FFF.FFF.FFF.FFF |
+-------------------------------+
以下查询使用前面创建的 fake_ipv4_to_ipv6
和 range_to_values()
UDTFs,以及来自 ip_address changes
表的输入。换句话说,它从一组 IP 地址范围开始,将它们转换为单个 IPV4 地址,然后获取每个 IPV4 地址并将其“转换”为 IPV6 地址范围:
SELECT rtv6.ipv6
FROM ip_address_ranges AS r,
TABLE(range_to_values(r.prefix, r.range_start::FLOAT, r.range_end::FLOAT)) AS rtv,
TABLE(fake_ipv4_to_ipv6(rtv.ip_address)) AS rtv6
WHERE r.prefix = '192.168.2' -- limits the output for this example
;
输出:
+------------------------------+
| IPV6 |
+==============================+
| 192.168.2.10.000.000.000.000 |
+------------------------------+
| 192.168.2.10.... |
+------------------------------+
| 192.168.2.10.FFF.FFF.FFF.FFF |
+------------------------------+
| 192.168.2.11.000.000.000.000 |
+------------------------------+
| 192.168.2.11.... |
+------------------------------+
| 192.168.2.11.FFF.FFF.FFF.FFF |
+------------------------------+
| 192.168.2.12.000.000.000.000 |
+------------------------------+
| 192.168.2.12.... |
+------------------------------+
| 192.168.2.12.FFF.FFF.FFF.FFF |
+------------------------------+
| 192.168.2.40.000.000.000.000 |
+------------------------------+
| 192.168.2.40.... |
+------------------------------+
| 192.168.2.40.FFF.FFF.FFF.FFF |
+------------------------------+
请注意,此示例使用了两次联接语法,但这两个操作都不是真正的联接;两者都是对 UDTF 的调用,而且使用表的输出或另一个 UDTF 作为输入。
真正的内部联接是不区分顺序的。例如,以下语句是相同的:
table1 INNER JOIN table2 ON ...
table2 INNER JOIN table1 ON ...
向 UDTF 输入值 不是 真正的联接,而且操作 不是 不区分顺序的。例如,以下查询与前面的示例相同,只是它颠倒了 UDTFs 子句中的 FROM 顺序:
SELECT rtv6.ipv6
FROM ip_address_ranges AS r,
TABLE(fake_ipv4_to_ipv6(rtv.ip_address)) AS rtv6,
TABLE(range_to_values(r.prefix, r.range_start::FLOAT, r.range_end::FLOAT)) AS rtv
WHERE r.prefix = '192.168.2' -- limits the output for this example
;
查询失败,并显示以下错误消息:
SQL compilation error: error line 3 at position 35 invalid identifier 'RTV.IP_ADDRESS'
rtv.ip_address
标识符无效,因为在使用之前未定义该标识符。在真正的联接中,这不会发生,但在使用联接语法处理 UDTFs 时,可能会发生此错误。
接下来,尝试运行一个语句,它将向 UDTF 输入值和真正的联接混合起来使用;但是,请注意,向 UDTF 输入值和执行内部联接都使用相同的语法,这可能会造成混淆:
-- First, create a small table of IP address owners.
-- This table uses only IPv4 addresses for simplicity.
DROP TABLE ip_address_owners;
CREATE TABLE ip_address_owners (ip_address VARCHAR, owner_name VARCHAR);
INSERT INTO ip_address_owners (ip_address, owner_name) VALUES
('192.168.2.10', 'Barbara Hart'),
('192.168.2.11', 'David Saugus'),
('192.168.2.12', 'Diego King'),
('192.168.2.40', 'Victoria Valencia')
;
-- Now join the IP address owner table to the IPv4 addresses.
SELECT rtv.ip_address, ipo.owner_name
FROM ip_address_ranges AS r,
TABLE(range_to_values(r.prefix, r.range_start::FLOAT, r.range_end::FLOAT)) AS rtv,
ip_address_owners AS ipo
WHERE ipo.ip_address = rtv.ip_address AND
r.prefix = '192.168.2' -- limits the output for this example
;
输出:
+--------------+-------------------+
| IP_ADDRESS | OWNER_NAME |
+==============+===================+
| 192.168.2.10 | Barbara Hart |
+--------------+-------------------+
| 192.168.2.11 | David Saugus |
+--------------+-------------------+
| 192.168.2.12 | Diego King |
+--------------+-------------------+
| 192.168.2.40 | Victoria Valencia |
+--------------+-------------------+
注意
前面的示例如上所述工作;但是,在将 UDTFs 与真正的联接结合使用时应小心,因为这可能会导致不确定和/或意外的行为。
另请注意,此行为将来可能会发生变更。