snowflake.snowpark.RelationalGroupedDataFrame.applyInPandas
- RelationalGroupedDataFrame.applyInPandas(func: Callable, output_schema: StructType, **kwargs) → DataFrame [source] (https://github.com/snowflakedb/snowpark-python/blob/v1.26.0/snowpark-python/src/snowflake/snowpark/relational_grouped_dataframe.py#L324-L456)
Maps each group of the grouped DataFrame to a pandas.DataFrame and applies the given function, which returns a pandas.DataFrame, to the data of each group. Internally, a vectorized UDTF is registered and called, with the func argument used as its end_partition method. Additional kwargs are accepted to specify arguments for registering the UDTF. The group-by clause must consist of column references, not general expressions.

Requires pandas to be installed in the execution environment and declared as a dependency, either by specifying the keyword argument packages=["pandas"] in this call or by calling add_packages() beforehand.

- Parameters:
    func – A native Python function that accepts a single input argument, a pandas.DataFrame object, and returns a pandas.DataFrame. It is used as the end_partition method in a vectorized UDTF.
    output_schema – A StructType instance that represents the table function’s output columns.
    input_names – A list of strings that represents the table function’s input column names. Optional; if unspecified, the default column names are ARG1, ARG2, etc.
    kwargs – Additional arguments to register the vectorized UDTF. See register() for all options.
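The contract for func (one pandas.DataFrame in per group, one pandas.DataFrame out) can be tried locally before any UDTF is registered. The following is a minimal pandas-only sketch, not Snowpark's implementation: the helper apply_per_group_locally is a hypothetical stand-in for the grouping step, and the uppercase column names are an assumption that mirrors how the examples on this page access columns.

```python
import pandas as pd


def convert(pandas_df: pd.DataFrame) -> pd.DataFrame:
    # Same shape of function as passed to applyInPandas:
    # takes one group's rows, returns a pandas.DataFrame.
    return pandas_df.assign(TEMP_F=lambda x: x.TEMP_C * 9 / 5 + 32)


def apply_per_group_locally(df: pd.DataFrame, keys, func) -> pd.DataFrame:
    # Hypothetical local stand-in (not a Snowpark API): call func once
    # per group and stack the per-group results into one frame.
    parts = [func(group) for _, group in df.groupby(keys, sort=False)]
    return pd.concat(parts, ignore_index=True)


# Local data with uppercase column names (assumed for illustration).
local_df = pd.DataFrame(
    {
        "LOCATION": ["SF", "SF", "SF", "NY", "NY"],
        "TEMP_C": [21.0, 17.5, 24.0, 30.9, 33.6],
    }
)

result = apply_per_group_locally(local_df, "LOCATION", convert)
result = result.sort_values("TEMP_C", ignore_index=True)
print(result)
```

Running func this way only checks its logic and the shape of its output; registration options, dependencies, and the output_schema are still handled by the applyInPandas call itself.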
- Examples::
Call apply_in_pandas using a temporary UDTF:

    >>> import pandas as pd
    >>> from snowflake.snowpark.types import StructType, StructField, StringType, FloatType
    >>> def convert(pandas_df):
    ...     return pandas_df.assign(TEMP_F = lambda x: x.TEMP_C * 9 / 5 + 32)
    >>> df = session.createDataFrame([('SF', 21.0), ('SF', 17.5), ('SF', 24.0), ('NY', 30.9), ('NY', 33.6)],
    ...                              schema=['location', 'temp_c'])
    >>> df.group_by("location").apply_in_pandas(convert,
    ...     output_schema=StructType([StructField("location", StringType()),
    ...                               StructField("temp_c", FloatType()),
    ...                               StructField("temp_f", FloatType())])).order_by("temp_c").show()
    ---------------------------------------------
    |"LOCATION"  |"TEMP_C"  |"TEMP_F"           |
    ---------------------------------------------
    |SF          |17.5      |63.5               |
    |SF          |21.0      |69.8               |
    |SF          |24.0      |75.2               |
    |NY          |30.9      |87.61999999999999  |
    |NY          |33.6      |92.48              |
    ---------------------------------------------
Call apply_in_pandas using a permanent UDTF, replacing the original UDTF:

    >>> from snowflake.snowpark.types import IntegerType, DoubleType
    >>> _ = session.sql("create or replace temp stage mystage").collect()
    >>> def group_sum(pdf):
    ...     return pd.DataFrame([(pdf.GRADE.iloc[0], pdf.DIVISION.iloc[0], pdf.VALUE.sum(), )])
    ...
    >>> df = session.createDataFrame([('A', 2, 11.0), ('A', 2, 13.9), ('B', 5, 5.0), ('B', 2, 12.1)],
    ...                              schema=["grade", "division", "value"])
    >>> df.group_by([df.grade, df.division]).applyInPandas(
    ...     group_sum,
    ...     output_schema=StructType([StructField("grade", StringType()),
    ...                               StructField("division", IntegerType()),
    ...                               StructField("sum", DoubleType())]),
    ...     is_permanent=True, stage_location="@mystage", name="group_sum_in_pandas", replace=True
    ... ).order_by("sum").show()
    --------------------------------
    |"GRADE"  |"DIVISION"  |"SUM"  |
    --------------------------------
    |B        |5           |5.0    |
    |B        |2           |12.1   |
    |A        |2           |24.9   |
    --------------------------------
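In the permanent-UDTF example, group_sum returns a frame with unnamed columns (0, 1, 2), while the output shows GRADE, DIVISION and SUM. This suggests the returned columns are paired with output_schema by position, though that reading is an assumption drawn from the example rather than a documented guarantee. The pandas-only sketch below reproduces the same aggregation locally, grouping on two keys and naming the columns by position afterwards. It is an illustration, not how Snowpark executes the call.

```python
import pandas as pd


def group_sum(pdf: pd.DataFrame) -> pd.DataFrame:
    # Same function as in the example: one output row per group,
    # returned with default integer column labels 0, 1, 2.
    return pd.DataFrame(
        [(pdf.GRADE.iloc[0], pdf.DIVISION.iloc[0], pdf.VALUE.sum())]
    )


local_df = pd.DataFrame(
    [("A", 2, 11.0), ("A", 2, 13.9), ("B", 5, 5.0), ("B", 2, 12.1)],
    columns=["GRADE", "DIVISION", "VALUE"],
)

# Group on both keys, call group_sum once per group, stack the rows.
parts = [group_sum(g) for _, g in local_df.groupby(["GRADE", "DIVISION"])]
summed = pd.concat(parts, ignore_index=True)

# Assumption: name the unnamed columns by position, in the same order
# as the fields listed in output_schema in the example.
summed.columns = ["GRADE", "DIVISION", "SUM"]
summed = summed.sort_values("SUM", ignore_index=True)
print(summed)
```

If func builds its result positionally like this, keeping the tuple order aligned with the order of fields in output_schema avoids values landing under the wrong column name.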
See also