矢量化 Python UDTFs¶
本主题介绍矢量化 Python UDTFs。
概述¶
矢量化 Python UDTFs(用户定义的表函数)提供了一种批量操作行的方法。
Snowflake 支持两种矢量化 UDTFs:
带有矢量化
end_partition方法的 UDTFs带有矢量化
process方法的 UDTFs
您必须选择一种,因为 UDTF 不能同时具有矢量化 process 方法和矢量化 end_partition 方法。
带有矢量化 end_partition 方法的 UDTFs¶
UDTFs with a vectorized end_partition method enable seamless partition-by-partition processing by operating on
partitions as pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and returning results as
pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
or lists of pandas arrays (https://pandas.pydata.org/docs/reference/api/pandas.array.html)
or pandas Series (https://pandas.pydata.org/docs/reference/series.html).
This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.
将矢量化 end_partition 方法用于以下任务:
逐个分区处理数据,而不是逐行处理数据。
为每个分区返回多行或多列。
使用在 Pandas DataFrames 上操作的库来进行数据分析。
采用矢量化 process 方法的 UDTFs¶
带有矢量化 process 方法的 UDTFs 提供一种批量操作行的方法,并假设操作执行一对一的映射。换句话说,该方法会为每个输入行返回一个输出行。列数不受限制。
将矢量化 process 方法用于以下任务:
批量应用存在多列结果的一对一转换。
使用需要
pandas.DataFrame的库。批量处理行,无需明确分区。
利用 to_pandas() API 将查询结果直接转换为 Pandas DataFrame。
先决条件¶
需要 Snowpark Library for Python 版本 1.14.0 或更高版本。
创建使用矢量化 end_partition 方法的 UDTF¶
可选:使用
__init__方法定义一个处理程序类,该方法将在处理每个分区之前调用。注意:请勿定义
process方法。定义一个
end_partition方法,该方法接受一个 DataFrame 实参并返回或产生一个pandas.DataFrame,或者pandas.Series或pandas.arrays的元组,其中每个数组都是一列。结果的列类型必须与 UDTF 定义中的列类型匹配。
使用
@vectorized装饰器或_sf_vectorized_input函数属性将end_partition方法标记为矢量化。有关更多信息,请参阅 矢量化 Python UDFs.:code:
@vectorized装饰器只能在 Python UDTF 在 Snowflake 中执行时使用;例如,当使用 SQL 工作表时。使用客户端或 Python 工作表执行时,必须使用该函数属性。
备注
带有矢量化 end_partition 方法的 UDTF 输入 DataFrame 的默认列名与 SQL 函数的签名相符。列名称遵循 SQL 标识符要求。也就是说,如果标识符未加引号,则该标识符将大写,如果该标识符使用了双引号,则将保持原样。
以下代码块是使用 @vectorized 装饰器创建使用矢量化 end_partition 方法的 UDTF 的示例。
以下代码块是使用函数属性创建使用矢量化 end_partition 方法的 UDTF 的示例。
备注
必须使用 PARTITION BY 子句,调用使用矢量化 end_partition 方法的 UDTF 来构建分区。
要使用同一分区中的所有数据调用 UDTF,请执行以下操作:
要使用按列 x 分区的数据调用 UDTF,请执行以下操作:
示例:使用常规 UDTF 与使用矢量化 end_partition 方法的 UDTF 的行集合¶
使用常规 UDTF 的行集合:
使用矢量化 end_partition 方法使用 UDTF 的行集合:
示例:计算分区中每列的汇总统计数据¶
以下是一个示例,说明如何使用 Pandas describe() 方法计算分区中每列的汇总统计数据。
创建一个表并生成 3 个分区,每个分区 5 行:
查看数据:
创建函数:
执行以下步骤之一:
调用函数并按
id进行分区:调用函数并将整个表视为一个分区:
采用矢量化 process 方法创建 UDTF¶
定义处理程序类,类似于常规 UDTFs,可选择
__init__和end_partition方法。定义
process方法,该方法接受一个 DataFrame 实参并返回一个pandas.DataFrame或者pandas.Series或pandas.arrays的元组,其中每个数组都是一列。结果的列类型必须与 UDTF 定义中的列类型匹配。返回的结果必须正好为 DataFrame 或 tuple。这与矢量化
end_partition方法不同,您可以在其中生成或返回一个列表。使用
@vectorized装饰器或_sf_vectorized_input函数属性将process方法标记为矢量化。有关更多信息,请参阅 矢量化 Python UDFs.:code:
@vectorized装饰器只能在 Python UDTF 在 Snowflake 中执行时使用;例如,当使用 SQL 工作表时。使用客户端或 Python 工作表执行时,必须使用该函数属性。可选:如果您的 Python 处理程序函数超过执行时间限制,请 设置目标批量大小。
备注
带有矢量化 process 方法的 UDTF 输入 DataFrame 的默认列名与 SQL 函数的签名相符。列名称遵循 SQL 标识符要求。也就是说,如果标识符未加引号,则该标识符将大写,如果该标识符使用了双引号,则将保持原样。
对于带有矢量化 process 方法的 UDTF,可以将其处理程序实现为分区感知的方式处理批次,或简单地逐批处理。有关详细信息,请参阅 有状态和无状态处理。
示例:带有矢量化 process 方法的 UDTF 应用独热编码¶
使用带有矢量化 process 方法的 UDTF 为包含十个类别的表应用独热编码。
示例结果:
准备打印表:
示例结果:
您可以使用矢量化 UDF 获取相同结果,但不太方便。您需要将结果打包成一列,然后解包该列以将结果恢复为可用的 Pandas DataFrame。
使用矢量化 UDF 的示例:
类型支持¶
矢量化 UDTFs 支持与矢量化 UDFs 相同的 SQL 类型。但是,对于矢量化 UDTFs,适合 64 位或更小整数类型的小数位数为 0 的 SQL NUMBER 实参将始终映射到 Int16、Int32 或 Int64。与标量 UDFs 不同,如果 UDTF 的实参不可为空,则不会将其转换为 int16、int32 或 int64。
要查看显示 SQL 类型如何映射到 Pandas 数据类型的表,请参阅矢量化 Python UDFs 主题中的 类型支持表。
最佳实践¶
如果必须随每一行返回一个标量,请构建一个重复值列表,而不是解包
numpy数组来创建元组。例如,对于两列结果,不执行以下操作:使用此:
要提高性能,请将半结构化数据解包到列中。
例如,如果有一个变体列
obj,其中包含元素x(int)、y(float)和z(string),那么不定义具有类似以下签名的 UDTF,并使用vec_udtf(obj)进行调用:定义具有类似以下签名的 UDTF,并使用
vec_udtf(obj:x, obj:y, obj:z)进行调用:默认情况下,Snowflake 将输入编码为支持 NULL 值的 Pandas 数据类型(例如 Int64 (https://pandas.pydata.org/docs/reference/api/pandas.Int64Dtype.html))。如果使用需要基本类型(如
numpy)的库,并且输入没有 NULL 值,则应该在使用库之前将列转换为基本类型。例如:有关更多信息,请参阅 类型支持。
采用带有矢量化
end_partition方法的 UDTFs 时,为了提高性能并防止超时,请避免使用pandas.concat累积部分结果。相反,只要准备好了,就会产生部分结果。例如,不执行以下操作:
执行以下操作: