在 Snowpark Python 中调用函数和存储过程

若要处理 DataFrame 中的数据,可以调用系统定义的 SQL 函数、用户定义的函数和存储过程。本主题介绍如何在 Snowpark 中调用它们。

本主题内容:

若要处理 DataFrame 中的数据,可以调用系统定义的 SQL 函数、用户定义的函数和存储过程。

调用系统定义的函数

如需调用 系统定义的 SQL 函数,请使用 snowflake.snowpark.functions 模块中的等效函数。

下面的示例调用 functions 模块中的 upper 函数(与系统定义的 UPPER 函数等效),以使用大写字母形式返回 sample_product_data 表中 name 列中的值:

>>> # Import the upper function from the functions module.
>>> from snowflake.snowpark.functions import upper, col
>>> session.table("sample_product_data").select(upper(col("name")).alias("upper_name")).collect()
[Row(UPPER_NAME='PRODUCT 1'), Row(UPPER_NAME='PRODUCT 1A'), Row(UPPER_NAME='PRODUCT 1B'), Row(UPPER_NAME='PRODUCT 2'), Row(UPPER_NAME='PRODUCT 2A'), Row(UPPER_NAME='PRODUCT 2B'), Row(UPPER_NAME='PRODUCT 3'), Row(UPPER_NAME='PRODUCT 3A'), Row(UPPER_NAME='PRODUCT 3B'), Row(UPPER_NAME='PRODUCT 4'), Row(UPPER_NAME='PRODUCT 4A'), Row(UPPER_NAME='PRODUCT 4B')]
Copy

如果函数模块中没有系统定义的 SQL 函数,则可以使用以下方法之一:

  • 使用 call_function 函数调用系统定义的函数。

  • 使用 function 函数,创建可用于调用系统定义的函数的一个函数对象。

call_functionfunctionsnowflake.snowpark.functions 模块中定义。

对于 call_function,将系统定义的函数的名称作为第一个实参传递。如果需要将列的值传递给系统定义的函数,请定义 Column 对象,并将其作为额外的实参传递给 call_function 函数。

下面的示例调用系统定义的函数 RADIANS,并传入来自 col1 列的值:

>>> # Import the call_function function from the functions module.
>>> from snowflake.snowpark.functions import call_function
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["col1", "col2"])
>>> # Call the system-defined function RADIANS() on col1.
>>> df.select(call_function("radians", col("col1"))).collect()
[Row(RADIANS("COL1")=0.017453292519943295), Row(RADIANS("COL1")=0.05235987755982988)]
Copy

call_function 函数返回一个 Column,可将其传递给 DataFrame 转换方法 <label-snowpark_python_dataframe_transform>`(例如 :code:`filterselect 等)。

对于 function,传递系统定义的函数的名称,并使用返回的函数对象调用系统定义的函数。例如:

>>> # Import the call_function function from the functions module.
>>> from snowflake.snowpark.functions import function

>>> # Create a function object for the system-defined function RADIANS().
>>> radians = function("radians")
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["col1", "col2"])
>>> # Call the system-defined function RADIANS() on col1.
>>> df.select(radians(col("col1"))).collect()
[Row(RADIANS("COL1")=0.017453292519943295), Row(RADIANS("COL1")=0.05235987755982988)]
Copy

调用用户定义的函数 (UDFs)

要调用您 按名称注册 的 UDFs,以及您通过执行 CREATE FUNCTION 创建的 UDFs,请使用 snowflake.snowpark.functions 模块中的 call_udf 函数。将 UDF 的名称作为第一个实参传递,将任何 UDF 参数作为额外的实参传递。

下面的示例调用 UDF 函数 minus_one,并传入来自 col1col2 列的值。该示例将来自 minus_one 的返回值传递给 DataFrame 的 select 方法。

>>> # Import the call_udf function from the functions module.
>>> from snowflake.snowpark.functions import call_udf

>>> # Runs the scalar function 'minus_one' on col1 of df.
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["col1", "col2"])
>>> df.select(call_udf("minus_one", col("col1"))).collect()
[Row(MINUS_ONE("COL1")=0), Row(MINUS_ONE("COL1")=2)]
Copy

调用用户定义的表函数 (UDTFs)

要调用您按名称注册的 UDTFs,以及您通过执行 CREATE FUNCTION 创建的 UDTFs,请使用下列函数之一。两者都返回表示延迟计算的关系数据集的 DataFrame

请注意,您还可以使用它们来调用其他表函数,包括 系统定义的表函数

有关注册 UDTF 的更多信息,请参阅 注册 UDTF

  • 若要在不指定横向联接的情况下调用 UDTF,请调用 snowflake.snowpark.Session 类中的 table_function 函数。

    有关函数参考和示例,请参阅 Session.table_function

    以下示例中的代码使用 table_function 调用向 udtf 函数注册的 generator_udtf 函数。

    >>> from snowflake.snowpark.types import IntegerType, StructField, StructType
    >>> from snowflake.snowpark.functions import udtf, lit
    >>> class GeneratorUDTF:
    ...     def process(self, n):
    ...         for i in range(n):
    ...             yield (i, )
    >>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
    >>> session.table_function(generator_udtf(lit(3))).collect()
    [Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
    
    Copy
  • 若要调用您的调用指定了横向联接的 UDTF,请使用 snowflake.snowpark.DataFrame 类中的 join_table_function 函数。

    横向联接 UDTF 时,可指定 PARTITION BY 和 ORDER BY 子句。

    有关函数参考和示例,请参阅 DataFrame.join_table_function

    以下示例中的代码执行横向联接,并指定:code:partition_byorder_by 参数。此示例中的代码首先调用 snowflake.snowpark.functions.table_function 函数,以创建一个表示系统定义的 SPLIT_TO_TABLE 函数的函数对象。join_table_function 随后调用的就是这个函数对象。

    有关 snowflake.snowpark.functions.table_function 函数参考,请参阅 table_function。有关 SPLIT_TO_TABLE 函数参考,请参阅 SPLIT_TO_TABLE

    >>> from snowflake.snowpark.functions import table_function
    >>> split_to_table = table_function("split_to_table")
    >>> df = session.create_dataframe([
    ...     ["John", "James", "address1 address2 address3"],
    ...     ["Mike", "James", "address4 address5 address6"],
    ...     ["Cathy", "Stone", "address4 address5 address6"],
    ... ],
    ... schema=["first_name", "last_name", "addresses"])
    >>> df.join_table_function(split_to_table(df["addresses"], lit(" ")).over(partition_by="last_name", order_by="first_name")).show()
    ----------------------------------------------------------------------------------------
    |"FIRST_NAME"  |"LAST_NAME"  |"ADDRESSES"                 |"SEQ"  |"INDEX"  |"VALUE"   |
    ----------------------------------------------------------------------------------------
    |John          |James        |address1 address2 address3  |1      |1        |address1  |
    |John          |James        |address1 address2 address3  |1      |2        |address2  |
    |John          |James        |address1 address2 address3  |1      |3        |address3  |
    |Mike          |James        |address4 address5 address6  |2      |1        |address4  |
    |Mike          |James        |address4 address5 address6  |2      |2        |address5  |
    |Mike          |James        |address4 address5 address6  |2      |3        |address6  |
    |Cathy         |Stone        |address4 address5 address6  |3      |1        |address4  |
    |Cathy         |Stone        |address4 address5 address6  |3      |2        |address5  |
    |Cathy         |Stone        |address4 address5 address6  |3      |3        |address6  |
    ----------------------------------------------------------------------------------------
    
    Copy

调用存储过程

若要调用存储过程,请使用 Session 类的 call 方法。

>>> session.call("your_proc_name", 1)  
 0
Copy
语言: 中文