在 Python 中为 DataFrames 创建用户定义的聚合函数 (UDAFs)

您可以使用 Snowpark Python APIs 创建和调用用户定义的聚合函数 (UDAFs)。UDAF 将一行或多行作为输入,并产生单行输出。它可对多行数值进行数学计算,如求和、求平均值、计数、求最小值或最大值、标准偏差和估算,还可进行一些非数学运算。

要在 Snowpark 中创建和注册 UDAF,您必须执行以下操作:

  • 实施 UDAF 处理程序。

    该处理程序包含 UDAF 的逻辑。UDAF 处理程序必须实施 Snowflake 在调用 UDAF 时将在运行时调用的函数。有关更多信息,请参阅 实施处理程序

  • 在 Snowflake 数据库中注册 UDAF 及其处理程序。

    注册 UDAF 后,您可以通过 SQL 或使用 Snowpark API 进行调用。您可以使用 Snowpark API 注册 UDAF 及其处理程序。有关注册的更多信息,请参阅 注册 UDAF

实施处理程序

正如 聚合函数处理程序界面 中详细描述的那样,UDAF 处理程序类必须实施 Snowflake 用来调用 UDAF 的方法。无论您是使用 Snowpark API 注册 UDAF,还是 使用 CREATE FUNCTION 语句,借助 SQL 来创建,都可以使用编写的类作为处理程序。

UDAF 处理程序类实施了下表中列出的方法,Snowflake 在运行时会调用这些方法。请参阅 本主题中的示例

方法

要求

描述

__init__

必填

将汇总的内部状态初始化。

aggregate_state

必填

返回汇总的内部状态。

  • 此方法必须有一个 ` @property 装饰器 <https://docs.python.org/3.8/library/functions.html#property (https://docs.python.org/3.8/library/functions.html#property)>`_。

  • 汇总状态对象可以是任何可使用 ` Python pickle 库 <https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled (https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled)>`_ 序列化的 Python 数据类型。

  • 对于简单的汇总状态,可使用原始 Python 数据类型。对于更复杂的汇总状态,请使用 ` Python 数据类 <https://docs.python.org/3/library/dataclasses.html (https://docs.python.org/3/library/dataclasses.html)>`_。

accumulate

必填

根据新的输入行累积汇总的状态。

merge

必填

结合两个中间汇总状态。

finish

必填

根据汇总状态生成最终结果。

注册 UDAF

实施 UDAF 处理程序后,可以使用 Snowpark API 在 Snowflake 数据库上注册 UDAF。注册 UDAF 将创建 UDAF,以便可以调用它。

您可以像注册标量 UDF 一样,将 UDAF 注册为命名函数或匿名函数。有关注册标量 UDF 的相关信息,请参阅 创建匿名 UDF创建和注册命名的 UDF。注册 UDAF 时,指定 Snowflake 创建 UDAF 所需的参数值。

您可以使用以下函数和方法注册该函数:

示例

创建一个带有返回值和单参数的 UDAF

以下处理程序示例中的 Python 代码支持 sum_int UDAF,它会接收一个整型实参,将数值跨行相加并返回结果。

注册该函数

>>> import snowflake.snowpark as snowpark
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udaf
>>> def main(session: snowpark.Session):
>>> class PythonSumUDAF:
...   def __init__(self):
...     # This aggregate state is a primitive Python data type.
...     self._partial_sum = 0
...
...   @property
...   def aggregate_state(self):
...     return self._partial_sum
...
...   def accumulate(self, input_value):
...     self._partial_sum += input_value
...
...   def merge(self, other_partial_sum):
...     self._partial_sum += other_partial_sum
...
...   def finish(self):
...     return self._partial_sum
>>> sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Copy

调用函数

下面示例中的 Python 代码借助 DataFrame 调用 sum_int UDAF。

>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> result = df.agg(sum_udaf("a")).collect()
>>> print(result.collect())
Copy

创建一个带有返回值和两个参数的 UDAF

注册该函数

下面处理程序示例中的 Python 代码支持 sum_int UDAF,它接收两个整型参数,将参数值跨行相加并返回结果。

>>> import snowflake.snowpark as snowpark
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udaf
>>> def main(session: snowpark.Session):
>>>   class PythonSumUDAF:
...     def __init__(self):
...       self._partial_sum = 0
...
...     @property
...    def aggregate_state(self):
...      return self._partial_sum
...
...    def accumulate(self, input_value, input_value2):
...      self._partial_sum += input_value + input_value2
...
...    def merge(self, other_partial_sum):
...      self._partial_sum += other_partial_sum
...
...    def finish(self):
...      return self._partial_sum
>>> sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Copy

调用函数

下面示例中的 Python 代码借助 DataFrame 调用 sum_int UDAF。

>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> result = df.agg(sum_udaf("a", "b"))
>>> print(result.collect())
Copy
语言: 中文