在 Java 中为 DataFrames 创建用户定义的函数 (UDFs)

Snowpark API 提供方法,供您在 Java 中通过 Lambda 表达式创建用户定义的函数。本主题说明如何创建这些类型的函数。

本主题内容:

简介

您可以调用 Snowpark APIs,在 Java 中为 lambda 函数创建用户定义的函数 (UDFs),并且您可以调用这些 UDFs 以处理 DataFrame 中的数据。

当您使用 Snowpark API 创建 UDF 时,Snowpark 库会序列化 UDF 的代码并将其上传到暂存区。在调用 UDF 时,Snowpark 库将在数据所在的服务器上执行函数。因此,不需要将数据传输到客户端,函数就可以处理数据。

在自定义代码中,还可以调用 JAR 文件中打包的代码(例如,第三方库的 Java 类)。

可以通过以下两种方式之一为自定义代码创建 UDF :

  • 您可以 创建匿名 UDF 并将该函数赋值给变量。只要此变量在作用域中,您就可以使用此变量调用 UDF。

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register an anonymous UDF (doubleUdf)
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
    // Call the anonymous UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy
  • 您可以 创建命名 UDF,并按名称调用 UDF。例如,如果需要按名称调用 UDF 或在后续会话中使用 UDF,您可以使用此方式。

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register a permanent named UDF ("doubleUdf")
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      session
        .udf()
        .registerPermanent(
          "doubleUdf",
          (Integer x) -> x + x,
          DataTypes.IntegerType,
          DataTypes.IntegerType,
          "mystage");
    // Call the named UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy

本主题的其余部分将介绍如何创建 UDFs。

备注

如果通过运行 CREATE FUNCTION 命令来定义 UDF,您可以在 Snowpark 中调用该 UDF。

有关详细信息,请参阅 调用标量用户定义的函数 (UDFs)

实参和返回值支持的数据类型

要为 Java lambda 创建 UDF,您必须对方法的实参和返回值使用下面列出的支持数据类型:

SQL 数据类型

Java 数据类型

备注

NUMBER

支持以下类型:

  • Integer

  • Long

  • java.math.BigDecimaljava.math.BigInteger

FLOAT

Float

DOUBLE

Double

VARCHAR

String

BOOLEAN

Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Byte[]

VARIANT

` com.snowflake.snowpark_java.types.Variant `_

ARRAY

String[]Variant[]

OBJECT

Map<String, String>Map<String, Variant>

GEOGRAPHY

` com.snowflake.snowpark_java.types.Geography `_

为 UDF 指定依赖项

要通过 Snowpark API 定义 UDF,您必须为以下文件调用 Session.addDependency() :包含 UDF 依赖的任何类和资源的任何文件(例如 JAR 文件、资源文件等)。(有关从 UDF 读取资源的更多信息,请参阅 通过 UDF 读取文件。)

Snowpark 库将这些文件上传到内部暂存区,并在执行 UDF 时将文件添加到类路径。

小技巧

如果不希望在您每次运行应用程序时该库都上传文件,请将文件上传到暂存区。在调用 addDependency 时,将路径传递到暂存区中的文件。

以下示例演示如何将暂存区中的 JAR 文件添加为依赖项:

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar");
Copy

以下示例演示如何为 JAR 文件和资源文件添加依赖项:

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar");

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/");

// Add a resource file.
session.addDependency("/<path>/my-resource.xml");
Copy

您不应需要指定以下依赖项:

  • 您的 Java 运行时库。

    这些库已在执行 UDFs 的服务器上的运行时环境中可用。

  • Snowpark JAR 文件。

    Snowpark 库会自动尝试检测 Snowpark JAR 文件并将其上传到服务器。

    防止库重复将 Snowpark JAR 文件上传到服务器:

    1. 将 Snowpark JAR 文件上传到暂存区。

      例如,以下命令将 Snowpark JAR 文件上传到暂存区 @mystage。PUT 命令会压缩 JAR 文件并将生成的文件命名为 snowpark-1.12.1.jar.gz。

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.12.1.jar @mystage
    2. 调用 addDependency,将暂存区中的 Snowpark JAR 文件添加为依赖项。

      例如,添加上一个命令上传的 Snowpark JAR 文件:

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.12.1.jar.gz");

      请注意, JAR 文件的指定路径包括 .gz 文件扩展名,该扩展名由 PUT 命令添加。

  • 包含当前正在运行的应用程序的 JAR 文件或目录。

    Snowpark 库会自动尝试检测并上传这些依赖项。

    如果 Snowpark 库无法自动检测这些依赖项,库会报告错误,您必须调用 addDependency,以手动添加这些依赖项。

如果将依赖项上传到暂存区时花费的时间过长,Snowpark 库会报告超时异常。要配置 Snowpark 库应等待的最长时间,请在创建会话时设置 snowpark_request_timeout_in_seconds 属性。

创建匿名 UDF

要创建匿名 UDF,您可以执行以下任一操作:

  • 调用 Functions.udf 静态方法,传入 lambda 表达式,以及表示输入和输出数据类型的 ` DataTypes `_ 字段(或由该类的方法构造的对象)。

  • 调用 UDFRegistration 类中的 registerTemporary 方法,传入 lambda 表达式,以及表示输入和输出数据类型的 ` DataTypes `_ 字段(或由该类的方法构造的对象)。

    您可以调用 Session 对象的 udf 方法,访问 UDFRegistration 类的实例。

    在调用 registerTemporary 时,请使用没有 name 参数的方法签名。(因为您正在创建匿名 UDF,所以您没有指定 UDF 的名称。)

备注

在编写多线程代码时(例如,使用并行集合时),请使用 registerTemporary 方法来注册 UDFs,而不是使用 udf 方法。这可以防止找不到默认 Snowflake Session 对象的错误。

这些方法会返回 UserDefinedFunction 对象,用于调用 UDF。(请参阅 调用标量用户定义的函数 (UDFs)。)

以下示例创建匿名 UDF:

import com.snowflake.snowpark_java.types.*;
...

// Create and register an anonymous UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
// Call the anonymous UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

以下示例创建匿名 UDF,该匿名函数使用自定义类(LanguageDetector,用于检测文本中使用的语言)。该示例调用匿名 UDF 来检测 DataFrame 中 text_data 列中的语言,并创建新的 DataFrame,其中包含所用语言的附加 lang 列。

import com.snowflake.snowpark_java.types.*;

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector;

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar");

// Create a detector
LanguageDetector detector = new LanguageDetector();

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
UserDefinedFunction langUdf =
  Functions.udf(
    (String s) -> Option(detector.detect(s)).getOrElse("UNKNOWN"),
    DataTypes.StringType,
    DataTypes.StringType);

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
DataFrame dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(Functions.col("text_data")));
Copy

创建和注册命名的 UDF

如果要按名称调用 UDF(例如,使用 Functions.callUDF 静态方法),或者需要在后续会话中使用 UDF,则可以创建并注册命名的 UDF。为此,请使用 UDFRegistration 类中的下列其中一种方法:

  • registerTemporary,如果您只计划在当前会话中使用 UDF

  • registerPermanent,如果您计划在后续会话中使用 UDF

要访问 UDFRegistration 类的对象,请调用 Session 对象的 udf 方法。

在调用 registerTemporaryregisterPermanent 方法时,传入 lambda 表达式,以及表示输入和输出数据类型的 ` DataTypes `_ 字段(或由该类的方法构造的对象)。

例如:

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

registerPermanent 会创建可在当前和后续会话中使用的 UDF。在调用 registerPermanent 时,您还必须在内部暂存区位置中指定一个位置,用于上传 UDF 及其依赖项的 JAR 文件。

备注

registerPermanent 不支持外部暂存区。

例如:

import com.snowflake.snowpark_java.types.*;
...

// Create and register a permanent named UDF
// that takes in an integer argument and returns an integer value.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerPermanent(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType,
      "mystage");
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

使用不可序列化的对象

在为 lambda 表达式创建 UDF 时,Snowpark 库会序列化 lambda 闭包并将其发送到服务器进行执行。

如果 lambda 闭包获取的对象不可序列化,则 Snowpark 库将引发 java.io.NotSerializableException 异常。

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

如果发生这种情况,您必须使对象可序列化。

为 UDF 编写初始化代码

如果 UDF 需要初始化代码或上下文,您可以通过作为 UDF 闭包的一部分获取的值来提供此代码或上下文。

以下示例使用单独的类来初始化两个 UDFs 所需的上下文。

  • 第一个 UDF 在 lambda 内创建类的新实例,因此每次调用 UDF 时都会执行初始化。

  • 第二个 UDF 获取在客户端程序中生成的类的实例。客户端上生成的上下文序列化并由 UDF 使用。请注意,上下文类必须可序列化,此方法才能正常工作。

import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
import java.io.Serializable;

// Context needed for a UDF.
class Context {
  double randomInt = Math.random();
}

// Serializable context needed for the UDF.
class SerContext implements Serializable {
  double randomInt = Math.random();
}

class TestUdf {
  public static void main(String[] args) {
    // Create the session.
    Session session = Session.builder().configFile("/<path>/profile.properties").create();
    session.range(1, 10, 2).show();

    // Create a DataFrame with two columns ("c" and "d").
    DataFrame dummy =
      session.createDataFrame(
        new Row[]{
          Row.create(1, 1),
          Row.create(2, 2),
          Row.create(3, 3)
        },
        StructType.create(
          new StructField("c", DataTypes.IntegerType),
          new StructField("d", DataTypes.IntegerType))
        );
    dummy.show();

    // Initialize the context once per invocation.
    UserDefinedFunction udfRepeatedInit =
      Functions.udf(
        (Integer i) -> new Context().randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfRepeatedInit.apply(dummy.col("c"))).show();

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    SerContext sC = new SerContext();
    UserDefinedFunction udfOnceInit =
      Functions.udf(
        (Integer i) -> sC.randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfOnceInit.apply(dummy.col("c"))).show();
    UserDefinedFunction udfOnceInit = udf((i: Int) => sC.randomInt);
  }
}
Copy

通过 UDF 读取文件

如前所述,Snowpark 库在服务器上上传和执行 UDFs。如果 UDF 需要从文件中读取数据,您必须确保使用 UDF 上传文件。

此外,如果文件的内容在两次调用 UDF 之间保持不变,您可以编写代码,在第一次调用期间加载文件一次,而不在后续调用中加载。这样可以提高 UDF 调用的性能。

设置 UDF 以读取文件:

  1. 将文件添加到 JAR 文件。

    例如,如果 UDF 需要使用 data/ 子目录 (data/hello.txt) 中的文件,请运行 jar 命令,将此文件添加到 JAR 文件:

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. 指定 JAR 文件是依赖项,该依赖项将文件上传到服务器并将文件添加到类路径。请参阅 为 UDF 指定依赖项

    例如:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar");
    
    Copy
  3. 在 UDF 中,调用 Class.forName().getResourceAsStream(),在类路径中找到文件并读取文件。

    为避免添加 this 的依赖项,您可以使用 Class.forName("com.snowflake.snowpark_java.DataFrame") (而不是 getClass())来获取 Class 对象。

    例如,读取 data/hello.txt 文件:

    // Read data/hello.txt from myJar.jar.
    String resourceName = "/data/hello.txt";
    InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame").getResourceAsStream(resourceName);
    
    Copy

    在此示例中,资源名称以 / 开头,表示这是 JAR 文件中文件的完整路径。(在这种情况下,文件的位置与类的包无关。)

备注

如果您不希望文件的内容在 UDF 调用之间发生变化,请将文件读入类的静态字段,并仅在未设置该字段时读取文件。

以下示例定义对象 (UDFCode),包含将用作 UDF (readFileFunc) 的函数。该函数读取文件 data/hello.txt,该文件应包含字符串 hello,。该函数将此字符串预置到作为实参传入的字符串之前。

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Create a function class that reads a file.
class UDFCode {
  private static String fileContent = null;
  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // The file content is cached in 'fileContent'.
  public static String readFile() {
    if (fileContent == null) {
      try {
        String resourceName = "/data/hello.txt";
        InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame")
          .getResourceAsStream(resourceName);
        fileContent = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
      } catch (Exception e) {
        fileContent = "Error while reading file";
      }
    }
    return fileContent;
  }
}
Copy

该示例的下一部分将函数注册为匿名 UDF。该示例调用 DataFrame 的 NAME 列中的 UDF。假设 data/hello.txt 文件打包在 JAR 文件 myJar.jar 中。

import com.snowflake.snowpark_java.types.*;

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar");

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
DataFrame myDf = session.sql("select 'Raymond' NAME");

// Register the function that you defined earlier as an anonymous UDF.
UserDefinedFunction readFileUdf = session.udf().registerTemporary(
  (String s) -> UDFCode.readFile() + " : " + s, DataTypes.StringType, DataTypes.StringType);

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf.apply(Functions.col("NAME"))).show();
Copy

创建用户定义的表函数 (UDTFs)

要在 Snowpark 中创建和注册 UDTF,您必须执行以下操作:

接下来的部分将更详细地介绍这些步骤。

有关调用 UDTF 的信息,请参阅 调用 UDTF

定义 UDTF 类

定义一个类,在 ` com.snowflake.snowpark_java.udtf package _ 中实施一个 :samp:`JavaUDTF{n} 界面(例如 JavaUDTF0JavaUDTF1 等),其中 n 指定 UDTF 的输入实参的数量。例如,如果 UDTF 传入 2 个输入实参,请实施 JavaUDTF2 界面。

在类中,实施以下方法:

  • outputSchema(),返回 types.StructType 对象,该对象描述返回行中字段的名称和类型(输出的“架构”)。

  • process(),对 输入分区 中的每一行调用一次(请参阅下面的注释)。

  • inputSchema(),返回 types.StructType 对象,描述输入参数的类型。

    如果 process() 方法传入 Map 实参,您必须实施 inputSchema() 方法。否则,实施此方法是可选的。

  • endPartition(),在所有行都传递到 process() 后,为每个分区调用一次。

在调用 UDTF 时,行在传递到 UDTF 之前划分成分区:

  • 如果调用 UDTF 的语句指定 PARTITION 子句(显式分区),则该子句将确定行的分区方式。

  • 如果语句未指定 PARTITION 子句(隐式分区),则 Snowflake 将确定如何最好地对行进行分区。

有关分区的说明,请参阅 表函数和分区

有关类的 UDTF 示例,请参阅 UDTF 类示例

实施 outputSchema() 方法

实施 outputSchema() 方法,定义 process()endPartition() 方法返回的行的字段(“输出架构”)的名称和数据类型。

public StructType outputSchema()
Copy

在此方法中,构造并返回 ` StructType `_ 对象,该对象包含 ` StructField `_ 对象,表示返回行中每个字段的 Snowflake 数据类型。Snowflake 针对 UDTF 的输出架构支持以下类型对象:

SQL 数据类型

Java 类型

com.snowflake.snowpark_java.types 类型

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

` com.snowflake.snowpark_java.types.Variant `_

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

:code:` java.util.Map<String, String> `

MapType(StringType, StringType)

OBJECT

:code:` java.util.Map<String, Variant> `

MapType(StringType, VariantType)

例如,如果 UDTF 返回包含单个整数字段的行:

public StructType outputSchema() {
  return StructType.create(new StructField("C1", DataTypes.IntegerType));
}
Copy

实施 process() 方法

在 UDTF 类中,实施 process() 方法:

Stream<Row> process(A0 arg0, ... A<n> arg<n>)
Copy

其中 n 是传递给 UDTF 的实参数量。

签名中的实参数量与您实施的界面相对应。例如,如果 UDTF 传入 2 个输入实参,并且您正在实施界面 JavaUDTF2,则 process() 方法具有以下签名:

Stream<Row> process(A0 arg0, A1 arg1)
Copy

此方法会针对输入分区中的每一行调用一次。

选择实参类型

对于 process() 方法中每个实参的类型,请使用与传递到 UDTF 的实参的 Snowflake 数据类型相对应的 Java 类型。

Snowflake 针对 UDTF 的实参支持以下数据类型:

SQL 数据类型

Java 数据类型

备注

NUMBER

支持以下类型:

  • java.lang.Short

  • java.lang.Integer

  • java.lang.Long

  • java.math.BigDecimal

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

VARCHAR

java.lang.String

BOOLEAN

java.lang.Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

byte[]

VARIANT

` com.snowflake.snowpark_java.types.Variant `_

ARRAY

String[]Variant[]

OBJECT

Map<String, String>Map<String, Variant>

备注

如果传入 java.util.Map 实参,您必须实施 inputSchema 方法,以描述这些实参的类型。请参阅 实施 inputSchema() 方法

返回行

process() 方法中,构建并返回 Row 对象的 ` java.util.stream.Stream _,这些对象包含 UDTF 要为给定输入值返回的数据。行中的字段必须使用 :code:`outputSchema 方法中指定的类型。(请参阅 实施 outputSchema() 方法 。)

例如,如果 UDTF 生成行,请为生成的行构造并返回 Row 对象的 Iterable

import java.util.stream.Stream;
...

public Stream<Row> process(Integer start, Integer count) {
  Stream.Builder<Row> builder = Stream.builder();
  for (int i = start; i < start + count ; i++) {
    builder.add(Row.create(i));
  }
  return builder.build();
}
Copy

实施 inputSchema() 方法

如果 process() 方法传入 java.util.Map 实参,您必须实施 inputSchema() 方法以描述输入实参的类型。

备注

如果 process() 方法没有传入 Map 实参,您不需要实施 inputSchema() 方法。

在此方法中,构造并返回 ` StructType _ 对象,该对象包含 ` StructField `_ 对象,表示传入 :code:`process() 方法的每个实参的 Snowflake 数据类型。Snowflake 针对 UDTF 的输入架构支持以下类型对象:

SQL 数据类型

Java 类型

com.snowflake.snowpark_java.types 类型

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

` com.snowflake.snowpark_java.types.Variant `_

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

:code:` java.util.Map<String, String> `

MapType(StringType, StringType)

OBJECT

:code:` java.util.Map<String, Variant> `

MapType(StringType, VariantType)

例如,假设 process() 方法传入 Map <String, String> 实参和 Map<String, Variant> 实参:

import java.util.Map;
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
...

public Stream<Row> process(Map<String, String> stringMap, Map<String, Variant> varMap) {
  ...
}
Copy

您必须实施 inputSchema() 方法,才能返回 StructType 对象以描述以下输入实参的类型:

import java.util.Map;
import com.snowflake.snowpark_java.types.*;
...

public StructType inputSchema() {
  return StructType.create(
      new StructField(
          "string_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
      new StructField(
          "variant_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.VariantType)));
}
Copy

实施 endPartition() 方法

实施 endPartition 方法,并且添加在输入分区中的所有行传递到 process 方法后应执行的代码。每个输入分区调用 endPartition 方法一次。

public Stream<Row> endPartition()
Copy

如果您需要在处理完分区中的所有行后执行任何工作,您可以使用此方法。例如,您可以:

  • 根据在每个 process 方法调用中获取的状态信息返回行。

  • 返回未绑定到特定输入行的行。

  • 返回汇总 process 方法生成的输出行的行。

返回的行中的字段必须与您在 outputSchema 方法中指定的类型匹配。(请参阅 实施 outputSchema() 方法。)

如果您不需要在每个分区的末尾返回其他行,请返回空白 Stream。例如:

public Stream<Row> endPartition() {
  return Stream.empty();
}
Copy

备注

虽然 Snowflake 支持大型分区,会调整超时以成功处理分区,但特别大的分区可能导致处理超时(例如 endPartition 需要太长时间才能完成)。如果您需要针对特定使用场景调整超时阈值,请联系 ` Snowflake 支持部门 `_。

UDTF 类示例

下面是生成一系列行的 UDTF 类的示例。

  • 由于 UDTF 传入 2 个实参,因此该类实施 JavaUDTF2

  • 实参 startcount 指定起始行数和要生成的行数。

import java.util.stream.Stream;
import com.snowflake.snowpark_java.types.*;
import com.snowflake.snowpark_java.udtf.*;

class MyRangeUdtf implements JavaUDTF2<Integer, Integer> {
  public StructType outputSchema() {
    return StructType.create(new StructField("C1", DataTypes.IntegerType));
  }

  // Because the process() method in this example does not pass in Map arguments,
  // implementing the inputSchema() method is optional.
  public StructType inputSchema() {
    return StructType.create(
            new StructField("start_value", DataTypes.IntegerType),
            new StructField("value_count", DataTypes.IntegerType));
  }

  public Stream<Row> endPartition() {
    return Stream.empty();
  }

  public Stream<Row> process(Integer start, Integer count) {
    Stream.Builder<Row> builder = Stream.builder();
    for (int i = start; i < start + count ; i++) {
      builder.add(Row.create(i));
    }
    return builder.build();
  }
}
Copy

注册 UDTF

接下来,创建新类的实例,并调用其中一种 ` UDTFRegistration _ 方法以注册该类。您可以注册 :ref:`临时 <label-snowpark_java_udtf_register_name_temporary>永久 UDTF

注册临时 UDTF

要注册临时 UDTF,请调用 UDTFRegistration.registerTemporary

  • 如果您不需要按名称调用 UDTF,您可以传入类的实例来注册匿名 UDTF :

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
    
    Copy
  • 如果您需要按名称调用 UDTF,请同时传入 UDTF 的名称:

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
    // Call the UDTF by name.
    session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
    
    Copy

注册永久 UDTF

如果您需要在后续会话中使用 UDTF,请调用 UDTFRegistration.registerPermanent 以注册永久 UDTF。

在注册永久 UDTF 时,您必须指定暂存区,在此处注册方法将上传 UDTF 及其依赖项的 JAR 文件。例如:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage");
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

调用 UDTF

在注册 UDTF 后,您可以将返回的 TableFunction 对象传递到 Session 对象的 tableFunction 方法,从而调用 UDTF :

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
Copy

要按名称调用 UDTF,请构造具有该名称的 TableFunction 对象,并将其传递到 tableFunction 方法:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

您还可以直接通过 SELECT 语句调用 UDTF :

session.sql("select * from table(myUdtf(10, 5))");
Copy
语言: 中文