矢量化 Python UDTFs¶
本主题介绍矢量化 Python UDTFs。
本主题内容:
概述¶
矢量化 Python UDTFs(用户定义的表函数)提供了一种批量操作行的方法。
Snowflake 支持两种矢量化 UDTFs:
带有矢量化
end_partition
方法的 UDTFs带有矢量化
process
方法的 UDTFs
您必须选择一种,因为 UDTF 不能同时具有矢量化 process
方法和矢量化 end_partition
方法。
带有矢量化 end_partition 方法的 UDTFs¶
UDTFs with a vectorized end_partition
method enable seamless partition-by-partition processing by operating on
partitions as pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and returning results as
pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
or lists of pandas arrays (https://pandas.pydata.org/docs/reference/api/pandas.array.html)
or pandas Series (https://pandas.pydata.org/docs/reference/series.html).
This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.
将矢量化 end_partition
方法用于以下任务:
逐个分区处理数据,而不是逐行处理数据。
为每个分区返回多行或多列。
使用在 Pandas DataFrames 上操作的库来进行数据分析。
采用矢量化 process 方法的 UDTFs¶
带有矢量化 process
方法的 UDTFs 提供一种批量操作行的方法,并假设操作执行一对一的映射。换句话说,该方法会为每个输入行返回一个输出行。列数不受限制。
将矢量化 process
方法用于以下任务:
批量应用存在多列结果的一对一转换。
使用需要
pandas.DataFrame
的库。批量处理行,无需明确分区。
利用 to_pandas() API 将查询结果直接转换为 Pandas DataFrame。
先决条件¶
需要 Snowpark Library for Python 版本 1.14.0 或更高版本。
创建使用矢量化 end_partition 方法的 UDTF¶
可选:使用
__init__
方法定义一个处理程序类,该方法将在处理每个分区之前调用。注意:请勿定义
process
方法。定义一个
end_partition
方法,该方法接受一个 DataFrame 实参并返回或产生一个pandas.DataFrame
,或者pandas.Series
或pandas.arrays
的元组,其中每个数组都是一列。结果的列类型必须与 UDTF 定义中的列类型匹配。
使用
@vectorized
装饰器或_sf_vectorized_input
函数属性将end_partition
方法标记为矢量化。有关更多信息,请参阅 矢量化 Python UDFs.:code:
@vectorized
装饰器只能在 Python UDTF 在 Snowflake 中执行时使用;例如,当使用 SQL 工作表时。使用客户端或 Python 工作表执行时,必须使用该函数属性。
备注
带有矢量化 end_partition
方法的 UDTF 输入 DataFrame 的默认列名与 SQL 函数的签名相符。列名称遵循 SQL 标识符要求。也就是说,如果标识符未加引号,则该标识符将大写,如果该标识符使用了双引号,则将保持原样。
以下代码块是使用 @vectorized
装饰器创建使用矢量化 end_partition
方法的 UDTF 的示例。
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
以下代码块是使用函数属性创建使用矢量化 end_partition
方法的 UDTF 的示例。
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
备注
必须使用 PARTITION BY 子句,调用使用矢量化 end_partition
方法的 UDTF 来构建分区。
要使用同一分区中的所有数据调用 UDTF,请执行以下操作:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
要使用按列 x 分区的数据调用 UDTF,请执行以下操作:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
示例:使用常规 UDTF 与使用矢量化 end_partition 方法的 UDTF 的行集合¶
使用常规 UDTF 的行集合:
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
使用矢量化 end_partition
方法使用 UDTF 的行集合:
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
示例:计算分区中每列的汇总统计数据¶
以下是一个示例,说明如何使用 Pandas describe()
方法计算分区中每列的汇总统计数据。
创建一个表并生成 3 个分区,每个分区 5 行:
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float); -- generate 3 partitions of 5 rows each insert into test_values select 'x', uniform(1.5,1000.5,random(1))::float col1, uniform(1.5,1000.5,random(2))::float col2, uniform(1.5,1000.5,random(3))::float col3, uniform(1.5,1000.5,random(4))::float col4, uniform(1.5,1000.5,random(5))::float col5 from table(generator(rowcount => 5)); insert into test_values select 'y', uniform(1.5,1000.5,random(10))::float col1, uniform(1.5,1000.5,random(20))::float col2, uniform(1.5,1000.5,random(30))::float col3, uniform(1.5,1000.5,random(40))::float col4, uniform(1.5,1000.5,random(50))::float col5 from table(generator(rowcount => 5)); insert into test_values select 'z', uniform(1.5,1000.5,random(100))::float col1, uniform(1.5,1000.5,random(200))::float col2, uniform(1.5,1000.5,random(300))::float col3, uniform(1.5,1000.5,random(400))::float col4, uniform(1.5,1000.5,random(500))::float col5 from table(generator(rowcount => 5));
查看数据:
select * from test_values; ----------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" | ----------------------------------------------------- |x |8.0 |99.4 |714.6 |168.7 |397.2 | |x |106.4 |237.1 |971.7 |828.4 |988.2 | |x |741.3 |207.9 |32.6 |640.6 |63.2 | |x |541.3 |828.6 |844.9 |77.3 |403.1 | |x |4.3 |723.3 |924.3 |282.5 |158.1 | |y |976.1 |562.4 |968.7 |934.3 |977.3 | |y |390.0 |244.3 |952.6 |101.7 |24.9 | |y |599.7 |191.8 |90.2 |788.2 |761.2 | |y |589.5 |201.0 |863.4 |415.1 |696.1 | |y |46.7 |659.7 |571.1 |938.0 |513.7 | |z |313.9 |188.5 |964.6 |435.4 |519.6 | |z |328.3 |643.1 |766.4 |148.1 |596.4 | |z |929.0 |255.4 |915.9 |857.2 |425.5 | |z |612.8 |816.4 |220.2 |879.5 |331.4 | |z |487.1 |704.5 |471.5 |378.9 |481.2 | -----------------------------------------------------
创建函数:
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float) returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float) language python RUNTIME_VERSION = 3.9 packages=('pandas') handler='handler' as $$ from _snowflake import vectorized import pandas class handler: @vectorized(input=pandas.DataFrame) def end_partition(self, df): # using describe function to get the summary statistics result = df.describe().transpose() # add a column at the beginning for column ids result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5']) return result $$;
执行以下步骤之一:
调用函数并按
id
进行分区:-- partition by id select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5) over (partition by id)) order by id, column_name; -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 | |x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 | |x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 | |x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 | |x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 | |y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 | |y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 | |y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 | |y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 | |y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 | |z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 | |z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 | |z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 | |z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 | |z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 | --------------------------------------------------------------------------------------------------------------------------------------------------------------------
调用函数并将整个表视为一个分区:
-- treat the whole table as one partition select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5) over (partition by 1)) order by id, column_name; --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 | |NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 | |NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 | |NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 | |NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
采用矢量化 process 方法创建 UDTF¶
定义处理程序类,类似于常规 UDTFs,可选择
__init__
和end_partition
方法。定义
process
方法,该方法接受一个 DataFrame 实参并返回一个pandas.DataFrame
或者pandas.Series
或pandas.arrays
的元组,其中每个数组都是一列。结果的列类型必须与 UDTF 定义中的列类型匹配。返回的结果必须正好为 DataFrame 或 tuple。这与矢量化
end_partition
方法不同,您可以在其中生成或返回一个列表。使用
@vectorized
装饰器或_sf_vectorized_input
函数属性将process
方法标记为矢量化。有关更多信息,请参阅 矢量化 Python UDFs.:code:
@vectorized
装饰器只能在 Python UDTF 在 Snowflake 中执行时使用;例如,当使用 SQL 工作表时。使用客户端或 Python 工作表执行时,必须使用该函数属性。可选:如果您的 Python 处理程序函数超过执行时间限制,请 设置目标批量大小。
备注
带有矢量化 process
方法的 UDTF 输入 DataFrame 的默认列名与 SQL 函数的签名相符。列名称遵循 SQL 标识符要求。也就是说,如果标识符未加引号,则该标识符将大写,如果该标识符使用了双引号,则将保持原样。
对于带有矢量化 process
方法的 UDTF,可以将其处理程序实现为分区感知的方式处理批次,或简单地逐批处理。有关详细信息,请参阅 有状态和无状态处理。
示例:带有矢量化 process 方法的 UDTF 应用独热编码¶
使用带有矢量化 process
方法的 UDTF 为包含十个类别的表应用独热编码。
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame
class one_hot_encode:
def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
return pd.get_dummies(df)
process._sf_vectorized_input = pd.DataFrame
one_hot_encode_udtf = session.udtf.register(
one_hot_encode,
output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
input_names=['"categ"']
)
df_table = session.table("categories")
df_table.show()
示例结果:
-----------
|"CATEG" |
-----------
|categ1 |
|categ6 |
|categ8 |
|categ5 |
|categ7 |
|categ5 |
|categ1 |
|categ2 |
|categ2 |
|categ4 |
-----------
准备打印表:
res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
示例结果:
CATEG CATEG0 CATEG1 CATEG2 CATEG3 CATEG4 CATEG5 CATEG6 CATEG7 CATEG8 CATEG9
0 categ0 1 0 0 0 0 0 0 0 0 0
1 categ0 1 0 0 0 0 0 0 0 0 0
2 categ5 0 0 0 0 0 1 0 0 0 0
3 categ3 0 0 0 1 0 0 0 0 0 0
4 categ8 0 0 0 0 0 0 0 0 1 0
您可以使用矢量化 UDF 获取相同结果,但不太方便。您需要将结果打包成一列,然后解包该列以将结果恢复为可用的 Pandas DataFrame。
使用矢量化 UDF 的示例:
def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
return pd.get_dummies(df).to_dict('records')
one_hot_encode._sf_vectorized_input = pd.DataFrame
one_hot_encode_udf = session.udf.register(
one_hot_encode,
output_schema=["encoding"],
)
df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0 {\n "categ0": false,\n "categ1": false,\n "...
1 {\n "categ0": false,\n "categ1": true,\n "c...
2 {\n "categ0": false,\n "categ1": false,\n "...
3 {\n "categ0": false,\n "categ1": false,\n "...
4 {\n "categ0": true,\n "categ1": false,\n "c...
类型支持¶
矢量化 UDTFs 支持与矢量化 UDFs 相同的 SQL 类型。但是,对于矢量化 UDTFs,适合 64 位或更小整数类型的小数位数为 0 的 SQL NUMBER
实参将始终映射到 Int16
、Int32
或 Int64
。与标量 UDFs 不同,如果 UDTF 的实参不可为空,则不会将其转换为 int16
、int32
或 int64
。
要查看显示 SQL 类型如何映射到 Pandas 数据类型的表,请参阅矢量化 Python UDFs 主题中的 类型支持表。
最佳实践¶
如果必须随每一行返回一个标量,请构建一个重复值列表,而不是解包
numpy
数组来创建元组。例如,对于两列结果,不执行以下操作:return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
使用此:
return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
要提高性能,请将半结构化数据解包到列中。
例如,如果有一个变体列
obj
,其中包含元素x(int)
、y(float)
和z(string)
,那么不定义具有类似以下签名的 UDTF,并使用vec_udtf(obj)
进行调用:create function vec_udtf(variant obj)
定义具有类似以下签名的 UDTF,并使用
vec_udtf(obj:x, obj:y, obj:z)
进行调用:create function vec_udtf(int, float, string)
默认情况下,Snowflake 将输入编码为支持 NULL 值的 Pandas 数据类型(例如 Int64 (https://pandas.pydata.org/docs/reference/api/pandas.Int64Dtype.html))。如果使用需要基本类型(如
numpy
)的库,并且输入没有 NULL 值,则应该在使用库之前将列转换为基本类型。例如:input_df['y'] = input_df['y'].astype("int64")
有关更多信息,请参阅 类型支持。
采用带有矢量化
end_partition
方法的 UDTFs 时,为了提高性能并防止超时,请避免使用pandas.concat
累积部分结果。相反,只要准备好了,就会产生部分结果。例如,不执行以下操作:
results = [] while(...): partial_result = pd.DataFrame(...) results.append(partial_result) return pd.concat(results)
执行以下操作:
while(...): partial_result = pd.DataFrame(...) yield partial_result