在 Snowpark Scala 中调用函数和存储过程

若要处理 DataFrame 中的数据,可以调用系统定义的 SQL 函数、用户定义的函数和存储过程。本主题介绍如何在 Snowpark 中调用它们。

本主题内容:

调用系统定义的函数

如果需要调用 系统定义的 SQL 函数,请使用 ` com.snowflake.snowpark.functions 对象 `_ 中的等效函数。

下面的示例调用 functions 对象中的 upper 函数(与系统定义的 UPPER 函数等效),从而以大写字母形式返回 name 列中的值:

// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Copy

如果系统定义的 SQL 函数在 functions 对象中不可用,则可以使用以下方法之一:

  • 使用 callBuiltin 函数调用系统定义的函数。

  • 使用 builtin 函数,创建可用于调用系统定义的函数的一个函数对象。

callBuiltinbuiltincom.snowflake.snowpark.functions 对象中定义。

对于 callBuiltin,将系统定义的函数的名称作为第一个实参传递。如果需要将列的值传递给系统定义的函数,请定义 Column 对象,并将其作为额外的实参传递给 callBuiltin 函数。

下面的示例调用系统定义的函数 RADIANS,并传入来自 col1 列的值:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
Copy

callBuiltin 函数返回一个 Column,可将其传递给 DataFrame 转换方法 (例如 filter、select 等)。

对于 builtin,传递系统定义的函数的名称,并使用返回的函数对象调用系统定义的函数。例如:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Copy

调用标量用户定义的函数 (UDFs)

调用 UDF 的方法取决于 UDF 的创建方式:

  • 要调用 匿名 UDF,请调用创建 UDF 时返回的 ` UserDefinedFunction _ 对象的 :code:`apply 方法。

    传递给 UDF 的实参必须是 对象。如果需要传入字面量,请使用 lit(),如 将字面量用作列对象 中所述。

  • 要调用您 按名称注册 的 UDFs,以及通过执行 CREATE FUNCTION 创建的 UDFs,请使用 com.snowflake.snowpark.functions 对象中的 callUDF 函数。

    将 UDF 的名称作为第一个实参传递,将任何 UDF 参数作为额外的实参传递。

调用 UDF 会返回包含 UDF 返回值的 Column 对象。

下面的示例调用 UDF 函数 myFunction,并传入来自 col1col2 列的值。该示例将来自 myFunction 的返回值传递给 DataFrame 的 select 方法。

// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
    df.select(
        callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
    ).collect()
Copy

调用表函数(系统函数和 UDTFs)

要调用 表函数用户定义的表函数 (UDTF),请执行以下操作:

  1. 构造一个 ` TableFunction `_ 对象,传入表函数的名称。

    如果您在 Snowpark 中创建 UDTF,则可以使用 UDTFRegistration.registerTemporaryUDTFRegistration.registerPermanent 方法返回的 TableFunction 对象。请参阅 创建用户定义的表函数 (UDTFs)

  2. 调用 ` session.tableFunction _,传入 :code:`TableFunction 对象和一个输入实参名称和值的 Map

table?Function 返回包含表函数输出的 DataFrame。

例如,假设您执行以下命令来创建 SQL UDTF:

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

以下代码调用此 UDTF,并为 UDTF 的输出创建一个 DataFrame。该示例将输出的前 10 行打印到控制台。

val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Copy

如果需要将表函数的输出与 DataFrame 联接,请调用传入 TableFunction 的 ` DataFrame.join 方法 `_。

调用存储过程

过程可以在服务器端(Snowflake 环境中)或本地执行。请记住,由于这两种环境不同,过程执行的条件和结果也可能会有所不同。

您可以通过以下任一方式使用 Snowpark API 调用过程:

  • 使用 SProcRegistration.runLocally 方法在本地执行函数以进行测试和调试。

  • 使用 Session.storedProcedure 方法在服务器端 Snowflake 环境中执行过程。这包括作用域为当前会话的过程,或者存储在 Snowflake 上的永久过程。

还可以从 Snowflake 工作表调用您使用 Snowpark API 创建的永久存储过程。有关更多信息,请参阅 调用存储过程

有关使用 Snowpark API 创建过程的更多信息,请参阅 在 Scala 中为 DataFrames 创建存储过程

在本地执行过程的逻辑

您可以使用 SProcRegistration.runLocally 方法在本地环境中为过程执行 lambda 函数。该方法执行此函数,并将其结果作为函数返回的类型返回。

例如,在 Snowflake 上注册过程之前,您可以在本地调用(在客户端上)您打算在过程中使用的 lambda 函数。首先将 lambda 代码作为值赋给一个变量。您将该变量传递给 SProcRegistration.runLocally 方法,以在客户端运行。在注册过程时,还可以使用变量来表示函数。

下面示例中的代码将函数赋值给 func 变量。然后,它通过将变量与函数的实参值一起传递给 SProcRegistration.runLocally 方法,在本地测试函数。该变量也用于注册过程。

val session = Session.builder.configFile("my_config.properties").create

// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1

// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Copy

在服务器上执行过程

若要在服务器上的 Snowflake 环境中执行过程,请使用 Session.storedProcedure 方法。该方法会返回一个 DataFrame 对象。

例如,您可以执行:

  • 您 :doc:` 使用 Snowpark API 创建 </developer-guide/snowpark/java/creating-sprocs>` 的临时或永久过程。

  • :doc:` 使用 CREATE PROCEDURE 语句创建 </developer-guide/stored-procedure/stored-procedures-creating-sql>` 的过程。

以下示例中的代码会创建一个临时过程,该过程设计为在服务器上执行,但持续时长与当前 Snowpark 会话相同。然后,它使用过程的名称和表示过程的 StoredProcedure 变量来执行过程。

val session = Session.builder.configFile("my_config.properties").create

val name: String = "add_two"

val tempSP: StoredProcedure =
  session.sproc.registerTemporary(
    name,
    (session: Session, num: Int) => num + 2
  )

session.storedProcedure(name, 1).show()

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy
语言: 中文