在 Snowpark Java 中调用函数和存储过程¶
若要处理 DataFrame 中的数据,可以调用系统定义的 SQL 函数、用户定义的函数和存储过程。本主题介绍如何在 Snowpark 中调用它们。
本主题内容:
调用系统定义的函数¶
如果需要调用 系统定义的 SQL 函数,请使用 ` Functions 类 `_ 中的等效静态方法。
以下示例调用 Functions
类中的 upper
静态方法(与系统定义的 UPPER 函数等效),从而以大写字母形式返回 name 列中的值:
DataFrame df = session.table("sample_product_data");
df.select(Functions.upper(Functions.col("name"))).show();
如果 Functions
类中没有提供系统定义的 SQL 函数,则可以使用 Functions.callUDF
静态方法调用系统定义的函数。
对于 callUDF
,将系统定义的函数的名称作为第一个实参传递。如果需要将列的值传递给系统定义的函数,请定义 列 对象,并将其作为额外的实参传递给 callUDF
方法。
下面的示例调用系统定义的函数 RADIANS,并传入来自 degrees
列的值:
// Call the system-defined function RADIANS() on degrees.
DataFrame dfDegrees = session.range(0, 360, 45).rename("degrees", Functions.col("id"));
dfDegrees.select(Functions.col("degrees"), Functions.callUDF("radians", Functions.col("degrees"))).show();
callUDF
方法返回一个 Column
,可将其传递给 DataFrame 转换方法 (例如 filter、select 等)。
调用标量用户定义的函数 (UDFs)¶
调用 UDF 的方法取决于 UDF 的创建方式:
要调用 匿名 UDF,请调用创建 UDF 时返回的 UserDefinedFunction 对象的
apply
方法。传递给 UDF 的实参必须是 列 对象。如果需要传入字面量,请使用
Functions.lit()
,如 :ref:`label-snowpark_java_dataframe_cols_lit`中所述。要调用您 按名称注册 的 UDFs,以及通过执行 CREATE FUNCTION 创建的 UDFs,请使用
Functions.callUDF
静态方法。将 UDF 的名称作为第一个实参传递,将任何 UDF 参数作为额外的实参传递。
调用 UDF 会返回包含 UDF 返回值的 Column
对象。
以下示例调用 UDF 函数 doubleUdf
,并传入来自 quantity
列的值。该示例将来自 doubleUdf
的返回值传递给 DataFrame 的 select
方法。
import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
session
.udf()
.registerTemporary(
"doubleUdf",
(Integer x) -> x + x,
DataTypes.IntegerType,
DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
调用表函数(系统函数和 UDTFs)¶
要调用 表函数 或 用户定义的表函数 (UDTF),请执行以下操作:
构造一个 ` TableFunction `_ 对象,传入表函数的名称。
调用 Session 对象的 tableFunction 方法,传入
TableFunction
对象以及输入实参名称和值的Map
。
table?Function
返回包含表函数输出的 DataFrame。
例如,假设您执行以下命令来创建 SQL UDTF:
CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
RETURNS TABLE(id INT, name VARCHAR)
AS
$$
SELECT id, name
FROM sample_product_data
WHERE category_id = cat_id
$$
;
以下代码调用此 UDTF,并为 UDTF 的输出创建一个 DataFrame。该示例将输出的前 10 行打印到控制台。
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> arguments = new HashMap<>();
arguments.put("cat_id", Functions.lit(10));
DataFrame dfTableFunctionOutput = session.tableFunction(new TableFunction("product_by_category_id"), arguments);
dfTableFunctionOutput.show();
如果需要将表函数的输出与 DataFrame 联接,请调用 ` 传入 TableFunction 的 join 方法 `_。
调用存储过程¶
过程可以在服务器端(Snowflake 环境中)或本地执行。请记住,由于这两种环境不同,过程执行的条件和结果也可能会有所不同。
您可以通过以下任一方式使用 Snowpark API 调用过程:
使用 ` SProcRegistration.runLocally `_ 方法在本地执行函数,以进行测试和调试。
使用 ` Session.storedProcedure `_ 方法之一在服务器端 Snowflake 环境中执行过程。这包括作用域为当前会话的过程,或者存储在 Snowflake 上的永久过程。
还可以从 SQL 代码调用您使用 Snowpark API 创建的永久存储过程。有关更多信息,请参阅 调用存储过程。
有关使用 Snowpark API 创建过程的更多信息,请参阅 在 Java 中为 DataFrames 创建存储过程。
在本地执行过程的逻辑¶
您可以使用 SProcRegistration.runLocally
方法在本地环境中为过程执行 lambda 函数。该方法执行此函数,并将其结果作为函数返回的类型返回。
例如,您可以先调用打算在过程中使用的 lambda 函数,然后在 Snowflake 上通过此函数注册过程。首先,将 lambda 代码作为值赋给类型为 com.snowflake.snowpark_java.sproc.JavaSProc
接口之一的变量。通过使用该变量,可以利用 SProcRegistration.runLocally
方法来测试函数。在注册过程时,还可以使用变量来表示函数。
以下示例中的代码初始化 lambda 函数中的 JavaSProc
变量,该函数将成为过程的逻辑。然后,它通过将该变量与函数的实参一起传递给 SProcRegistration.runLocally
方法,对函数进行测试。该变量还用于注册函数。
Session session = Session.builder().configFile("my_config.properties").create();
// Assign the lambda function to a variable.
JavaSProc1<Integer, Integer> func =
(Session session, Integer num) -> num + 1;
// Execute the function locally.
int result = (Integer)session.sproc().runLocally(func, 1);
System.out.println("\nResult: " + result);
// Register the procedure.
StoredProcedure sp =
session.sproc().registerTemporary(
func,
DataTypes.IntegerType,
DataTypes.IntegerType
);
// Execute the procedure on the server.
session.storedProcedure(sp, 1).show();
在服务器上执行过程¶
若要在服务器上的 Snowflake 环境中执行过程,请使用 Session.storedProcedure
方法。该方法会返回一个 DataFrame
对象。
例如,您可以执行:
您 :doc:` 使用 Snowpark API 创建 </developer-guide/snowpark/java/creating-sprocs>` 的临时或永久过程。
:doc:` 使用 CREATE PROCEDURE 语句创建 </developer-guide/stored-procedure/stored-procedures-creating-sql>` 的过程。
以下示例中的代码会创建一个临时过程,该过程设计为在服务器上执行,但持续时长与当前 Snowpark 会话相同。然后,它使用过程的名称和表示过程的 ` com.snowflake.snowpark_java.StoredProcedure `_ 变量来执行过程。
Session session = Session.builder().configFile("my_config.properties").create();
String incrementProc = "increment";
// Register the procedure.
StoredProcedure tempSP =
session.sproc().registerTemporary(
incrementProc,
(Session session, Integer num) -> num + 1,
DataTypes.IntegerType,
DataTypes.IntegerType
);
// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();
// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();