用 Python 编写 UDTF¶
本主题内容:
您可以用 Python 实现用户定义的 表函数 (UDTF) 处理程序。此处理程序代码在调用 UDTF 时执行。本主题介绍如何使用 Python 实现处理程序并创建 UDTF。
UDTF 是返回表格结果的用户定义函数 (UDF)。有关使用 Python 实现的 UDF 处理程序的更多信息,请参阅 创建 Python UDFs。有关 UDFs 的更多一般信息,请参阅 用户定义函数概述。
在 UDTF 的处理程序中,您可以处理输入行(请参阅本主题中的 处理行)。您还可以具有针对每个输入分区执行的逻辑(请参阅本主题中的 处理分区)。
创建 Python UDTF 时,请执行以下操作:
使用在调用 UDTF 时 Snowflake 将调用的方法实现类。
有关更多详细信息,请参阅 实施处理程序 (本主题内容)。
在 SQL 中使用 CREATE FUNCTION 命令创建 UDTF,将类指定为处理程序。创建 UDTF 时,请指定:
UDTF 输入参数的数据类型。
UDTF 返回的列的数据类型。
调用 UDTF 时作为处理程序执行的代码。
用于实现处理程序的语言。
有关语法的更多信息,请参阅 使用 CREATE FUNCTION 创建 UDTF (本主题内容)。
您可以调用 UDF 或 UDTF,如 调用 UDF 中所述。
备注
表函数 (UDTFs) 限制输入实参为 500 个,输出列为 500 个。
Snowflake 目前支持使用以下版本的 Python 编写 UDTFs :
3.8
3.9
3.10
3.11
在 CREATE FUNCTION 语句中,将 runtime_version
设置为所需的版本。
实施处理程序¶
您实现处理程序类,将 UDTF 实参值处理为表格结果并处理分区输入。有关处理程序类示例,请参阅 处理程序类示例 (本主题内容)。
使用 CREATE FUNCTION 创建 UDTF 时,您将此类指定为 UDTF 的处理程序。有关用于创建函数的 SQL 的更多信息,请参阅 使用 CREATE FUNCTION 创建 UDTF (本主题内容)。
处理程序类实现在调用 UDTF 时 Snowflake 将调用的方法。此类包含 UDTF 的逻辑。
方法 |
要求 |
描述 |
---|---|---|
|
可选 |
初始化输入分区的有状态处理的状态。有关更多信息,请参阅 初始化处理程序 (本主题内容)。 |
|
必填 |
处理每个输入行,以元组形式返回表格值。Snowflake 调用此方法,从 UDTF 的实参传递输入。有关更多信息,请参阅 定义 process 方法 (本主题内容)。 |
|
可选 |
完成输入分区的处理,以元组形式返回表格值。有关更多信息,请参阅 完成分区处理 (本主题内容)。 |
请注意,从处理程序类中的任何方法引发异常都会导致处理停止。调用 UDTF 的查询失败,并显示错误消息。
备注
如果代码不满足此处所述的要求,UDTF 创建或执行可能会失败。Snowflake 将在 CREATE FUNCTION 语句执行时检测违规行为。
处理程序类示例¶
以下示例中的代码会创建 UDTF,其处理程序类处理分区中的行。process
方法处理每个输入行,并返回包含股票销售总成本的一行。处理分区中的行后,它(从其 end_partition
方法)返回分区中包含的所有销售的总额。
create or replace function stock_sale_sum(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSaleSum'
as $$
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
$$;
以下示例中的代码调用前面的 UDF,从 stocks_table
表中的 symbol
、quantity
和 price
列传递值。有关调用 UDTF 的更多信息,请参阅 调用 UDF。
select stock_sale_sum.symbol, total
from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
初始化处理程序¶
您可以选择在处理程序类中实现 __init__
方法,Snowflake 将在处理程序开始处理行之前调用该方法。例如,可以使用此方法为处理程序建立某些分区范围的状态。__init__
方法可能不会生成输出行。
方法的签名必须采用以下形式:
def __init__(self):
例如,您可能希望:
初始化分区的状态,然后在
process
和end_partition
方法中使用此状态。执行长时间运行的初始化,每个分区只需执行一次,而不是每行执行一次。
备注
您还可以在处理程序类之外包含该代码,从而在分区处理开始之前(例如在类声明之前)执行逻辑。
有关处理分区的更多信息,请参阅 处理分区 (本主题内容)。
如果您使用 __init__
方法,请记住 __init__
具有以下特征:
只能将
self
视作实参。无法生成输出行。请为其使用
process
方法实现。针对每个分区调用一次,并且在调用
process
方法之前。
处理行¶
实现 Snowflake 将为每个输入行调用的 process
方法。
定义 process
方法¶
定义 process
方法,该方法接收从 SQL 类型转换的 UDTF 实参作为值,返回 Snowflake 将用于创建 UDTF 的表格返回值的数据。
方法的签名必须采用以下形式:
def process(self, *args):
process
方法必须满足以下条件:
有
self
参数。声明与 UDTF 参数对应的方法参数。
方法参数名称不必与 UDTF 参数名称匹配,但方法参数的声明顺序必须与 UDTF 参数的声明 顺序相同。
将 UDTF 实参值传递到方法时,Snowflake 会将值从 SQL 类型转换为您在方法中使用的 Python 类型。有关 Snowflake 如何在 SQL 和 Python 数据类型之间映射的信息,请参阅 SQL-Python 数据类型映射。
生成一个或多个元组(或返回包含元组的可迭代对象),其中元组的序列对应 UDTF 返回值列的序列。
元组元素的显示顺序必须与 UDTF 返回值列的声明 顺序相同。有关更多信息,请参阅 返回值 (本主题内容)。
Snowflake 会将值从 Python 类型转换为 UDTF 声明所需的 SQL 类型。有关 Snowflake 如何在 SQL 和 Python 数据类型之间映射的信息,请参阅 SQL-Python 数据类型映射。
如果处理程序类中的方法引发异常,则处理将停止。调用 UDTF 的查询将失败,并显示错误消息。如果 process
方法返回 None
,则处理将停止。(即使 process
方法返回 None
,仍会调用 end_partition
方法。)
process 方法示例
以下示例中的代码显示 StockSale
处理程序类,该类具有 process
方法,用于处理三个 UDTF 实参(symbol
、quantity
和 price
),返回单个行,其中包含两列(symbol
和 total
)。请注意, process
方法参数的声明顺序与 stock_sale
参数的声明顺序相同。process
方法的 yield
语句中的实参与 stock_sale
RETURNS TABLE 子句中声明的列采用相同的顺序。
create or replace function stock_sale(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSale'
as $$
class StockSale:
def process(self, symbol, quantity, price):
cost = quantity * price
yield (symbol, cost)
$$;
以下示例中的代码调用前面的 UDF,从 stocks_table
表中的 symbol
、quantity
和 price
列传递值。
select stock_sale.symbol, total
from stocks_table, table(stock_sale(symbol, quantity, price) over (partition by symbol));
返回值¶
返回输出行时,您可以使用 yield
或 return
(但不能同时使用两者)返回具有表格值的元组。如果方法返回或生成 None
,则针对当前行的处理将停止。
使用
yield
时,针对每个输出行执行单独的yield
语句。这是最佳实践,因为附带yield
的延迟评估可以实现更高效的处理,并有助于避免超时。元组中的每个元素成为 UDTF 返回的结果中的列值。
yield
实参的顺序必须匹配针对 CREATE FUNCTION 的 RETURNS TABLE 子句中返回值声明的列的顺序。以下示例中的代码返回表示两行的值。
def process(self, symbol, quantity, price): cost = quantity * price yield (symbol, cost) yield (symbol, cost)
请注意,由于 yield 实参是元组,因此在元组中传递单个值时,您必须包含尾随逗号,如以下示例所示。
yield (cost,)
使用
return
时,返回带元组的可迭代对象。元组中的每个值成为 UDTF 返回的结果中的列值。元组中列值的顺序必须匹配针对 CREATE FUNCTION 的 RETURNS TABLE 子句中返回值声明的列的顺序。
以下示例中的代码返回两行,每行包含两列:symbol 和 total。
def process(self, symbol, quantity, price): cost = quantity * price return [(symbol, cost), (symbol, cost)]
跳过行¶
要跳过输入行并处理下一行(例如,在验证输入行时),请让 process
方法返回以下其中一项:
使用
return
时,返回None
、包含None
的列表或空列表,以跳过该行。使用
yield
时,返回None
,以跳过一行。请注意,如果多次调用
yield
,则 Snowflake 会忽略返回None
的调用后的任何调用。
以下示例中的代码仅返回其 number
为正整数的行。如果 number
不是正数,则该方法返回 None
,以跳过当前行并继续处理下一行。
def process(self, number):
if number < 1:
yield None
else:
yield (number)
有状态处理和无状态处理¶
您可以实现处理程序以分区感知方式处理行,或者简单地逐行处理。
在 分区感知处理 中,处理程序包括用于管理分区范围状态的代码。这包括在分区处理开始时执行的
__init__
方法,以及 Snowflake 在处理分区的最后一行后调用的end_partition
方法。有关更多信息,请参阅 处理分区 (本主题内容)。在 分区无感知处理 中,处理程序无状态执行,忽略分区边界。
要以这种方式执行处理程序,请不要包含
__init__
或end_partition
方法。
处理分区¶
您可以使用按分区执行的代码(例如,管理状态)以及为分区中的每一行执行的代码来处理输入中的分区。
备注
有关在调用 UDTF 时指定分区的更多信息,请参阅 表函数和分区。
当查询包含分区时,它会使用指定的值(如列的值)汇总行。据说,处理程序接收的汇总行按该值分区。代码可以处理这些分区及其行,以便针对每个分区的处理都包含分区范围状态。
以下 SQL 示例中的代码查询股票销售信息。它执行 stock_sale_sum
UDTF,其输入按 symbol
列的值进行分区。
select stock_sale_sum.symbol, total
from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
请记住,即使对传入的行进行分区,代码也可能忽略分区分离,仅处理这些行。例如,您可以省略旨在处理分区范围状态的代码(例如处理程序类 __init__
方法和 end_partition
方法),仅实现 process
方法。有关更多信息,请参阅 有状态处理和无状态处理 (本主题内容)。
要将每个分区作为一个单元进行处理,您需要:
实现处理程序类
__init__
方法,用于初始化分区的处理。有关更多信息,请参阅 初始化处理程序 (本主题内容)。
使用
process
方法处理每一行时,包含分区感知代码。有关处理行的更多信息,请参阅 处理行 (本主题内容)。
实现
end_partition
方法以完成分区处理。有关更多信息,请参阅 完成分区处理 (本主题内容)。
下面介绍在包含旨在按分区执行的代码时处理程序调用的顺序。
在分区处理开始之前,Snowflake 使用处理程序类的
__init__
方法创建该类的实例。在此处,您可以建立分区范围状态。例如,您可以初始化实例变量,以保存从分区中的行计算得出的值。
对于分区中的每一行,Snowflake 都会调用
process
方法。每次执行该方法时,它都可以对状态值进行更改。例如,您可以让
process
方法更新实例变量的值。代码处理分区中的最后一行后,Snowflake 会调用
end_partition
方法。通过此方法,您可以返回包含要返回的分区级值的输出行。例如,您可以返回在处理分区中的行时一直更新的实例变量的值。
end_partition
方法不会从 Snowflake 接收任何实参。Snowflake 只是在您处理分区中的最后一行后调用该方法。
完成分区处理¶
您可以选择在处理程序类中实现 end_partition
方法。Snowflake 在您处理分区中的所有行后调用该方法。通过此方法,您可以在处理完分区的所有行后执行分区的代码。end_partition
方法可能会生成输出行,例如返回分区范围的计算结果。有关更多信息,请参阅 处理分区 (本主题内容)。
方法的签名必须采用以下形式:
def end_partition(self):
Snowflake 需要以下 end_partition
方法实现:
它不能是静态的。
除
self
外,它可能没有任何参数。作为返回表格值的替代方法,它可能会生成空列表或
None
。
备注
虽然 Snowflake 支持大型分区,会调整超时以成功处理分区,但特别大的分区可能导致处理超时(例如 end_partition
需要太长时间才能完成)。如果您需要针对特定使用场景调整超时阈值,请联系 ` Snowflake 支持部门 `_。
分区处理示例¶
以下示例中的代码首先计算每次购买的费用,然后将购买费用相加(在 process
方法中),从而计算股票购买所支付的总费用。代码返回 end_partition
方法中的总数。
有关包含此处理程序的 UDTF 示例,以及调用 UDTF,请参阅 处理程序类示例。
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
处理分区时,请记住以下几点:
代码可能会处理未在 UDTF 调用中明确指定的分区。即使 UDTF 调用不包含 PARTITION BY 子句,Snowflake 也会明确对数据进行分区。
process
方法将按照分区的 ORDER BY 子句指定的顺序(如果有)接收行数据。
示例¶
使用导入的包¶
您可以使用 Snowflake 中提供的 Anaconda 第三方包精选列表所包含的 Python 包。要将这些包指定为 UDTF 中的依赖项,请使用 CREATE FUNCTION 中的 PACKAGES 子句。
您可以在 Snowflake 中执行以下 SQL,从而发现包含的包列表:
select * from information_schema.packages where language = 'python';
有关更多信息,请参阅 使用第三方包 和 创建 Python UDFs。
以下示例中的代码使用 NumPy (Numerical Python) (https://numpy.org/doc/stable/reference/index.html) 包中的函数,以根据一系列股票购买(其中每股具有不同的价格)计算每股的平均价。
create or replace function stock_sale_average(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
packages = ('numpy')
handler='StockSaleAverage'
as $$
import numpy as np
class StockSaleAverage:
def __init__(self):
self._price_array = []
self._quantity_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
self._price_array.append(float(price))
cost = quantity * price
yield (symbol, cost)
def end_partition(self):
np_array = np.array(self._price_array)
avg = np.average(np_array)
yield (self._symbol, avg)
$$;
以下示例中的代码调用前面的 UDF,从 stocks_table
表中的 symbol
、quantity
和 price
列传递值。有关调用 UDTF 的更多信息,请参阅 调用 UDF。
select stock_sale_average.symbol, total
from stocks_table,
table(stock_sale_average(symbol, quantity, price)
over (partition by symbol));
使用工作进程运行并发任务¶
您可以使用 Python 工作进程运行并发任务。当您需要运行的并行任务利用仓库节点上的多个 CPU 核心时,您可能会发现此操作很有用。
备注
Snowflake 建议您不要使用内置的 Python 多处理模块。
要解决 ` Python 全局解释器锁 <https://wiki.python.org/moin/GlobalInterpreterLock (https://wiki.python.org/moin/GlobalInterpreterLock)>`_ 阻止多任务处理方法扩展到所有 CPU 核心,您可以使用单独的工作进程而不是线程来执行并发任务。
您可以使用 joblib
库的 Parallel
类对 Snowflake 仓库执行此操作,如以下示例所示。
CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
RETURNS TABLE (result INT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'JoblibMultiprocessing'
PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt
class JoblibMultiprocessing:
def process(self, i):
pass
def end_partition(self):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
for r in result:
yield (r, )
$$;
备注
Snowflake Standard 和 Snowpark-Optimized Warehouses 用于 joblib.Parallel
的默认后端有所不同。
Standard Warehouse 默认值:
threading
Snowpark-Optimized Warehouse 默认值:
loky
(多处理)
您可以通过调用 joblib.parallel_backend
函数来替换默认后端设置,如下例所示。
import joblib
joblib.parallel_backend('loky')
使用 CREATE FUNCTION
创建 UDTF¶
您在 SQL 中使用 CREATE FUNCTION 命令创建 UDTF,将您编写的代码指定为处理程序。有关命令参考,请参阅 CREATE FUNCTION。
在创建 UDTF 时使用以下语法
CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
LANGUAGE PYTHON
[ IMPORTS = ( '<imports>' ) ]
RUNTIME_VERSION = 3.8
[ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
[ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
HANDLER = '<handler_class>'
[ AS '<python_code>' ]
要将您编写的处理程序代码与 UDTF 关联,请在执行 CREATE FUNCTION 时执行以下操作:
在 RETURNS TABLE 中,以列名和类型对指定输出列。
将 LANGUAGE 设置为 PYTHON。
如果处理程序类位于外部位置(例如,在暂存区上),则将 IMPORTS 子句值设置为该类的路径和名称。
有关更多信息,请参阅 创建 Python UDFs。
将 RUNTIME_VERSION 设置为代码所需 Python 运行时的版本。受支持的 Python 版本包括:
3.8
3.9
3.10
3.11
将 PACKAGES 子句值设置为处理程序类所需的一个或多个包(如果有)的名称。
有关更多信息,请参阅 使用第三方包 和 创建 Python UDFs。
将 HANDLER 子句值设置为处理程序类的名称。
将 Python 处理程序代码与 UDTF 关联时,您可以内联包含代码,也可以在 Snowflake 暂存区上的某个位置引用代码。HANDLER 值区分大小写,并且必须与 Python 类的名称匹配。
有关更多信息,请参阅 UDFs 使用内联代码与,UDFs 使用从暂存区上传的代码。
重要
对于 标量 Python UDF,HANDLER 子句值包含方法名称。
对于 Python UDTF,HANDLER 子句值包含类名,但 不 包含方法名。
造成差异的原因是,对于标量 Python UDF,处理程序方法的名称由用户选择,因此 Snowflake 事先不知道,但对于 Python UDTF,方法(例如
end_partition
方法)的名称是已知的,因为它们必须与 Snowflake 指定的名称匹配。如果处理程序代码是使用 CREATE FUNCTION 内联指定,则
AS '<python_code>'
子句是必需项。