snowflake.snowpark.udaf.UDAFRegistration
- class snowflake.snowpark.udaf.UDAFRegistration(session: Optional[Session]) [source] (https://github.com/snowflakedb/snowpark-python/blob/v1.16.0/src/snowflake/snowpark/udaf.py#L108-L716)
Bases: object
Provides methods to register Python handler classes as UDAFs in the Snowflake database. For more information about Snowflake Python UDAFs, see Python UDAFs.
session.udaf returns an object of this class. You can use this object to register UDAFs that you plan to use in the current session or permanently. The methods that register a UDAF return a UserDefinedAggregateFunction object, which you can also use in Column expressions.

Registering a UDAF is like registering a scalar UDF: you can use register() or snowflake.snowpark.functions.udaf() to register it explicitly, or you can use the @udaf decorator. All of these use cloudpickle to transfer the code from the client to the server. Alternatively, you can use register_from_file(). Refer to module snowflake.snowpark.udaf.UDAFRegistration for when to use each approach.

Querying a registered UDAF is the same as querying any other aggregate function; refer to agg(). If you want to query a UDAF right after it is created, you can call the returned UserDefinedAggregateFunction instance, as in Example 1 below.

- Example 1
Create a temporary UDAF and call it:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import call_function, col, udaf
>>> class PythonSumUDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self):
...         return self._sum
...
...     def accumulate(self, input_value):
...         self._sum += input_value
...
...     def merge(self, other_sum):
...         self._sum += other_sum
...
...     def finish(self):
...         return self._sum
>>> sum_udaf = udaf(PythonSumUDAF, return_type=IntegerType(), input_types=[IntegerType()])
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(sum_udaf("a").alias("sum_a")).collect()  # Query it by calling it
[Row(SUM_A=6)]
>>> df.select(call_function(sum_udaf.name, col("a")).alias("sum_a")).collect()  # Query it by using the name
[Row(SUM_A=6)]
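To make the handler contract concrete, here is a minimal local sketch that drives PythonSumUDAF without a Snowflake session. The driver function simulate_udaf is hypothetical and not part of Snowpark; it assumes one handler instance per partition receives every row through accumulate(), that the partial aggregate_state values are then combined with merge(), and that finish() runs once at the end. The class is repeated so the snippet runs on its own.

```python
from typing import Any, Callable, Iterable, List


class PythonSumUDAF:
    """Same handler as in Example 1, repeated so this sketch is self-contained."""

    def __init__(self) -> None:
        self._sum = 0

    @property
    def aggregate_state(self):
        return self._sum

    def accumulate(self, input_value):
        self._sum += input_value

    def merge(self, other_sum):
        self._sum += other_sum

    def finish(self):
        return self._sum


def simulate_udaf(handler_cls: Callable[[], Any], partitions: Iterable[List[Any]]) -> Any:
    """Hypothetical local driver, NOT a Snowpark API.

    Assumption: each partition gets its own handler instance that sees its
    rows via accumulate(); the partial aggregate_state values are folded
    into one fresh instance with merge(), and finish() is called once.
    """
    partial_states = []
    for rows in partitions:
        instance = handler_cls()
        for value in rows:
            instance.accumulate(value)
        partial_states.append(instance.aggregate_state)

    final = handler_cls()
    for state in partial_states:
        final.merge(state)
    return final.finish()


# Column "a" from Example 1 is [1, 1, 2, 2]; split it two different ways.
print(simulate_udaf(PythonSumUDAF, [[1, 1, 2, 2]]))    # 6
print(simulate_udaf(PythonSumUDAF, [[1, 1], [2, 2]]))  # 6
```

Because the server may split rows across partitions in any way, merge() must combine partial states so that the final result does not depend on the split, as the two calls above illustrate.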
- Example 2
Create a UDAF with type hints and the @udaf decorator and query it:
>>> from snowflake.snowpark.functions import call_function, col, udaf
>>> @udaf
... class PythonSumUDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self) -> int:
...         return self._sum
...
...     def accumulate(self, input_value: int) -> None:
...         self._sum += input_value
...
...     def merge(self, other_sum: int) -> None:
...         self._sum += other_sum
...
...     def finish(self) -> int:
...         return self._sum
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(PythonSumUDAF("a").alias("sum_a")).collect()  # Query it by calling it
[Row(SUM_A=6)]
>>> df.select(call_function(PythonSumUDAF.name, col("a")).alias("sum_a")).collect()  # Query it by using the name
[Row(SUM_A=6)]
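Example 2 passes no return_type or input_types, so the types have to come from the handler's annotations. The following minimal sketch uses only the standard library's typing.get_type_hints to show what can be read from them. The helper inferred_signature is hypothetical, not a Snowpark API, and it assumes input types come from accumulate()'s parameter hints and the return type from finish()'s return hint; Snowpark's actual inference may differ in detail.

```python
from typing import get_type_hints


class PythonSumUDAF:
    """Handler from Example 2, without the @udaf decorator so it runs locally."""

    def __init__(self) -> None:
        self._sum = 0

    @property
    def aggregate_state(self) -> int:
        return self._sum

    def accumulate(self, input_value: int) -> None:
        self._sum += input_value

    def merge(self, other_sum: int) -> None:
        self._sum += other_sum

    def finish(self) -> int:
        return self._sum


def inferred_signature(handler_cls):
    """Hypothetical helper: return (input Python types, return Python type).

    Assumption: inputs are the annotated parameters of accumulate(), and the
    result type is the return annotation of finish().
    """
    accumulate_hints = get_type_hints(handler_cls.accumulate)
    input_types = [t for name, t in accumulate_hints.items() if name != "return"]
    return_type = get_type_hints(handler_cls.finish).get("return")
    return input_types, return_type


print(inferred_signature(PythonSumUDAF))  # ([<class 'int'>], <class 'int'>)
```

A handler without annotations yields nothing to infer, which is why Example 1 and Example 7 pass input_types and return_type explicitly.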
- Example 3
Create a permanent UDAF with a name and call it in SQL:
>>> from snowflake.snowpark.functions import udaf
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> @udaf(is_permanent=True, name="sum_udaf", replace=True, stage_location="@mystage")
... class PythonSumUDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self) -> int:
...         return self._sum
...
...     def accumulate(self, input_value: int) -> None:
...         self._sum += input_value
...
...     def merge(self, other_sum: int) -> None:
...         self._sum += other_sum
...
...     def finish(self) -> int:
...         return self._sum
>>> session.sql("select sum_udaf(column1) as sum1 from values (1, 2), (2, 3)").collect()
[Row(SUM1=3)]
- Example 4
Create a UDAF with UDF-level imports and type hints:
>>> from resources.test_udf_dir.test_udf_file import mod5
>>> from snowflake.snowpark.functions import udaf
>>> @udaf(imports=[("tests/resources/test_udf_dir/test_udf_file.py", "resources.test_udf_dir.test_udf_file")])
... class SumMod5UDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self) -> int:
...         return self._sum
...
...     def accumulate(self, input_value: int) -> None:
...         self._sum = mod5(self._sum + input_value)
...
...     def merge(self, other_sum: int) -> None:
...         self._sum = mod5(self._sum + other_sum)
...
...     def finish(self) -> int:
...         return self._sum
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(SumMod5UDAF("a").alias("sum_mod5_a")).collect()
[Row(SUM_MOD5_A=1)]
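The body of mod5 in test_udf_file.py is not shown on this page. Assuming it computes x % 5, the sketch below runs SumMod5UDAF locally to show why applying mod5 in both accumulate() and merge() is safe: reducing modulo 5 at every step gives the same answer however the rows are partitioned. The local mod5 and the driver function run are hypothetical stand-ins, not Snowpark APIs.

```python
def mod5(x: int) -> int:
    """Assumed stand-in for the mod5 helper imported in Example 4."""
    return x % 5


class SumMod5UDAF:
    """Example 4's handler without the decorator, so it runs locally."""

    def __init__(self) -> None:
        self._sum = 0

    @property
    def aggregate_state(self) -> int:
        return self._sum

    def accumulate(self, input_value: int) -> None:
        self._sum = mod5(self._sum + input_value)

    def merge(self, other_sum: int) -> None:
        self._sum = mod5(self._sum + other_sum)

    def finish(self) -> int:
        return self._sum


def run(partitions):
    """Hypothetical driver: one instance per partition, merged into a final one."""
    final = SumMod5UDAF()
    for rows in partitions:
        part = SumMod5UDAF()
        for value in rows:
            part.accumulate(value)
        final.merge(part.aggregate_state)
    return final.finish()


# Column "a" is [1, 1, 2, 2]; (1 + 1 + 2 + 2) % 5 == 1 however the rows are split.
print(run([[1, 1, 2, 2]]))    # 1
print(run([[1, 2], [1, 2]]))  # 1
```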
- Example 5
Create a UDAF with UDF-level packages and type hints:
>>> import math
>>> from snowflake.snowpark.functions import udaf
>>> import numpy as np
>>> @udaf(packages=["numpy"])
... class SumSinUDAF:
...     def __init__(self) -> None:
...         self._sum = 0
...
...     @property
...     def aggregate_state(self) -> float:
...         return self._sum
...
...     def accumulate(self, input_value: float) -> None:
...         self._sum += input_value
...
...     def merge(self, other_sum: float) -> None:
...         self._sum += other_sum
...
...     def finish(self) -> float:
...         return np.sin(self._sum)
>>> df = session.create_dataframe([[0.0], [0.5 * math.pi]]).to_df("a")
>>> df.agg(SumSinUDAF("a").alias("sum_sin_a")).collect()
[Row(SUM_SIN_A=1.0)]
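Because finish() runs only once, after all partial states are merged, SumSinUDAF returns the sine of the total rather than the sum of per-row sines. With Example 5's data (0 and pi/2) both happen to equal 1.0, so this local sketch uses two rows of pi/2 to make the difference visible. It swaps numpy.sin for math.sin so that it runs without extra packages; that substitution is an assumption of the sketch, not part of the example.

```python
import math


class SumSinUDAF:
    """Example 5's handler, using math.sin instead of numpy.sin."""

    def __init__(self) -> None:
        self._sum = 0.0

    @property
    def aggregate_state(self) -> float:
        return self._sum

    def accumulate(self, input_value: float) -> None:
        self._sum += input_value

    def merge(self, other_sum: float) -> None:
        self._sum += other_sum

    def finish(self) -> float:
        # Applied once to the merged total, not to each row.
        return math.sin(self._sum)


values = [0.5 * math.pi, 0.5 * math.pi]

agg = SumSinUDAF()
for v in values:
    agg.accumulate(v)

sin_of_sum = agg.finish()                       # sin(pi), approximately 0.0
sum_of_sins = sum(math.sin(v) for v in values)  # 1.0 + 1.0 == 2.0
print(round(sin_of_sum, 12), sum_of_sins)       # 0.0 2.0
```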
- Example 6
Create a UDAF from a local Python file:
>>> sum_udaf = session.udaf.register_from_file(
...     file_path="tests/resources/test_udaf_dir/test_udaf_file.py",
...     handler_name="MyUDAFWithTypeHints",
... )
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(sum_udaf("a").alias("sum_a")).collect()
[Row(SUM_A=6)]
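register_from_file() looks up the class named by handler_name inside the file. The contents of test_udaf_file.py are not shown on this page, so the following is a hypothetical sketch of a file that would expose the two handler names used in Examples 6 and 7; the summing logic is only an assumption consistent with the SUM_A=6 results those examples report for column "a".

```python
# Hypothetical contents of a handler file such as test_udaf_file.py.
# Only the class names come from Examples 6 and 7; the bodies are assumed.


class MyUDAFWithTypeHints:
    """Sum aggregator whose annotations let the registration infer types."""

    def __init__(self) -> None:
        self._sum = 0

    @property
    def aggregate_state(self) -> int:
        return self._sum

    def accumulate(self, input_value: int) -> None:
        self._sum += input_value

    def merge(self, other_sum: int) -> None:
        self._sum += other_sum

    def finish(self) -> int:
        return self._sum


class MyUDAFWithoutTypeHints:
    """Same logic without annotations, so types must be passed explicitly."""

    def __init__(self):
        self._sum = 0

    @property
    def aggregate_state(self):
        return self._sum

    def accumulate(self, input_value):
        self._sum += input_value

    def merge(self, other_sum):
        self._sum += other_sum

    def finish(self):
        return self._sum


if __name__ == "__main__":
    # Local check on column "a" from the examples: 1 + 1 + 2 + 2 == 6.
    for cls in (MyUDAFWithTypeHints, MyUDAFWithoutTypeHints):
        agg = cls()
        for value in [1, 1, 2, 2]:
            agg.accumulate(value)
        print(cls.__name__, agg.finish())  # prints the class name and 6
```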
- Example 7
Create a UDAF from a Python file on an internal stage:
>>> from snowflake.snowpark.functions import udaf
>>> from snowflake.snowpark.types import IntegerType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udaf_dir/test_udaf_file.py", "@mystage", auto_compress=False)
>>> sum_udaf = session.udaf.register_from_file(
...     file_path="@mystage/test_udaf_file.py",
...     handler_name="MyUDAFWithoutTypeHints",
...     input_types=[IntegerType()],
...     return_type=IntegerType(),
... )
>>> df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
>>> df.agg(sum_udaf("a").alias("sum_a")).collect()
[Row(SUM_A=6)]
Methods
- describe(udaf_obj): Returns a DataFrame that describes the properties of a UDAF.
- register(handler[, return_type, ...]): Registers a Python class as a Snowflake Python UDAF and returns the UDAF.
- register_from_file(file_path, handler_name): Registers a Python class as a Snowflake Python UDAF from a Python or zip file, and returns the UDAF.