You are viewing documentation about an older version (1.16.0). View latest version

snowflake.snowpark.udaf.UDAFRegistration

class snowflake.snowpark.udaf.UDAFRegistration(session: Optional[Session])[source] (https://github.com/snowflakedb/snowpark-python/blob/v1.16.0/src/snowflake/snowpark/udaf.py#L108-L716)

Bases: object

Provides methods to register lambdas and functions as UDAFs in the Snowflake database. For more information about Snowflake Python UDAFs, see Python UDAFs.

session.udaf returns an object of this class. You can use this object to register UDAFs that you plan to use in the current session or permanently. The methods that register a UDAF return a UserDefinedAggregateFunction object, which you can also use in Column expressions.

Registering a UDAF is like registering a scalar UDF, you can use register() or snowflake.snowpark.functions.udaf() to explicitly register it. You can also use the decorator @udaf. They all use cloudpickle to transfer the code from the client to the server. Another way is to use register_from_file(). Refer to module snowflake.snowpark.udaf.UDAFRegistration for when to use them.

To query a registered UDAF is the same as to query other aggregate functions. Refer to agg(). If you want to query a UDAF right after it’s created, you can call the created UserDefinedAggregateFunction instance like in Example 1 below.

Example 1

Create a temporary UDAF and call it:

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import call_function, col, udaf
>>> class PythonSumUDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self):
...         return self._sum
...
...     def accumulate(self, input_value):
...         self._sum += input_value
...
...     def merge(self, other_sum):
...         self._sum += other_sum
...
...     def finish(self):
...         return self._sum
Copy
>>> sum_udaf = udaf(PythonSumUDAF, return_type=IntegerType(), input_types=[IntegerType()])
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(sum_udaf("a").alias("sum_a")).collect()  # Query it by calling it
[Row(SUM_A=6)]
>>> df.select(call_function(sum_udaf.name, col("a")).alias("sum_a")).collect()  # Query it by using the name
[Row(SUM_A=6)]
Copy
Example 2

Create a UDAF with type hints and @udaf decorator and query it:

>>> from snowflake.snowpark.functions import udaf
>>> @udaf
... class PythonSumUDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self) -> int:
...         return self._sum
...
...     def accumulate(self, input_value: int) -> None:
...         self._sum += input_value
...
...     def merge(self, other_sum: int) -> None:
...         self._sum += other_sum
...
...     def finish(self) -> int:
...         return self._sum
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(PythonSumUDAF("a").alias("sum_a")).collect()  # Query it by calling it
[Row(SUM_A=6)]
>>> df.select(call_function(PythonSumUDAF.name, col("a")).alias("sum_a")).collect()  # Query it by using the name
[Row(SUM_A=6)]
Copy
Example 3

Create a permanent UDAF with a name and call it in SQL:

>>> from snowflake.snowpark.functions import udaf
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> @udaf(is_permanent=True, name="sum_udaf", replace=True, stage_location="@mystage")
... class PythonSumUDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self) -> int:
...         return self._sum
...
...     def accumulate(self, input_value: int) -> None:
...         self._sum += input_value
...
...     def merge(self, other_sum: int) -> None:
...         self._sum += other_sum
...
...     def finish(self) -> int:
...         return self._sum
>>> session.sql("select sum_udaf(column1) as sum1 from values (1, 2), (2, 3)").collect()
[Row(SUM1=3)]
Copy
Example 4

Create a UDAF with UDF-level imports and type hints:

>>> from resources.test_udf_dir.test_udf_file import mod5
>>> from snowflake.snowpark.functions import udaf
>>> @udaf(imports=[("tests/resources/test_udf_dir/test_udf_file.py", "resources.test_udf_dir.test_udf_file")])
... class SumMod5UDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self) -> int:
...         return self._sum
...
...     def accumulate(self, input_value: int) -> None:
...         self._sum = mod5(self._sum + input_value)
...
...     def merge(self, other_sum: int) -> None:
...         self._sum = mod5(self._sum + other_sum)
...
...     def finish(self) -> int:
...         return self._sum
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(SumMod5UDAF("a").alias("sum_mod5_a")).collect()
[Row(SUM_MOD5_A=1)]
Copy
Example 5

Create a UDAF with UDF-level packages and type hints:

>>> import math
>>> from snowflake.snowpark.functions import udaf
>>> import numpy as np
>>> @udaf(packages=["numpy"])
... class SumSinUDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self) -> float:
...         return self._sum
...
...     def accumulate(self, input_value: float) -> None:
...         self._sum += input_value
...
...     def merge(self, other_sum: float) -> None:
...         self._sum += other_sum
...
...     def finish(self) -> float:
...         return np.sin(self._sum)
>>> df = session.create_dataframe([[0.0], [0.5 * math.pi]]).to_df("a")
>>> df.agg(SumSinUDAF("a").alias("sum_sin_a")).collect()
[Row(SUM_SIN_A=1.0)]
Copy
Example 6

Creating a UDAF from a local Python file:

>>> sum_udaf = session.udaf.register_from_file(
...     file_path="tests/resources/test_udaf_dir/test_udaf_file.py",
...     handler_name="MyUDAFWithTypeHints",
... )
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(sum_udaf("a").alias("sum_a")).collect()
[Row(SUM_A=6)]
Copy
Example 7

Creating a UDAF from a Python file on an internal stage:

>>> from snowflake.snowpark.functions import udaf
>>> from snowflake.snowpark.types import IntegerType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udaf_dir/test_udaf_file.py", "@mystage", auto_compress=False)
>>> sum_udaf = session.udaf.register_from_file(
...     file_path="@mystage/test_udaf_file.py",
...     handler_name="MyUDAFWithoutTypeHints",
...     input_types=[IntegerType()],
...     return_type=IntegerType(),
... )
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(sum_udaf("a").alias("sum_a")).collect()
[Row(SUM_A=6)]
Copy

Methods

describe(udaf_obj)

Returns a DataFrame that describes the properties of a UDAF.

register(handler[, return_type, ...])

Registers a Python function as a Snowflake Python UDAF and returns the UDAF.

register_from_file(file_path, handler_name)

Registers a Python class as a Snowflake Python UDAF from a Python or zip file, and returns the UDAF.

Language: English