在 Snowpark Python 中调用函数和存储过程¶
若要处理 DataFrame 中的数据,可以调用系统定义的 SQL 函数、用户定义的函数和存储过程。本主题介绍如何在 Snowpark 中调用它们。
本主题内容:
若要处理 DataFrame 中的数据,可以调用系统定义的 SQL 函数、用户定义的函数和存储过程。
调用系统定义的函数¶
如需调用 系统定义的 SQL 函数,请使用 snowflake.snowpark.functions
模块中的等效函数。
下面的示例调用 functions
模块中的 upper
函数(与系统定义的 UPPER 函数等效),以使用大写字母形式返回 sample_product_data 表中 name 列中的值:
>>> # Import the upper function from the functions module.
>>> from snowflake.snowpark.functions import upper, col
>>> session.table("sample_product_data").select(upper(col("name")).alias("upper_name")).collect()
[Row(UPPER_NAME='PRODUCT 1'), Row(UPPER_NAME='PRODUCT 1A'), Row(UPPER_NAME='PRODUCT 1B'), Row(UPPER_NAME='PRODUCT 2'), Row(UPPER_NAME='PRODUCT 2A'), Row(UPPER_NAME='PRODUCT 2B'), Row(UPPER_NAME='PRODUCT 3'), Row(UPPER_NAME='PRODUCT 3A'), Row(UPPER_NAME='PRODUCT 3B'), Row(UPPER_NAME='PRODUCT 4'), Row(UPPER_NAME='PRODUCT 4A'), Row(UPPER_NAME='PRODUCT 4B')]
如果函数模块中没有系统定义的 SQL 函数,则可以使用以下方法之一:
使用
call_function
函数调用系统定义的函数。使用
function
函数,创建可用于调用系统定义的函数的一个函数对象。
call_function
和 function
在 snowflake.snowpark.functions
模块中定义。
对于 call_function
,将系统定义的函数的名称作为第一个实参传递。如果需要将列的值传递给系统定义的函数,请定义 Column 对象,并将其作为额外的实参传递给 call_function
函数。
下面的示例调用系统定义的函数 RADIANS,并传入来自 col1
列的值:
>>> # Import the call_function function from the functions module.
>>> from snowflake.snowpark.functions import call_function
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["col1", "col2"])
>>> # Call the system-defined function RADIANS() on col1.
>>> df.select(call_function("radians", col("col1"))).collect()
[Row(RADIANS("COL1")=0.017453292519943295), Row(RADIANS("COL1")=0.05235987755982988)]
call_function
函数返回一个 Column
,可将其传递给 DataFrame 转换方法 <label-snowpark_python_dataframe_transform>`(例如 :code:`filter、select
等)。
对于 function
,传递系统定义的函数的名称,并使用返回的函数对象调用系统定义的函数。例如:
>>> # Import the call_function function from the functions module.
>>> from snowflake.snowpark.functions import function
>>> # Create a function object for the system-defined function RADIANS().
>>> radians = function("radians")
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["col1", "col2"])
>>> # Call the system-defined function RADIANS() on col1.
>>> df.select(radians(col("col1"))).collect()
[Row(RADIANS("COL1")=0.017453292519943295), Row(RADIANS("COL1")=0.05235987755982988)]
调用用户定义的函数 (UDFs)¶
要调用您 按名称注册 的 UDFs,以及您通过执行 CREATE FUNCTION 创建的 UDFs,请使用 snowflake.snowpark.functions
模块中的 call_udf
函数。将 UDF 的名称作为第一个实参传递,将任何 UDF 参数作为额外的实参传递。
下面的示例调用 UDF 函数 minus_one
,并传入来自 col1
和 col2
列的值。该示例将来自 minus_one
的返回值传递给 DataFrame 的 select
方法。
>>> # Import the call_udf function from the functions module.
>>> from snowflake.snowpark.functions import call_udf
>>> # Runs the scalar function 'minus_one' on col1 of df.
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["col1", "col2"])
>>> df.select(call_udf("minus_one", col("col1"))).collect()
[Row(MINUS_ONE("COL1")=0), Row(MINUS_ONE("COL1")=2)]
调用用户定义的表函数 (UDTFs)¶
要调用您按名称注册的 UDTFs,以及您通过执行 CREATE FUNCTION 创建的 UDTFs,请使用下列函数之一。两者都返回表示延迟计算的关系数据集的 DataFrame
。
请注意,您还可以使用它们来调用其他表函数,包括 系统定义的表函数。
有关注册 UDTF 的更多信息,请参阅 注册 UDTF。
若要在不指定横向联接的情况下调用 UDTF,请调用
snowflake.snowpark.Session
类中的table_function
函数。有关函数参考和示例,请参阅 Session.table_function。
以下示例中的代码使用
table_function
调用向udtf
函数注册的generator_udtf
函数。>>> from snowflake.snowpark.types import IntegerType, StructField, StructType >>> from snowflake.snowpark.functions import udtf, lit >>> class GeneratorUDTF: ... def process(self, n): ... for i in range(n): ... yield (i, ) >>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()]) >>> session.table_function(generator_udtf(lit(3))).collect() [Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
若要调用您的调用指定了横向联接的 UDTF,请使用
snowflake.snowpark.DataFrame
类中的join_table_function
函数。横向联接 UDTF 时,可指定 PARTITION BY 和 ORDER BY 子句。
有关函数参考和示例,请参阅 DataFrame.join_table_function。
以下示例中的代码执行横向联接,并指定:code:partition_by 和
order_by
参数。此示例中的代码首先调用snowflake.snowpark.functions.table_function
函数,以创建一个表示系统定义的SPLIT_TO_TABLE
函数的函数对象。join_table_function
随后调用的就是这个函数对象。有关
snowflake.snowpark.functions.table_function
函数参考,请参阅 table_function。有关SPLIT_TO_TABLE
函数参考,请参阅 SPLIT_TO_TABLE。>>> from snowflake.snowpark.functions import table_function >>> split_to_table = table_function("split_to_table") >>> df = session.create_dataframe([ ... ["John", "James", "address1 address2 address3"], ... ["Mike", "James", "address4 address5 address6"], ... ["Cathy", "Stone", "address4 address5 address6"], ... ], ... schema=["first_name", "last_name", "addresses"]) >>> df.join_table_function(split_to_table(df["addresses"], lit(" ")).over(partition_by="last_name", order_by="first_name")).show() ---------------------------------------------------------------------------------------- |"FIRST_NAME" |"LAST_NAME" |"ADDRESSES" |"SEQ" |"INDEX" |"VALUE" | ---------------------------------------------------------------------------------------- |John |James |address1 address2 address3 |1 |1 |address1 | |John |James |address1 address2 address3 |1 |2 |address2 | |John |James |address1 address2 address3 |1 |3 |address3 | |Mike |James |address4 address5 address6 |2 |1 |address4 | |Mike |James |address4 address5 address6 |2 |2 |address5 | |Mike |James |address4 address5 address6 |2 |3 |address6 | |Cathy |Stone |address4 address5 address6 |3 |1 |address4 | |Cathy |Stone |address4 address5 address6 |3 |2 |address5 | |Cathy |Stone |address4 address5 address6 |3 |3 |address6 | ----------------------------------------------------------------------------------------
调用存储过程¶
若要调用存储过程,请使用 Session
类的 call 方法。
>>> session.call("your_proc_name", 1)
0