snowflake.snowpark.udtf.UDTFRegistration
class snowflake.snowpark.udtf.UDTFRegistration(session: Optional[Session])
Source: https://github.com/snowflakedb/snowpark-python/blob/v1.16.0/src/snowflake/snowpark/udtf.py#L100-L970
Bases: object
Provides methods to register classes as UDTFs in the Snowflake database. For more information about Snowflake Python UDTFs, see Python UDTFs.
session.udtf returns an object of this class. You can use this object to register UDTFs that you plan to use in the current session or permanently. The methods that register a UDTF return a UserDefinedTableFunction object, which you can also use to call the UDTF.

Registering a UDTF is like registering a scalar UDF: you can use register() or snowflake.snowpark.functions.udtf() to explicitly register it, or you can use the @udtf decorator. All of them use cloudpickle to transfer the code from the client to the server. Another way is to use register_from_file(). Refer to module snowflake.snowpark.udtf.UDTFRegistration for when to use each of them.

Querying a registered UDTF works the same way as querying any other table function; refer to table_function() and join_table_function(). To query a UDTF right after it is created, you can call the created UserDefinedTableFunction instance, as in Example 1 below.
- Example 1
Create a temporary UDTF and call it:
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> # Or you can lateral-join a UDTF like any other table function
>>> df = session.create_dataframe([2, 3], schema=["c"])
>>> df.join_table_function(generator_udtf(df["c"])).sort("c", "number").show()
------------------
|"C"  |"NUMBER"  |
------------------
|2    |0         |
|2    |1         |
|3    |0         |
|3    |1         |
|3    |2         |
------------------
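The following local sketch shows where the rows above come from, without connecting to Snowflake. `process` is called once per input row, and every tuple it yields becomes an output row. A lateral join prefixes each output row with the input row that produced it. The helpers `run_udtf` and `lateral_join` are hypothetical and only approximate the server-side behavior. They are not part of Snowpark.

```python
from typing import Any, Callable, Iterable, List, Tuple


class GeneratorUDTF:
    # Same handler as in Example 1.
    def process(self, n):
        for i in range(n):
            yield (i, )


def run_udtf(handler_cls: Callable[[], Any], rows: Iterable[Tuple]) -> List[Tuple]:
    """Hypothetical local driver: one handler instance, process() once per
    row, then end_partition() if the class defines it."""
    handler = handler_cls()
    out: List[Tuple] = []
    for row in rows:
        out.extend(handler.process(*row))
    end_partition = getattr(handler, "end_partition", None)
    if end_partition is not None:
        out.extend(end_partition() or [])
    return out


def lateral_join(rows: Iterable[Tuple], handler_cls: Callable[[], Any]) -> List[Tuple]:
    """Hypothetical lateral join: each input row is concatenated with every
    row the UDTF yields for it."""
    joined: List[Tuple] = []
    for row in rows:
        for produced in run_udtf(handler_cls, [row]):
            joined.append(row + produced)
    return joined


print(run_udtf(GeneratorUDTF, [(3,)]))            # [(0,), (1,), (2,)]
print(lateral_join([(2,), (3,)], GeneratorUDTF))  # [(2, 0), (2, 1), (3, 0), (3, 1), (3, 2)]
```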
- Example 2
Create a UDTF with type hints and the @udtf decorator and query it:
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["number"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in range(n):
...             yield (i, )
>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
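In this example, `input_types` and the column types of `output_schema` are not passed explicitly. They come from the annotations on `process`. The sketch below uses only the standard `typing` module to show what those annotations carry. The helper `hinted_types` is hypothetical and is not Snowpark's inference code, which additionally maps Python types to Snowflake SQL types.

```python
from typing import Iterable, Tuple, get_args, get_type_hints


class generator_udtf:
    # Same handler as in Example 2, without the @udtf decorator.
    def process(self, n: int) -> Iterable[Tuple[int]]:
        for i in range(n):
            yield (i, )


def hinted_types(handler_cls):
    """Hypothetical helper: return (input types, output column types)
    read from the annotations of handler_cls.process."""
    hints = get_type_hints(handler_cls.process)
    return_hint = hints.pop("return")
    input_types = list(hints.values())       # annotated parameters, in order
    (row_hint,) = get_args(return_hint)      # Iterable[Tuple[...]] -> Tuple[...]
    output_types = list(get_args(row_hint))  # Tuple[int] -> (int,)
    return input_types, output_types


print(hinted_types(generator_udtf))  # ([<class 'int'>], [<class 'int'>])
```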
- Example 3
Create a permanent UDTF with a name and call it in SQL:
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(
...     GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()],
...     is_permanent=True, name="generator_udtf", replace=True, stage_location="@mystage"
... )
>>> session.sql("select * from table(generator_udtf(3))").collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
- Example 4
Create a UDTF with type hints that yields two output columns:
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["n1", "n2"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int, int]]:
...         for i in range(n):
...             yield (i, i+1)
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(N1=0, N2=1), Row(N1=1, N2=2), Row(N1=2, N2=3)]
- Example 5
Create a UDTF with type hints by using ... (as in Tuple[int, ...]) for multiple columns of the same type:
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["n1", "n2"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int, ...]]:
...         for i in range(n):
...             yield (i, i+1)
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(N1=0, N2=1), Row(N1=1, N2=2), Row(N1=2, N2=3)]
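In `Tuple[int, ...]` the ellipsis means "any number of `int` values", so the number of output columns has to come from `output_schema`. The hypothetical helper below sketches that expansion with the standard `typing` module. It illustrates the idea only and is not Snowpark's implementation.

```python
from typing import Iterable, Tuple, get_args, get_type_hints


class generator_udtf:
    # Same handler as in Example 5, without the @udtf decorator.
    def process(self, n: int) -> Iterable[Tuple[int, ...]]:
        for i in range(n):
            yield (i, i + 1)


def output_column_types(handler_cls, output_schema):
    """Hypothetical helper: one Python type per name in output_schema."""
    return_hint = get_type_hints(handler_cls.process)["return"]
    (row_hint,) = get_args(return_hint)    # Iterable[X] -> X
    element_types = get_args(row_hint)     # Tuple[int, ...] -> (int, Ellipsis)
    if len(element_types) == 2 and element_types[1] is Ellipsis:
        # Tuple[T, ...]: repeat T once per output column name.
        return [element_types[0]] * len(output_schema)
    if len(element_types) != len(output_schema):
        raise ValueError("type hints and output_schema disagree on column count")
    return list(element_types)


print(output_column_types(generator_udtf, ["n1", "n2"]))  # [<class 'int'>, <class 'int'>]
```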
- Example 6
Create a UDTF with UDF-level imports and type hints:
>>> from typing import Iterable, Tuple
>>> from resources.test_udf_dir.test_udf_file import mod5
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["number"], imports=[("tests/resources/test_udf_dir/test_udf_file.py", "resources.test_udf_dir.test_udf_file")])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in range(n):
...             yield (mod5(i), )
>>> session.table_function(generator_udtf(lit(6))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2), Row(NUMBER=3), Row(NUMBER=4), Row(NUMBER=0)]
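The contents of `tests/resources/test_udf_dir/test_udf_file.py` are not shown here. Judging only from the output above, `mod5` behaves like `x % 5`. The sketch below assumes exactly that definition (a hypothetical stand-in, not the real file). It shows how the handler can be exercised locally, before registering it with `imports=`, by calling `process` directly.

```python
from typing import Iterable, Tuple


def mod5(x: int) -> int:
    # Hypothetical stand-in for resources.test_udf_dir.test_udf_file.mod5,
    # assumed from the example output to return x modulo 5.
    return x % 5


class generator_udtf:
    # Same handler as in Example 6, without the @udtf decorator.
    def process(self, n: int) -> Iterable[Tuple[int]]:
        for i in range(n):
            yield (mod5(i), )


# Call process() directly with the same input the example passes, n = 6.
rows = list(generator_udtf().process(6))
print(rows)  # [(0,), (1,), (2,), (3,), (4,), (0,)]
```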
- Example 7
Create a UDTF with UDF-level packages and type hints:
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> import numpy as np
>>> @udtf(output_schema=["number"], packages=["numpy"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in np.arange(n):
...             yield (i, )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
- Example 8
Creating a UDTF with a constructor and an end_partition method:
>>> from collections import Counter
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import lit
>>> class MyWordCount:
...     def __init__(self) -> None:
...         self._total_per_partition = 0
...
...     def process(self, s1: str) -> Iterable[Tuple[str, int]]:
...         words = s1.split()
...         self._total_per_partition += len(words)
...         counter = Counter(words)
...         yield from counter.items()
...
...     def end_partition(self):
...         yield ("partition_total", self._total_per_partition)
>>> udtf_name = "word_count_udtf"
>>> word_count_udtf = session.udtf.register(
...     MyWordCount, ["word", "count"], name=udtf_name, is_permanent=False, replace=True
... )
>>> # Call it by its name
>>> df1 = session.table_function(udtf_name, lit("w1 w2 w2 w3 w3 w3"))
>>> df1.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
>>> # Call it by the returned callable instance
>>> df2 = session.table_function(word_count_udtf(lit("w1 w2 w2 w3 w3 w3")))
>>> df2.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
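This example relies on the following lifecycle: one handler instance per partition, `__init__` once, `process` once per row, and `end_partition` once after the last row. The sketch below is a hypothetical local driver (`run_partition` is not a Snowpark API) that follows that order. It runs a version of `MyWordCount` that accumulates the total with `+=` on a partition with two rows, so the running total is carried across rows.

```python
from collections import Counter
from typing import Iterable, List, Tuple


class MyWordCount:
    def __init__(self) -> None:
        self._total_per_partition = 0

    def process(self, s1: str) -> Iterable[Tuple[str, int]]:
        words = s1.split()
        self._total_per_partition += len(words)  # accumulate across rows
        yield from Counter(words).items()

    def end_partition(self):
        yield ("partition_total", self._total_per_partition)


def run_partition(handler_cls, rows: Iterable[Tuple]) -> List[Tuple]:
    """Hypothetical local driver for a single partition."""
    handler = handler_cls()                # __init__ once per partition
    out: List[Tuple] = []
    for row in rows:
        out.extend(handler.process(*row))  # process once per input row
    out.extend(handler.end_partition())    # end_partition once, at the end
    return out


result = run_partition(MyWordCount, [("w1 w2 w2",), ("w3 w3 w3",)])
print(result)  # [('w1', 1), ('w2', 2), ('w3', 3), ('partition_total', 6)]
```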
- Example 9
Creating a UDTF from a local Python file:
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="tests/resources/test_udtf_dir/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
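`register_from_file()` refers to the handler class by `handler_name` within the given file. The contents of `tests/resources/test_udtf_dir/test_udtf_file.py` are not reproduced on this page. The sketch below writes a hypothetical stand-in file containing a `GeneratorUDTF` class like the one in Example 1. It then loads the file with the standard `importlib` machinery to check locally that the handler name resolves and behaves as expected. `load_handler` is an illustrative helper, not a Snowpark API.

```python
import importlib.util
import os
import tempfile
import textwrap

# Hypothetical file contents; the real test file may differ.
SOURCE = textwrap.dedent(
    """
    class GeneratorUDTF:
        def process(self, n):
            for i in range(n):
                yield (i, )
    """
)


def load_handler(file_path: str, handler_name: str):
    """Import file_path as a module and return its attribute handler_name."""
    spec = importlib.util.spec_from_file_location("udtf_handler_module", file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, handler_name)  # raises AttributeError if missing


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "test_udtf_file.py")
    with open(path, "w") as f:
        f.write(SOURCE)
    handler_cls = load_handler(path, "GeneratorUDTF")
    rows = list(handler_cls().process(3))

print(rows)  # [(0,), (1,), (2,)]
```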
- Example 10
Creating a UDTF from a Python file on an internal stage:
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udtf_dir/test_udtf_file.py", "@mystage", auto_compress=False)
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="@mystage/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
You can use udtf(), register() or pandas_udtf() to create a vectorized UDTF by providing appropriate return and input types. If you would like to use register_from_file() to create a vectorized UDTF, you need to explicitly mark the handler method as vectorized, either with the decorator @vectorized(input=pandas.DataFrame) or by setting <class>.end_partition._sf_vectorized_input = pandas.DataFrame.
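The manual option described above is a single attribute, `_sf_vectorized_input`, set on the handler method. The sketch below assumes, as the text suggests, that the decorator records the same marker. It uses a hypothetical stand-in decorator (`mark_vectorized`, not the real `@vectorized`). It also uses a placeholder `FakeDataFrame` in place of `pandas.DataFrame`, so it runs without pandas. It only illustrates that both approaches leave the same marker on `end_partition`.

```python
class FakeDataFrame:
    """Placeholder standing in for pandas.DataFrame in this sketch."""


def mark_vectorized(input):
    # Hypothetical stand-in for @vectorized(input=...): assumed to record the
    # input type on the decorated method via _sf_vectorized_input.
    def decorator(func):
        func._sf_vectorized_input = input
        return func
    return decorator


class DecoratedHandler:
    @mark_vectorized(input=FakeDataFrame)
    def end_partition(self, df):
        yield df


class AttributeHandler:
    def end_partition(self, df):
        yield df


# The second option from the text: set the attribute on the method directly.
AttributeHandler.end_partition._sf_vectorized_input = FakeDataFrame

for cls in (DecoratedHandler, AttributeHandler):
    print(cls.__name__, getattr(cls.end_partition, "_sf_vectorized_input", None))
```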
- Example 11
Creating a vectorized UDTF by specifying a PandasDataFrameType as input_types and a PandasDataFrameType with column names as output_schema:
>>> from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df):
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=PandasDataFrameType([StringType(), IntegerType(), FloatType()], ["id_", "col1_", "col2_"]),
...     input_types=[PandasDataFrameType([StringType(), IntegerType(), FloatType()])],
...     input_names=['"id"', '"col1"', '"col2"'],
... )
>>> df = session.create_dataframe([['x', 3, 35.9], ['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------
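When only `end_partition` is defined, the handler receives each whole partition at once (here, every row whose `id` is `'x'`) and applies the multiplier to it. The sketch below imitates `.over(partition_by=["id"])` locally, with lists of tuples in place of pandas DataFrames. `over_partition_by_id` is a hypothetical helper, not Snowpark code.

```python
from itertools import groupby
from operator import itemgetter
from typing import List, Tuple

Row = Tuple[str, int, float]


class multiply:
    # List-based analogue of the Example 11 handler (no pandas involved).
    def __init__(self):
        self.multiplier = 10

    def end_partition(self, rows: List[Row]):
        # Scale col1 and col2 of every row in the partition.
        yield [(id_, col1 * self.multiplier, col2 * self.multiplier)
               for id_, col1, col2 in rows]


def over_partition_by_id(handler_cls, rows: List[Row]) -> List[Row]:
    """Hypothetical stand-in for .over(partition_by=["id"]): a fresh handler
    per partition, with end_partition called once on all of its rows."""
    out: List[Row] = []
    for _, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        for batch in handler_cls().end_partition(list(group)):
            out.extend(batch)
    return out


result = over_partition_by_id(multiply, [("x", 3, 35.9), ("x", 9, 20.5)])
print(result)
```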
- Example 12
Creating a vectorized UDTF by specifying PandasDataFrame with nested types as type hints:
>>> from snowflake.snowpark.types import PandasDataFrame
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df: PandasDataFrame[str, int, float]) -> PandasDataFrame[str, int, float]:
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=["id_", "col1_", "col2_"],
...     input_names=['"id"', '"col1"', '"col2"'],
... )
>>> df = session.create_dataframe([['x', 3, 35.9], ['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------
- Example 13
Creating a vectorized UDTF by specifying pandas.DataFrame as the type hints and a StructType with type information and column names as output_schema:
>>> import pandas as pd
>>> from snowflake.snowpark.types import IntegerType, StringType, FloatType, StructType, StructField
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df: pd.DataFrame) -> pd.DataFrame:
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=StructType([StructField("id_", StringType()), StructField("col1_", IntegerType()), StructField("col2_", FloatType())]),
...     input_types=[StringType(), IntegerType(), FloatType()],
...     input_names=['"id"', '"col1"', '"col2"'],
... )
>>> df = session.create_dataframe([['x', 3, 35.9], ['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------
- Example 14
Same as Example 11, but does not specify input_names and instead sets the column names in end_partition:
>>> from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df):
...         df.columns = ["id", "col1", "col2"]
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=PandasDataFrameType([StringType(), IntegerType(), FloatType()], ["id_", "col1_", "col2_"]),
...     input_types=[PandasDataFrameType([StringType(), IntegerType(), FloatType()])],
... )
>>> df = session.create_dataframe([['x', 3, 35.9], ['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------
[Preview Feature] The syntax for declaring a UDTF with a vectorized process method is similar to the above. Defining the __init__ and end_partition methods is optional. The process method accepts only one argument, the pandas DataFrame, and must output the same number of rows as the given input. Neither __init__ nor end_partition takes any additional arguments.
- Example 15
A vectorized UDTF process method without end_partition:
>>> from snowflake.snowpark.types import PandasDataFrame
>>> class multiply:
...     def process(self, df: PandasDataFrame[str, int, float]) -> PandasDataFrame[int]:
...         return (df['col1'] * 10, )
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=["col1x10"],
...     input_names=['"id"', '"col1"', '"col2"']
... )
>>> df = session.create_dataframe([['x', 3, 35.9], ['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select("id", "col1", "col2", multiply_udtf("id", "col1", "col2")).order_by("col1").show()
--------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL1X10"  |
--------------------------------------
|x     |3       |35.9    |30         |
|x     |9       |20.5    |90         |
--------------------------------------
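The row-count rule stated above (one output row per input row for a vectorized `process`) can be illustrated with plain lists in place of the pandas batch. Like the example, `process` here returns a tuple of output columns. `run_vectorized_process` is a hypothetical local driver, not Snowpark code. It splits the input into batches and checks the rule on every call.

```python
from typing import List, Sequence, Tuple


class multiply:
    # List-based analogue of Example 15: returns a tuple of output columns,
    # each column holding one value per input row.
    def process(self, batch: List[Tuple[str, int, float]]):
        return ([col1 * 10 for _, col1, _ in batch], )


def run_vectorized_process(handler_cls, rows: Sequence[Tuple], batch_size: int) -> List[Tuple]:
    """Hypothetical driver: feed rows to process() in batches and enforce
    that every returned column has exactly one value per input row."""
    handler = handler_cls()
    out: List[Tuple] = []
    for start in range(0, len(rows), batch_size):
        batch = list(rows[start:start + batch_size])
        columns = handler.process(batch)
        if any(len(col) != len(batch) for col in columns):
            raise ValueError("process must return one row per input row")
        out.extend(zip(*columns))  # turn output columns back into rows
    return out


print(run_vectorized_process(multiply, [("x", 3, 35.9), ("x", 9, 20.5)], batch_size=1))
# [(30,), (90,)]
```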
- Example 16
A vectorized UDTF process method with end_partition:
>>> import pandas as pd
>>> from snowflake.snowpark.types import FloatType, IntegerType, StringType, StructField, StructType
>>> class mean:
...     def __init__(self) -> None:
...         self.sum = 0
...         self.len = 0
...     def process(self, df: pd.DataFrame) -> pd.DataFrame:
...         self.sum += df['value'].sum()
...         self.len += len(df)
...         return ([None] * len(df),)
...     def end_partition(self):
...         return ([self.sum / self.len],)
>>> mean_udtf = session.udtf.register(mean,
...     output_schema=StructType([StructField("mean", FloatType())]),
...     input_types=[StringType(), IntegerType()],
...     input_names=['"name"', '"value"'])
>>> df = session.create_dataframe([["x", 10], ["x", 20], ["x", 33], ["y", 10], ["y", 25], ], schema=["name", "value"])
>>> df.select("name", "value", mean_udtf("name", "value").over(partition_by="name")).order_by("name", "value").show()
-----------------------------
|"NAME"  |"VALUE"  |"MEAN"  |
-----------------------------
|x       |NULL     |21.0    |
|x       |10       |NULL    |
|x       |20       |NULL    |
|x       |33       |NULL    |
|y       |NULL     |17.5    |
|y       |10       |NULL    |
|y       |25       |NULL    |
-----------------------------
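In this example, `process` only accumulates state and returns one `None` per input row. `end_partition` then emits one extra row per partition that carries the mean, which is the row with a NULL `VALUE` in the output. The list-based sketch below reproduces the 21.0 and 17.5 results locally. The driver `run_partition` and the batch size of 2 are assumptions for illustration, not Snowpark behavior.

```python
from typing import Dict, List, Tuple


class mean:
    # List-based analogue of the Example 16 handler.
    def __init__(self) -> None:
        self.sum = 0
        self.len = 0

    def process(self, values: List[int]):
        self.sum += sum(values)  # built-in sum over this batch
        self.len += len(values)
        return ([None] * len(values), )  # one empty output row per input row

    def end_partition(self):
        return ([self.sum / self.len], )  # a single extra row with the mean


def run_partition(handler_cls, values: List[int], batch_size: int) -> Tuple[list, list]:
    """Hypothetical driver: returns (per-row outputs, end_partition outputs)."""
    handler = handler_cls()
    per_row: list = []
    for start in range(0, len(values), batch_size):
        (column,) = handler.process(values[start:start + batch_size])
        per_row.extend(column)
    (final,) = handler.end_partition()
    return per_row, final


partitions: Dict[str, List[int]] = {"x": [10, 20, 33], "y": [10, 25]}
results = {name: run_partition(mean, vals, batch_size=2) for name, vals in partitions.items()}
print(results)
# {'x': ([None, None, None], [21.0]), 'y': ([None, None], [17.5])}
```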
- Example 17
A vectorized UDTF process method with end_partition and max_batch_size:
>>> from snowflake.snowpark.types import IntegerType, PandasDataFrameType, StringType
>>> class sum:
...     def __init__(self):
...         self.sum = None
...     def process(self, df):
...         if self.sum is None:
...             self.sum = df
...         else:
...             self.sum += df
...         return df
...     def end_partition(self):
...         return self.sum
>>> sum_udtf = session.udtf.register(sum,
...     output_schema=PandasDataFrameType([StringType(), IntegerType()], ["id_", "col1_"]),
...     input_types=[PandasDataFrameType([StringType(), IntegerType()])],
...     max_batch_size=1)
>>> df = session.create_dataframe([["x", 10], ["x", 20], ["x", 33], ["y", 10], ["y", 25], ], schema=["id", "col1"])
>>> df.select("id", "col1", sum_udtf("id", "col1").over(partition_by="id")).order_by("id", "col1").show()
-----------------------------------
|"ID"  |"COL1"  |"ID_"  |"COL1_"  |
-----------------------------------
|x     |NULL    |xxx    |63       |
|x     |10      |x      |10       |
|x     |20      |x      |20       |
|x     |33      |x      |33       |
|y     |NULL    |yy     |35       |
|y     |10      |y      |10       |
|y     |25      |y      |25       |
-----------------------------------
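The `xxx` and `yy` values in the output follow from how `+=` behaves on the batches. With `max_batch_size=1`, `process` sees one row at a time. Adding DataFrames element-wise then concatenates the strings and sums the integers. The sketch below imitates that behavior with tuples instead of DataFrames. `add_rows` and `run_partition` are hypothetical helpers, and the class is renamed `running_sum` so that it does not shadow the built-in `sum`.

```python
from typing import List, Optional, Tuple

Row = Tuple[str, int]


def add_rows(a: Row, b: Row) -> Row:
    # Element-wise "+", column by column: strings concatenate, ints add.
    return tuple(x + y for x, y in zip(a, b))


class running_sum:
    # Tuple-based analogue of the Example 17 handler.
    def __init__(self) -> None:
        self.total: Optional[Row] = None

    def process(self, row: Row) -> Row:
        self.total = row if self.total is None else add_rows(self.total, row)
        return row  # echo the input row unchanged

    def end_partition(self) -> Optional[Row]:
        return self.total


def run_partition(handler_cls, rows: List[Row]) -> Tuple[List[Row], Optional[Row]]:
    """Hypothetical driver for max_batch_size=1: one row per process() call."""
    handler = handler_cls()
    echoed = [handler.process(row) for row in rows]
    return echoed, handler.end_partition()


print(run_partition(running_sum, [("x", 10), ("x", 20), ("x", 33)]))
# ([('x', 10), ('x', 20), ('x', 33)], ('xxx', 63))
print(run_partition(running_sum, [("y", 10), ("y", 25)]))
# ([('y', 10), ('y', 25)], ('yy', 35))
```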
Methods
- register(handler, output_schema[, ...]): Registers a Python class as a Snowflake Python UDTF and returns the UDTF.
- register_from_file(file_path, handler_name, ...): Registers a Python class as a Snowflake Python UDTF from a Python or zip file, and returns the UDTF.