snowflake.snowpark.RelationalGroupedDataFrame.apply_in_pandas

RelationalGroupedDataFrame.apply_in_pandas(func: Callable, output_schema: StructType, **kwargs) → DataFrame [source] (https://github.com/snowflakedb/snowpark-python/blob/v1.16.0/src/snowflake/snowpark/relational_grouped_dataframe.py#L266-L372)

Maps each group of the grouped DataFrame into a pandas.DataFrame and applies the given function to the data of that group; the function must return a pandas.DataFrame. Internally, a vectorized UDTF is registered and called, with func used as its end_partition method. Additional kwargs are accepted to specify arguments for registering the UDTF. The columns used in the group-by clause must be column references, not general expressions.

Requires pandas to be installed in the execution environment and declared as a dependency, either by specifying the keyword argument packages=["pandas"] in this call or by calling add_packages() beforehand.
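
The grouping behavior described above can be approximated locally with plain pandas. The following is a minimal sketch, not the Snowpark implementation: apply_in_pandas_locally is a hypothetical helper name, no UDTF is registered, and the uppercase column names are chosen to mirror what the examples on this page access inside func.

```python
import pandas as pd

def apply_in_pandas_locally(df, keys, func):
    """Hypothetical pandas-only approximation of a grouped apply (not Snowpark code)."""
    # Split the frame into one sub-frame per distinct key combination,
    # call func once per sub-frame, then stack the returned frames.
    results = [func(group) for _, group in df.groupby(keys, sort=False)]
    return pd.concat(results, ignore_index=True)

def convert(pandas_df):
    # Same transformation as the first example on this page.
    return pandas_df.assign(TEMP_F=lambda x: x.TEMP_C * 9 / 5 + 32)

local_df = pd.DataFrame(
    [("SF", 21.0), ("SF", 17.5), ("SF", 24.0), ("NY", 30.9), ("NY", 33.6)],
    columns=["LOCATION", "TEMP_C"],
)
result = apply_in_pandas_locally(local_df, ["LOCATION"], convert)
print(result.sort_values("TEMP_C").to_string(index=False))
```

Running func on local data like this can be a convenient way to debug it before it is registered as part of a UDTF.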

Parameters:
  • func – A native Python function that accepts a single input argument, a pandas.DataFrame object, and returns a pandas.DataFrame. It is used as the end_partition method of a vectorized UDTF.

  • output_schema – A StructType instance that represents the table function’s output columns.

  • input_names – A list of strings that represents the table function’s input column names. Optional; if unspecified, the default column names are ARG1, ARG2, etc.

  • kwargs – Additional arguments to register the vectorized UDTF. See register() for all options.
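
The contract between func and output_schema listed above can be checked locally before anything is registered. This is only a sketch under stated assumptions: check_func_output is a hypothetical helper, not a Snowpark API, and a plain list of column names stands in for the fields of a StructType so that the snippet runs with pandas alone.

```python
import pandas as pd

def check_func_output(func, sample, expected_columns):
    """Hypothetical local helper (not a Snowpark API): run func on one sample
    group and compare the result with the intended output columns."""
    out = func(sample)
    if not isinstance(out, pd.DataFrame):
        raise TypeError(f"func returned {type(out).__name__}, expected pandas.DataFrame")
    if out.shape[1] != len(expected_columns):
        raise ValueError(
            f"func returned {out.shape[1]} columns, schema declares {len(expected_columns)}"
        )
    return out

def group_mean(pdf):
    # One output row per group: the group key and the mean of VALUE.
    return pd.DataFrame([(pdf.KEY.iloc[0], pdf.VALUE.mean())])

sample_group = pd.DataFrame({"KEY": ["a", "a"], "VALUE": [1.0, 3.0]})
checked = check_func_output(group_mean, sample_group, ["key", "mean"])
print(checked)
```

The check only compares column counts; it does not validate data types, which are declared by the StructField entries of the real output_schema.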

Examples:

Call apply_in_pandas using a temporary UDTF:

>>> import pandas as pd
>>> from snowflake.snowpark.types import StructType, StructField, StringType, FloatType
>>> def convert(pandas_df):
...     return pandas_df.assign(TEMP_F = lambda x: x.TEMP_C * 9 / 5 + 32)
>>> df = session.createDataFrame([('SF', 21.0), ('SF', 17.5), ('SF', 24.0), ('NY', 30.9), ('NY', 33.6)],
...         schema=['location', 'temp_c'])
>>> df.group_by("location").apply_in_pandas(convert,
...     output_schema=StructType([StructField("location", StringType()),
...                               StructField("temp_c", FloatType()),
...                               StructField("temp_f", FloatType())])).order_by("temp_c").show()
---------------------------------------------
|"LOCATION"  |"TEMP_C"  |"TEMP_F"           |
---------------------------------------------
|SF          |17.5      |63.5               |
|SF          |21.0      |69.8               |
|SF          |24.0      |75.2               |
|NY          |30.9      |87.61999999999999  |
|NY          |33.6      |92.48              |
---------------------------------------------

Call apply_in_pandas using a permanent UDTF, replacing any existing UDTF of the same name:

>>> from snowflake.snowpark.types import IntegerType, DoubleType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> def group_sum(pdf):
...     return pd.DataFrame([(pdf.GRADE.iloc[0], pdf.DIVISION.iloc[0], pdf.VALUE.sum(), )])
...
>>> df = session.createDataFrame([('A', 2, 11.0), ('A', 2, 13.9), ('B', 5, 5.0), ('B', 2, 12.1)],
...                              schema=["grade", "division", "value"])
>>> df.group_by([df.grade, df.division]).applyInPandas(
...     group_sum,
...     output_schema=StructType([StructField("grade", StringType()),
...                               StructField("division", IntegerType()),
...                               StructField("sum", DoubleType())]),
...     is_permanent=True, stage_location="@mystage", name="group_sum_in_pandas", replace=True
... ).order_by("sum").show()
--------------------------------
|"GRADE"  |"DIVISION"  |"SUM"  |
--------------------------------
|B        |5           |5.0    |
|B        |2           |12.1   |
|A        |2           |24.9   |
--------------------------------
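
The aggregation in this example can be reproduced locally as a sanity check. The sketch below assumes only pandas: it calls the same group_sum on each (GRADE, DIVISION) group and then names the resulting columns by position, which is how the unnamed columns returned by group_sum appear to line up with output_schema in the example above.

```python
import pandas as pd

def group_sum(pdf):
    # Returns a single row with unnamed (positional) columns: key, key, sum.
    return pd.DataFrame([(pdf.GRADE.iloc[0], pdf.DIVISION.iloc[0], pdf.VALUE.sum())])

local_df = pd.DataFrame(
    [("A", 2, 11.0), ("A", 2, 13.9), ("B", 5, 5.0), ("B", 2, 12.1)],
    columns=["GRADE", "DIVISION", "VALUE"],
)

# One call to group_sum per distinct (GRADE, DIVISION) pair.
pieces = [group_sum(g) for _, g in local_df.groupby(["GRADE", "DIVISION"], sort=False)]
summary = pd.concat(pieces, ignore_index=True)

# Assign names by position, mirroring the order of the fields in output_schema.
summary.columns = ["GRADE", "DIVISION", "SUM"]
print(summary.sort_values("SUM").to_string(index=False))
```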