矢量化 Python UDFs¶
本主题介绍矢量化 Python UDFs。
本主题内容:
概述¶
通过矢量化 Python UDFs,您可以定义 Python 函数,该函数接收批量输入行作为 Pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html),并返回批量结果作为 Pandas 数组 (https://pandas.pydata.org/docs/reference/api/pandas.array.html) 或 Series (https://pandas.pydata.org/docs/reference/series.html)。调用矢量化 Python UDFs 的方式与调用其他 Python UDFs 的方式相同。
与默认的逐行处理模式相比,使用矢量化 Python UDFs 的好处包括:
如果 Python 代码针对行批次高效运行,就有可能实现更高的性能。
如果要调用基于 Pandas DataFrames 或 Pandas 数组运行的库,则需要较少的转换逻辑。
使用矢量化 Python UDFs 时:
不需要更改使用 Python UDFs 编写查询的方式。所有批处理都由 UDF 框架(而非您自己的代码)处理。
与非矢量化 UDFs 一样,不能保证处理程序代码的哪些实例会看到输入的哪些批次。
矢量化 Python UDFs 入门¶
要创建矢量化 Python UDF,请使用受支持的机制之一来注释处理程序函数。
使用 vectorized
装饰器¶
_snowflake
模块向在 Snowflake 中执行的 Python UDFs 公开。在 Python 代码中,导入 _snowflake
模块,并使用 vectorized
装饰器,通过将 input
参数设置为 pandas.DataFrame
来指定处理程序预期会接收 Pandas DataFrame。
create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
from _snowflake import vectorized
@vectorized(input=pandas.DataFrame)
def add_one_to_inputs(df):
return df[0] + df[1] + 1
$$;
使用函数属性¶
您可以对处理程序函数设置特殊 _sf_vectorized_input
属性,而不必导入 _snowflake 模块并使用 vectorized
装饰器。
create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
def add_one_to_inputs(df):
return df[0] + df[1] + 1
add_one_to_inputs._sf_vectorized_input = pandas.DataFrame
$$;
设置目标批次大小¶
对 Python 处理程序函数的调用必须在 180 秒的时间限制内执行,目前,作为输入传递给处理程序函数的每个 DataFrame 最多可以包含数千行。为了不超出时间限制,您可能希望为处理程序函数设置目标批次大小,这会限制每个输入 DataFrame 的最大行数。请注意,设置较大的值并不能保证 Snowflake 将使用指定行数对批次进行编码。您可以使用 vectorized
装饰器或函数属性来设置目标批次大小。
备注
使用 max_batch_size
只是一种限制 UDF 在每一个批次中可以处理的行数的机制。例如,如果 UDF 写入的方式一次最多只能处理 100 行,则 max_batch_size
应设置为 100。不应将设置 max_batch_size
用作指定任意大的批次大小的机制。如果 UDF 能处理任何大小的批次,建议不设置此参数。
使用 vectorized
装饰器¶
若要使用 vectorized
装饰器设置目标批次大小,请为名为 max_batch_size
的实参传递一个正整数值。
例如,此语句创建一个矢量化 Python UDF,并将每个 Dataframe 限制为最多 100 行:
create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
from _snowflake import vectorized
@vectorized(input=pandas.DataFrame, max_batch_size=100)
def add_one_to_inputs(df):
return df[0] + df[1] + 1
$$;
使用函数属性¶
若要使用函数属性设置目标批次大小,请为处理程序函数的 _sf_max_batch_size
属性设置一个正整数值。
例如,此语句创建一个矢量化 Python UDF,并将每个 DataFrame 限制为最多 100 行:
create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
def add_one_to_inputs(df):
return df[0] + df[1] + 1
add_one_to_inputs._sf_vectorized_input = pandas.DataFrame
add_one_to_inputs._sf_max_batch_size = 100
$$;
DataFrame 编码¶
UDF 的成批实参在输入 Pandas DataFrames 中被编码为数组,每个 DataFrame 中的行数可能会有所不同。有关更多信息,请参阅 设置目标批次大小。在 DataFrame 中,实参可以通过其索引访问,即第一个实参的索引为 0,第二个实参的索引为 1,依此类推。UDF 处理程序返回的 Pandas 数组或系列必须与输入 DataFrame 的长度相同。
为了说明这一点,假设您定义了一个矢量化 Python UDF,如下所示:
create or replace function add_inputs(x int, y float)
returns float
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_inputs'
as $$
import pandas
from _snowflake import vectorized
@vectorized(input=pandas.DataFrame)
def add_inputs(df):
return df[0] + df[1]
$$;
这个 UDF 为第一个实参使用 df[0]
来访问 Pandas 数组,并为第二个实参使用 df[1]
。df[0] + df[1]
会获得一个 Pandas 数组,其中包含来自两个数组的对应元素的成对和。创建 UDF 后,您可以使用一些输入行来调用它:
select add_inputs(x, y)
from (
select 1 as x, 3.14::float as y union all
select 2, 1.59 union all
select 3, -0.5
);
+------------------+
| ADD_INPUTS(X, Y) |
|------------------|
| 4.14 |
| 3.59 |
| 2.5 |
+------------------+
在这里,add_inputs
Python 函数接收一个 DataFrame,类似于使用以下 Python 代码创建的 DataFrame:
>>> import pandas
>>> df = pandas.DataFrame({0: pandas.array([1, 2, 3]), 1: pandas.array([3.14, 1.59, -0.5])})
>>> df
0 1
0 1 3.14
1 2 1.59
2 3 -0.50
处理程序函数中的 return df[0] + df[1]
行将生成类似于以下 Python 代码的数组:
>>> df[0] + df[1]
0 4.14
1 3.59
2 2.50
dtype: float64
类型支持¶
矢量化 Python UDFs 支持以下 SQL 类型 的实参和返回值。该表反映了如何将每个 SQL 实参编码为特定 dtype (https://pandas.pydata.org/docs/user_guide/basics.html#basics-dtypes) 的 Pandas 数组。
SQL 类型 |
Pandas dtype |
备注 |
---|---|---|
NUMBER |
|
为确保 UDF 的某个输入实参解析为不可为 null,请从使用 |
FLOAT |
|
NULL 值被编码为 NaN 值。在输出中, NaN 值被解析为 NULLs。 |
BOOLEAN |
|
|
VARCHAR |
|
Snowflake SQL 和 Pandas 都使用 UTF-8 编码来表示字符串。 |
BINARY |
|
|
DATE |
|
每个值都编码为不带时间分量的 |
VARIANT |
|
对于实参,每个 Variant 行都会动态转换为 Python 类型,对于返回值则会反过来转换。以下类型将转换为字符串而不是原生 Python 类型: |
OBJECT |
|
|
ARRAY |
|
|
TIME |
|
每个值都编码为相对于午夜的偏移量。NULL 值编码为 |
TIMESTAMP_LTZ |
|
使用本地时区,将每个值编码为相对于 UTC Unix 纪元的纳秒尺度的 |
TIMESTAMP_NTZ |
|
将每个值编码为纳秒尺度的 |
TIMESTAMP_TZ |
|
将每个值编码为纳秒尺度的 |
GEOGRAPHY |
|
将每个值格式化为 GeoJSON,然后将其转换为 Python |
接受以下类型作为输出:Pandas Series
或 array
、NumPy array
、常规 Python list
,以及包含 label-udf_python_batch_types`中所述预期类型的任何可迭代序列。在 dtype 是 :code:`bool、boolean
、int16
、int32
、int64
、Int16
、Int32
、Int64
或 float64
时,使用 Pandas Series
、array
和 NumPy array
是有效的,因为它们将其内容公开为 memoryviews
。这意味着可以复制内容,而不必按顺序读取每个值。