表格 Java UDFs (UDTFs)

本文档介绍如何在 Java 中编写 UDTF (用户定义的 表函数)。

本主题内容:

简介

Java UDTF 处理程序类处理 UDTF 调用中收到的行,并返回表格结果。收到的行由 Snowflake 隐式分区,或在函数调用的语法中显式分区。可使用在类中实现的方法处理单个行以及将它们分组到的分区。

处理程序类可处理符合以下条件的分区和行:

  • 具有作为初始值设定项的零实参构造函数。可以使用它来设置分区范围的状态。

  • 用于处理每一行的 process 方法。

  • 零实参 endPartition 方法作为终结器,用于完成分区处理,包括返回范围限定为分区的值。

有关更多信息,请参阅 :ref:`label-udf_java_methods`(本主题内容)。

每个 Java UDTF 还需要一个*输出行类*,该类指定由处理程序类生成的输出行列的 Java 数据类型。有关详细信息,请参阅 :ref:`label-udtf_java_outputrow_class`(本主题内容)。

分区的使用说明

  • 接收到由 Snowflake 隐式分区的行时,处理程序代码不能对分区做出任何假设。如果 UDTF 只需要单独查看行即可生成其输出,并且没有跨行聚合状态,使用隐式分区运行是最有用的。在这种情况下,代码可能并不需要构造函数或 endPartition 方法。

  • 为提高性能,Snowflake 通常会并行执行 UDTF 处理程序代码的多个实例。行的每个分区都会传递给 UDTF 的一个实例。

  • 尽管每个分区仅由一个 UDTF 处理,但反之不一定如此 – 一个 UDTF 实例可以按顺序处理多个分区。因此,使用初始值设定项和终结器来初始化和清理每个分区非常重要,可避免将累积值从一个分区的处理转移到另一个分区的处理。

备注

表格函数 (UDTFs) 的输入实参限制为 500 个,输出列限制为 500 列。

UDTFs 的 Java 类

UDTF 的主要组件是处理程序类和输出行类。

处理程序类

Snowflake 主要通过调用处理程序类的以下方法与 UDTF 交互:

  • 初始值设定项(构造函数)。

  • 处理每行的方法 (process)。

  • 终结器方法 (endPartition)。

处理程序类可包含支持这三种方法所需的其他方法。

处理程序类还包含一个 getOutputClass 方法,稍后将加以介绍。

从处理程序类(或 输出行类)中的任何方法引发异常都会导致处理停止。调用 UDTF 的查询失败,并显示错误消息。

构造函数

处理程序类可以有一个构造函数,该构造函数接收的实参数量必须为零。

在调用 process 之前,必须为每个 分区 调用一次构造函数。

构造函数无法生成输出行。

使用构造函数初始化分区的状态;此状态可由 processendPartition 方法使用。对于任何长时间运行的初始化,如果只需要在每个分区执行一次,而不是每行执行一次,那么也适合置于构造函数之中。

构造函数是可选的。

process 方法

process 方法会针对输入分区中的每一行调用一次。

传递给 UDTF 的实参会传递给 process。实参的值将从 SQL 数据类型转换为 Java 数据类型。(有关映射 SQL 和 Java 数据类型的信息,请参阅 SQL-Java 数据类型映射。)

process 方法的参数名称可为任何有效的 Java 标识;这些名称不需要与 CREATE FUNCTION 语句中指定的名称匹配。

每次调用 process 时,它都可以返回零行、一行或多行。

process 方法返回的数据类型必须为 Stream <OutputRow>,Stream 在 java.util.stream.Stream 中定义,OutputRow 是输出行类的名称。下面的示例显示了一个仅通过 Stream 返回其输入的简单 process 方法:

import java.util.stream.Stream;

...

public Stream<OutputRow> process(String v) {
  return Stream.of(new OutputRow(v));
}

...
Copy

如果 process 方法不保留或使用对象中的任何状态(例如,如果该方法被设计为仅从输出中排除选定的输入行),则可以将该方法声明为 static。 如果 process 方法为 static,并且处理程序类没有构造函数或非静态 endPartition 方法,则 Snowflake 会将每一行直接传递给静态 process 方法,而不会构造处理程序类的实例。

如果需要跳过输入行并处理下一行(例如,如果要验证输入行),则返回一个空 Stream 对象。例如,下面的 process 方法仅返回 number 为正整数的行。如果 number 不是正数,该方法返回一个空 Stream 对象,以跳过当前行并继续处理下一行。

public Stream<OutputRow> process(int number) {
  if (inputNumber < 1) {
    return Stream.empty();
  }
  return Stream.of(new OutputRow(number));
}
Copy

如果 process 返回 null Stream,处理将会停止。(即使返回 null Stream,仍会调用 endPartition 方法。)

此方法是必需的。

endPartition 方法

此可选方法可用于生成基于 process 中汇总的任何状态信息的输出行。将该分区中的所有行传递给 process 之后,对每个 :ref:`分区 <label-udf_java_partitions>`调用此方法一次。

如果包含此方法,则无论数据是显式还是隐式分区,都会在每个分区上调用该方法。如果未对数据进行有意义的分区,终结器的输出可能没有意义。

备注

如果用户没有明确地对数据进行分区,Snowflake 会对数据进行隐式分区。有关详细信息,请参阅:分区

此方法可输出零行、一行或多行。

备注

虽然 Snowflake 支持大型分区,会调整超时以成功处理分区,但特别大的分区可能导致处理超时(例如 endPartition 需要太长时间才能完成)。如果您需要针对特定使用场景调整超时阈值,请联系 Snowflake 支持部门

getOutputClass 方法

此方法返回有关 输出行类 的信息。输出行类包含有关返回行的数据类型的信息。

输出行类

Snowflake 使用输出行类,来帮助指定 Java 数据类型与 SQL 数据类型之间的转换。

在 Java UDTF 返回一行时,必须将该行每列中的值从 Java 数据类型转换为相应的 SQL 数据类型。SQL 数据类型在 CREATE FUNCTION 语句的 RETURNS 子句中指定。但是,Java 与 SQL 数据类型之间的映射并非 1 对 1,因此 Snowflake 需要知道每个返回列的 Java 数据类型。(有关映射 SQL 和 Java 数据类型的更多信息,请参阅 SQL-Java 数据类型映射。)

Java UDTF 通过定义输出行类来指定输出列的 Java 数据类型。UDTF 返回的每一行都作为输出行类的一个实例返回。输出行类的每个实例都包含各输出列的一个公共字段。Snowflake 从输出行类的每个实例中读取公共字段的值,将 Java 值转换为 SQL 值,并构造包含这些值的 SQL 输出行。

输出行类的每个实例中的值都是通过调用输出行类的构造函数设置的。构造函数接受与输出列对应的参数,然后将公共字段设置为这些参数。

下面的代码定义了一个输出行类示例:

class OutputRow {

  public String name;
  public int id;

  public OutputRow(String pName, int pId) {
    this.name = pName;
    this.id = pId;
  }

}
Copy

此类指定的公共变量必须与 CREATE FUNCTION 语句的 RETURNS TABLE (...) 子句中指定的列匹配。例如,上面的 OutputRow 类对应于下面的 RETURNS 子句:

CREATE FUNCTION F(...)
  RETURNS TABLE(NAME VARCHAR, ID INTEGER)
  ...
Copy

重要

输出行类中的列名和 Java 公共字段名称之间的 SQL 匹配 不区分大小写。例如,在上面显示的 Java 和 SQL 代码中,名为 id 的 Java 字段对应于名为 ID 的 SQL 列。

输出行类的使用方式如下:

  • 处理程序类使用输出行类来指定 process 方法和 endPartition 方法的返回类型。处理程序类还会使用输出行类来构造返回值。例如:

    public Stream<OutputRow> process(String v) {
      ...
      return Stream.of(new OutputRow(...));
    }
    
    public Stream<OutputRow> endPartition() {
      ...
      return Stream.of(new OutputRow(...));
    }
    
    Copy
  • 输出行类也用于处理程序类的 getOutputClass 方法,这是 Snowflake 调用的静态方法,用于了解输出的 Java 数据类型:

    public static Class getOutputClass() {
      return OutputRow.class;
    }
    
    Copy

从输出行类(或处理程序类)中的任何方法抛出异常都会导致处理停止。调用 UDTF 的查询失败,并显示错误消息。

要求汇总

UDTF 的 Java 代码必须满足以下要求:

  • 代码必须定义 输出行类

  • UDTF 处理程序类必须包含一个名为 process 的公共方法,该方法返回 <output_row_class> 的 Stream,其中 Stream 在 java.util.stream.Stream 中定义。

  • UDTF 处理程序类必须定义一个名为 getOutputClass 的公共静态方法,该方法必须返回 <output_row_class>.class

如果 Java 代码不满足这些要求,UDTF 的创建或执行将失败:

  • 如果会话在执行 CREATE FUNCTION 语句时具有活动仓库,Snowflake 会在创建函数时检测冲突。

  • 如果会话在执行 CREATE FUNCTION 语句时没有活动仓库,Snowflake 会在调用函数时检测冲突。

在查询中调用 Java UDTFs 的示例

有关调用 UDFs 和 UDTFs 的一般信息,请参阅 执行 UDF

不使用显式分区的调用

此示例演示如何创建 UDTF。此示例为每个输入返回两个副本,并为每个分区返回一个额外的行。

create function return_two_copies(v varchar)
returns table(output_value varchar)
language java
handler='TestFunction'
target_path='@~/TestFunction.jar'
as
$$

  import java.util.stream.Stream;

  class OutputRow {

    public String output_value;

    public OutputRow(String outputValue) {
      this.output_value = outputValue;
    }

  }


  class TestFunction {

    String myString;

    public TestFunction()  {
      myString = "Created in constructor and output from endPartition()";
    }

    public static Class getOutputClass() {
      return OutputRow.class;
    }

    public Stream<OutputRow> process(String inputValue) {
      // Return two rows with the same value.
      return Stream.of(new OutputRow(inputValue), new OutputRow(inputValue));
    }

    public Stream<OutputRow> endPartition() {
      // Returns the value we initialized in the constructor.
      return Stream.of(new OutputRow(myString));
    }

  }

$$;
Copy

此示例演示如何调用 UDTF。为使此示例简单,该语句传递字面量值而不是列,并省略 OVER() 子句。

SELECT output_value
   FROM TABLE(return_two_copies('Input string'));
+-------------------------------------------------------+
| OUTPUT_VALUE                                          |
|-------------------------------------------------------|
| Input string                                          |
| Input string                                          |
| Created in constructor and output from endPartition() |
+-------------------------------------------------------+
Copy

此示例使用从另一个表中读取的值调用 UDTF。每次调用 process 方法时,都会从 cities_of_interest 表当前行的 city_name 列传递一个值。如上所述,UDTF 在没有显式 OVER() 子句的情况下调用。

创建一个简单的表,将其用作输入源:

CREATE TABLE cities_of_interest (city_name VARCHAR);
INSERT INTO cities_of_interest (city_name) VALUES
    ('Toronto'),
    ('Warsaw'),
    ('Kyoto');
Copy

调用 Java UDTF:

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+
Copy

注意

在此示例中,FROM 子句中所用的语法与内部联接(即 FROM t1, t2)的语法相同;但执行的操作 不是 真正的内部联接。实际行为是使用表中每一行的值调用函数。换言之,在给定以下 FROM 子句的情况下:

FROM cities_of_interest, TABLE(f(city_name))
Copy

其行为等同于以下伪代码:

for city_name in cities_of_interest:
    output_row = f(city_name)
Copy

:ref:` JavaScript UDTFs 文档中的示例部分 <label-udtf_javascript_examples>` 包含使用表中的值调用 UDTFs 的更复杂的查询示例。

如果语句未显式指定分区,Snowflake 执行引擎会使用 隐式分区

如果仅有一个分区,则 endPartition 方法仅调用一次,并且查询的输出仅包含一行,该行包含 Created in constructor and output from endPartition() 值。如果在语句的不同执行中将数据分组到不同数量的分区中,则调用 endPartition 方法的次数不同,并且输出包含此行的不同数量的副本。

有关更多信息,请参阅 隐式分区

使用显式分区的调用

也可以使用显式分区来调用 Java UDTFs。

多个分区

以下示例使用之前创建的同一个 UDTF 和表。该示例按 city_name 对数据进行分区。

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Created in constructor and output from endPartition() |
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Created in constructor and output from endPartition() |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Created in constructor and output from endPartition() |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
+-----------+-------------------------------------------------------+
Copy

单个分区

以下示例使用之前创建的同一个 UDTF 和表,并按一个常量对数据进行分区,这将强制 Snowflake 仅使用单个分区:

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY 1))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+
Copy

请注意,输出中仅包含消息 Created in constructor and output from endPartition() 的一个副本,这表示 endPartition 仅被调用了一次。

处理超大输入(例如大文件)

在某些情况下,UDTF 需要占用非常大量的内存来处理每个输入行。例如,UDTF 可能会读取和处理一个大小超出内存处理能力的文件。

若要处理 UDF 或 UDTF 中的大文件,请使用 SnowflakeFileInputStream 类。有关更多信息,请参阅 使用 UDF 和过程处理程序处理非结构化数据

语言: 中文