在 Snowpark Java 中调用函数和存储过程

若要处理 DataFrame 中的数据,可以调用系统定义的 SQL 函数、用户定义的函数和存储过程。本主题介绍如何在 Snowpark 中调用它们。

本主题内容:

调用系统定义的函数

如果需要调用 系统定义的 SQL 函数,请使用 ` Functions 类 `_ 中的等效静态方法。

以下示例调用 Functions 类中的 upper 静态方法(与系统定义的 UPPER 函数等效),从而以大写字母形式返回 name 列中的值:

DataFrame df = session.table("sample_product_data");
df.select(Functions.upper(Functions.col("name"))).show();
Copy

如果 Functions 类中没有提供系统定义的 SQL 函数,则可以使用 Functions.callUDF 静态方法调用系统定义的函数。

对于 callUDF,将系统定义的函数的名称作为第一个实参传递。如果需要将列的值传递给系统定义的函数,请定义 对象,并将其作为额外的实参传递给 callUDF 方法。

下面的示例调用系统定义的函数 RADIANS,并传入来自 degrees 列的值:

// Call the system-defined function RADIANS() on degrees.
DataFrame dfDegrees = session.range(0, 360, 45).rename("degrees", Functions.col("id"));
dfDegrees.select(Functions.col("degrees"), Functions.callUDF("radians", Functions.col("degrees"))).show();
Copy

callUDF 方法返回一个 Column,可将其传递给 DataFrame 转换方法 (例如 filter、select 等)。

调用标量用户定义的函数 (UDFs)

调用 UDF 的方法取决于 UDF 的创建方式:

  • 要调用 匿名 UDF,请调用创建 UDF 时返回的 UserDefinedFunction 对象的 apply 方法。

    传递给 UDF 的实参必须是 对象。如果需要传入字面量,请使用 Functions.lit(),如 :ref:`label-snowpark_java_dataframe_cols_lit`中所述。

  • 要调用您 按名称注册 的 UDFs,以及通过执行 CREATE FUNCTION 创建的 UDFs,请使用 Functions.callUDF 静态方法。

    将 UDF 的名称作为第一个实参传递,将任何 UDF 参数作为额外的实参传递。

调用 UDF 会返回包含 UDF 返回值的 Column 对象。

以下示例调用 UDF 函数 doubleUdf,并传入来自 quantity 列的值。该示例将来自 doubleUdf 的返回值传递给 DataFrame 的 select 方法。

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

调用表函数(系统函数和 UDTFs)

要调用 表函数用户定义的表函数 (UDTF),请执行以下操作:

  1. 构造一个 ` TableFunction `_ 对象,传入表函数的名称。

  2. 调用 Session 对象的 tableFunction 方法,传入 TableFunction 对象以及输入实参名称和值的 Map

table?Function 返回包含表函数输出的 DataFrame。

例如,假设您执行以下命令来创建 SQL UDTF:

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

以下代码调用此 UDTF,并为 UDTF 的输出创建一个 DataFrame。该示例将输出的前 10 行打印到控制台。

import java.util.HashMap;
import java.util.Map;
...

Map<String, Column> arguments = new HashMap<>();
arguments.put("cat_id", Functions.lit(10));
DataFrame dfTableFunctionOutput = session.tableFunction(new TableFunction("product_by_category_id"), arguments);
dfTableFunctionOutput.show();
Copy

如果需要将表函数的输出与 DataFrame 联接,请调用 ` 传入 TableFunction 的 join 方法 `_。

调用存储过程

过程可以在服务器端(Snowflake 环境中)或本地执行。请记住,由于这两种环境不同,过程执行的条件和结果也可能会有所不同。

您可以通过以下任一方式使用 Snowpark API 调用过程:

  • 使用 ` SProcRegistration.runLocally `_ 方法在本地执行函数,以进行测试和调试。

  • 使用 ` Session.storedProcedure `_ 方法之一在服务器端 Snowflake 环境中执行过程。这包括作用域为当前会话的过程,或者存储在 Snowflake 上的永久过程。

还可以从 SQL 代码调用您使用 Snowpark API 创建的永久存储过程。有关更多信息,请参阅 调用存储过程

有关使用 Snowpark API 创建过程的更多信息,请参阅 在 Java 中为 DataFrames 创建存储过程

在本地执行过程的逻辑

您可以使用 SProcRegistration.runLocally 方法在本地环境中为过程执行 lambda 函数。该方法执行此函数,并将其结果作为函数返回的类型返回。

例如,您可以先调用打算在过程中使用的 lambda 函数,然后在 Snowflake 上通过此函数注册过程。首先,将 lambda 代码作为值赋给类型为 com.snowflake.snowpark_java.sproc.JavaSProc 接口之一的变量。通过使用该变量,可以利用 SProcRegistration.runLocally 方法来测试函数。在注册过程时,还可以使用变量来表示函数。

以下示例中的代码初始化 lambda 函数中的 JavaSProc 变量,该函数将成为过程的逻辑。然后,它通过将该变量与函数的实参一起传递给 SProcRegistration.runLocally 方法,对函数进行测试。该变量还用于注册函数。

Session session = Session.builder().configFile("my_config.properties").create();

// Assign the lambda function to a variable.
JavaSProc1<Integer, Integer> func =
  (Session session, Integer num) -> num + 1;

// Execute the function locally.
int result = (Integer)session.sproc().runLocally(func, 1);
System.out.println("\nResult: " + result);

// Register the procedure.
StoredProcedure sp =
  session.sproc().registerTemporary(
    func,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server.
session.storedProcedure(sp, 1).show();
Copy

在服务器上执行过程

若要在服务器上的 Snowflake 环境中执行过程,请使用 Session.storedProcedure 方法。该方法会返回一个 DataFrame 对象。

例如,您可以执行:

  • 您 :doc:` 使用 Snowpark API 创建 </developer-guide/snowpark/java/creating-sprocs>` 的临时或永久过程。

  • :doc:` 使用 CREATE PROCEDURE 语句创建 </developer-guide/stored-procedure/stored-procedures-creating-sql>` 的过程。

以下示例中的代码会创建一个临时过程,该过程设计为在服务器上执行,但持续时长与当前 Snowpark 会话相同。然后,它使用过程的名称和表示过程的 ` com.snowflake.snowpark_java.StoredProcedure `_ 变量来执行过程。

Session session = Session.builder().configFile("my_config.properties").create();

String incrementProc = "increment";

// Register the procedure.
StoredProcedure tempSP =
  session.sproc().registerTemporary(
    incrementProc,
    (Session session, Integer num) -> num + 1,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy
语言: 中文