在 Python 中为 DataFrames 创建用户定义的聚合函数 (UDAFs)¶
您可以使用 Snowpark Python APIs 创建和调用用户定义的聚合函数 (UDAFs)。UDAF 将一行或多行作为输入,并产生单行输出。它可对多行数值进行数学计算,如求和、求平均值、计数、求最小值或最大值、标准偏差和估算,还可进行一些非数学运算。
要在 Snowpark 中创建和注册 UDAF,您必须执行以下操作:
实施 UDAF 处理程序。
该处理程序包含 UDAF 的逻辑。UDAF 处理程序必须实施 Snowflake 在调用 UDAF 时将在运行时调用的函数。有关更多信息,请参阅 实施处理程序。
在 Snowflake 数据库中注册 UDAF 及其处理程序。
注册 UDAF 后,您可以通过 SQL 或使用 Snowpark API 进行调用。您可以使用 Snowpark API 注册 UDAF 及其处理程序。有关注册的更多信息,请参阅 注册 UDAF。
实施处理程序¶
正如 聚合函数处理程序界面 中详细描述的那样,UDAF 处理程序类必须实施 Snowflake 用来调用 UDAF 的方法。无论您是使用 Snowpark API 注册 UDAF,还是 使用 CREATE FUNCTION 语句,借助 SQL 来创建,都可以使用编写的类作为处理程序。
UDAF 处理程序类实施了下表中列出的方法,Snowflake 在运行时会调用这些方法。请参阅 本主题中的示例。
方法 |
要求 |
描述 |
---|---|---|
|
必填 |
将汇总的内部状态初始化。 |
|
必填 |
返回汇总的内部状态。
|
|
必填 |
根据新的输入行累积汇总的状态。 |
|
必填 |
结合两个中间汇总状态。 |
|
必填 |
根据汇总状态生成最终结果。 |
注册 UDAF¶
实施 UDAF 处理程序后,可以使用 Snowpark API 在 Snowflake 数据库上注册 UDAF。注册 UDAF 将创建 UDAF,以便可以调用它。
您可以像注册标量 UDF 一样,将 UDAF 注册为命名函数或匿名函数。有关注册标量 UDF 的相关信息,请参阅 创建匿名 UDF 和 创建和注册命名的 UDF。注册 UDAF 时,指定 Snowflake 创建 UDAF 所需的参数值。
您可以使用以下函数和方法注册该函数:
使用
register
方法或udaf
函数,指定处理程序类的名称以及实参,从而定义函数。您还可以将udaf
用作处理程序类上的@udaf
装饰器。有关参考信息,请参阅下文:
使用
register_from_file
函数,指向包含 Python 源代码的 Python 文件或 zip 文件。有关函数参考,请参阅 Snowflake.snowpark.udaf.UDAFRegistration.register_from_file。
示例¶
创建一个带有返回值和单参数的 UDAF¶
以下处理程序示例中的 Python 代码支持 sum_int
UDAF,它会接收一个整型实参,将数值跨行相加并返回结果。
注册该函数¶
>>> import snowflake.snowpark as snowpark
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udaf
>>> def main(session: snowpark.Session):
>>> class PythonSumUDAF:
... def __init__(self):
... # This aggregate state is a primitive Python data type.
... self._partial_sum = 0
...
... @property
... def aggregate_state(self):
... return self._partial_sum
...
... def accumulate(self, input_value):
... self._partial_sum += input_value
...
... def merge(self, other_partial_sum):
... self._partial_sum += other_partial_sum
...
... def finish(self):
... return self._partial_sum
>>> sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
调用函数¶
下面示例中的 Python 代码借助 DataFrame 调用 sum_int
UDAF。
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> result = df.agg(sum_udaf("a")).collect()
>>> print(result.collect())
创建一个带有返回值和两个参数的 UDAF¶
注册该函数¶
下面处理程序示例中的 Python 代码支持 sum_int
UDAF,它接收两个整型参数,将参数值跨行相加并返回结果。
>>> import snowflake.snowpark as snowpark
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udaf
>>> def main(session: snowpark.Session):
>>> class PythonSumUDAF:
... def __init__(self):
... self._partial_sum = 0
...
... @property
... def aggregate_state(self):
... return self._partial_sum
...
... def accumulate(self, input_value, input_value2):
... self._partial_sum += input_value + input_value2
...
... def merge(self, other_partial_sum):
... self._partial_sum += other_partial_sum
...
... def finish(self):
... return self._partial_sum
>>> sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
调用函数¶
下面示例中的 Python 代码借助 DataFrame 调用 sum_int
UDAF。
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> result = df.agg(sum_udaf("a", "b"))
>>> print(result.collect())