在 Scala 中为 DataFrames 创建用户定义的函数 (UDFs)

Snowpark API 提供了一些方法,您可以使用这些方法在 Scala 中通过 Lambda 或函数创建用户定义函数。本主题说明如何创建这些类型的函数。

本主题内容:

简介

您可以调用 Snowpark APIs,在 Scala 中为自定义 lambda 和函数创建用户定义的函数 (UDFs),并且可以调用这些 UDFs 函数来处理 DataFrame。

当您使用 Snowpark API 创建 UDF 时,Snowpark 库会序列化 UDF 的代码并将其上传到内部暂存区。在调用 UDF 时,Snowpark 库将在数据所在的服务器上执行函数。因此,不需要将数据传输到客户端,函数就可以处理数据。

在自定义代码中,还可以调用 JAR 文件中打包的代码(例如,第三方库的 Java 类)。

可以通过以下两种方式之一为自定义代码创建 UDF :

  • 您可以 创建匿名 UDF 并将该函数赋值给变量。只要此变量在作用域中,您就可以使用此变量调用 UDF。

    // Create and register an anonymous UDF (doubleUdf).
    val doubleUdf = udf((x: Int) => x + x)
    // Call the anonymous UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
    
    Copy
  • 您可以 创建命名 UDF,并按名称调用 UDF。例如,如果需要按名称调用 UDF 或在后续会话中使用 UDF,您可以使用此方式。

    // Create and register a permanent named UDF ("doubleUdf").
    session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
    // Call the named UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
    
    Copy

以下部分将提供有关在 Snowpark 中创建 UDFs 的重要信息:

本主题的其余部分将介绍如何创建 UDFs。

备注

如果通过运行 CREATE FUNCTION 命令来定义 UDF,您可以在 Snowpark 中调用该 UDF。

有关详细信息,请参阅 调用标量用户定义的函数 (UDFs)

实参和返回值支持的数据类型

要为 Scala 函数或 lambda 创建 UDF,必须对函数或 lambda 的实参和返回值使用下列受支持的数据类型:

SQL 数据类型

Scala 数据类型

备注

NUMBER

支持以下类型:

  • ShortOption[Short]

  • IntOption[Int]

  • LongOption[Long]

  • java.math.BigDecimal

FLOAT

FloatOption[Float]

DOUBLE

DoubleOption[Double]

VARCHAR

Stringjava.lang.String

BOOLEAN

BooleanOption[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

` com.snowflake.snowpark.types.Variant `_

ARRAY

Array[String]Array[Variant]

OBJECT

Map[String, String]Map[String, Variant]

支持以下类型的可变映射:

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

GEOGRAPHY

` com.snowflake.snowpark.types.Geography `_

使用 App Trait 在对象中创建 UDFs 的注意事项

Scala 提供了一个 ` App <https://www.scala-lang.org/api/2.12.11/scala/App.html (https://www.scala-lang.org/api/2.12.11/scala/App.html)>`_ Trait,您可以扩展该 Trait,以便将 Scala 对象转换为可执行程序。该 App Trait 提供了一个 main 方法,用于自动执行对象定义正文中的所有代码。(对象定义中的代码实际上会成为 main 方法。)

扩展 App Trait 的效果之一是,在调用 main 方法之前,不会初始化对象中的字段。如果对象扩展了 App,并且您定义了一个 UDF 使用之前初始化的对象字段,则上传到服务器的 UDF 定义将不包括该对象字段的初始化值。

例如,假设您定义并初始化了对象中名为 myConst 的字段,并在一个 UDF 中使用了这个字段:

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}
Copy

当 Snowpark 序列化 UDF 定义并将其上传到 Snowflake 时, myConst 未初始化,并且会解析为 null。因此,调用 UDF 时会为 myConst 返回 null

若要避免此类情况,请更改对象,使其不扩展 App Trait,并为代码实现单独的 main 方法:

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}
Copy

为 UDF 指定依赖项

要通过 Snowpark API 定义 UDF,您必须为以下文件调用 Session.addDependency() :包含 UDF 依赖的任何类和资源的任何文件(例如 JAR 文件、资源文件等)。(有关从 UDF 读取资源的更多信息,请参阅 通过 UDF 读取文件。)

Snowpark 库将这些文件上传到内部暂存区,并在执行 UDF 时将文件添加到类路径。

小技巧

如果不希望在您每次运行应用程序时该库都上传文件,请将文件上传到暂存区。在调用 addDependency 时,将路径传递到暂存区中的文件。

如果您使用的是 Scala REPL,则 :emph:` 必须 ` 将 :doc:` REPL 生成的类目录 <quickstart-scala-repl>` 添加为依赖项。例如,如果使用 run.sh 脚本启动 REPL,则调用以下方法,以便添加该脚本创建的 repl_classes 目录:

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
Copy

以下示例演示如何将暂存区中的 JAR 文件添加为依赖项:

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
Copy

以下示例演示如何为 JAR 文件和资源文件添加依赖项:

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
Copy

您不应需要指定以下依赖项:

  • 您的 Scala 运行时库。

    这些库已在执行 UDFs 的服务器上的运行时环境中可用。

  • Snowpark JAR 文件。

    Snowpark 库会自动尝试检测 Snowpark JAR 文件并将其上传到服务器。

    防止库重复将 Snowpark JAR 文件上传到服务器:

    1. 将 Snowpark JAR 文件上传到暂存区。

      例如,以下命令将 Snowpark JAR 文件上传到暂存区 @mystage。PUT 命令会压缩 JAR 文件并将生成的文件命名为 snowpark-1.12.1.jar.gz。

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.12.1.jar @mystage
    2. 调用 addDependency,将暂存区中的 Snowpark JAR 文件添加为依赖项。

      例如,添加上一个命令上传的 Snowpark JAR 文件:

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.12.1.jar.gz")

      请注意, JAR 文件的指定路径包括 .gz 文件扩展名,该扩展名由 PUT 命令添加。

  • 包含当前正在运行的应用程序的 JAR 文件或目录。

    Snowpark 库会自动尝试检测并上传这些依赖项。

    如果 Snowpark 库无法自动检测这些依赖项,库会报告错误,您必须调用 addDependency,以手动添加这些依赖项。

如果将依赖项上传到暂存区时花费的时间过长,Snowpark 库会报告超时异常。要配置 Snowpark 库应等待的最长时间,请在创建会话时设置 :ref:` snowpark_request_timeout_in_seconds <label-snowpark_request_timeout_in_seconds>` 属性。

创建匿名 UDF

要创建匿名 UDF,您可以执行以下任一操作:

  • 调用 com.snowflake.snowpark.functions 对象中的 udf 函数,传入匿名函数的定义。

  • 调用 UDFRegistration 类中的 registerTemporary 方法,传入匿名函数的定义。由于您正在注册匿名 UDF,因此必须使用不带 name 参数的方法签名。

备注

在编写多线程代码时(例如,使用并行集合时),请使用 registerTemporary 方法来注册 UDFs,而不是使用 udf 函数。这可以防止找不到默认 Snowflake Session 对象的错误。

这些方法会返回 UserDefinedFunction 对象,用于调用 UDF。(请参阅 调用标量用户定义的函数 (UDFs)。)

以下示例创建匿名 UDF:

// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Copy

备注

如果要在 Jupyter Notebook 中创建 UDF,则必须将笔记本设置为使用 Snowpark(请参阅 为 Snowpark Scala 设置 Jupyter Notebook),并遵循在 Notebook 中编写 UDFs 的准则(请参阅 在 Jupyter Notebook 中创建 UDFs)。

以下示例创建匿名 UDF,它传入一个 String 值的 Array,并为每个值追加字符串 x

// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
Copy

以下示例创建匿名 UDF,该匿名函数使用自定义类(LanguageDetector,用于检测文本中使用的语言)。该示例调用匿名 UDF 来检测 DataFrame 中 text_data 列中的语言,并创建新的 DataFrame,其中包含所用语言的附加 lang 列。

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))
Copy

创建和注册命名的 UDF

如果要按名称调用 UDF(例如,通过在 functions 对象中使用 callUDF 函数),或者需要在后续会话中使用 UDF,则可以创建并注册命名的 UDF。为此,请使用 UDFRegistration 类中的下列其中一种方法:

  • registerTemporary,如果您只计划在当前会话中使用 UDF

  • registerPermanent,如果您计划在后续会话中使用 UDF

要访问 UDFRegistration 类的对象,请调用 Session 类的 udf 方法。

registerTemporary 会创建可在当前会话中使用的临时 UDF。

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

registerPermanent 会创建可在当前和后续会话中使用的 UDF。在调用 registerPermanent 时,您还必须在内部暂存区位置中指定一个位置,用于上传 UDF 及其依赖项的 JAR 文件。

备注

registerPermanent 不支持外部暂存区。

例如:

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

备注

如果要在 Jupyter Notebook 中创建 UDF,则必须将笔记本设置为使用 Snowpark(请参阅 为 Snowpark Scala 设置 Jupyter Notebook),并遵循在 Notebook 中编写 UDFs 的准则(请参阅 在 Jupyter Notebook 中创建 UDFs)。

在 Jupyter Notebook 中创建 UDFs

如果要在 ` Jupyter Notebook <https://jupyter-notebook.readthedocs.io/en/stable/notebook.html (https://jupyter-notebook.readthedocs.io/en/stable/notebook.html)>`_ 中创建 UDFs,则必须执行以下额外的步骤:

编写 UDF 的实现

在扩展 Serializable 的类中定义函数实现。例如:

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
Copy

访问在另外一个单元格中定义的变量

如果需要使用在 UDF 的另外一个单元格中定义的变量,则必须将该变量作为实参传递给类构造函数。例如,假设您在单元格 1 中定义了一个变量:

In [1]:
Copy
val prefix = "Hello"
Copy

并且希望在单元格 2 中定义的 UDF 中使用该变量。在 UDF 的类构造函数中,为此变量添加一个实参。然后,在调用类构造函数以创建 UDF 时,传入在单元格 1 中定义的变量:

In [2]:
Copy
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
Copy

使用不可序列化的对象

在为 lambda 或函数创建 UDF 时,Snowpark 库会序列化 lambda 闭包并将其发送到服务器进行执行。

如果 lambda 闭包获取的对象不可序列化,则 Snowpark 库将引发 java.io.NotSerializableException 异常。

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

如果发生这种情况,您可以采取以下方案之一:

  • 使对象可序列化,或

  • 将对象声明为 lazy val 或使用 @transient 注释来避免序列化对象。

    例如:

    // Declare the detector object as lazy.
    lazy val detector = new LanguageDetector("en")
    // The detector object is not serialized but is instead reconstructed on the server.
    val langUdf = udf((s: String) =>
         Option(detector.detect(s)).getOrElse("UNKNOWN"))
    
    Copy

为 UDF 编写初始化代码

如果 UDF 需要初始化代码或上下文,您可以通过作为 UDF 闭包的一部分获取的值来提供此代码或上下文。

以下示例使用单独的类来初始化三个 UDFs 所需的上下文。

  • 第一个 UDF 在 lambda 内创建类的新实例,因此每次调用 UDF 时都会执行初始化。

  • 第二个 UDF 获取在客户端程序中生成的类的实例。客户端上生成的上下文序列化并由 UDF 使用。请注意,上下文类必须可序列化,此方法才能正常工作。

  • 第三个 UDF 获取 lazy val,因此上下文在第一个 UDF 调用时会延迟实例化,并在后续调用中重复利用。即使上下文不可序列化,这种方法依然有效。但不能保证数据帧内的 ALL UDF 调用会使用相同的延迟生成上下文。

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}
Copy

通过 UDF 读取文件

如前所述,Snowpark 库在服务器上上传和执行 UDFs。如果 UDF 需要从文件中读取数据,您必须确保使用 UDF 上传文件。

此外,如果文件的内容在两次调用 UDF 之间保持不变,您可以编写代码,在第一次调用期间加载文件一次,而不在后续调用中加载。这样可以提高 UDF 调用的性能。

设置 UDF 以读取文件:

  1. 将文件添加到 JAR 文件。

    例如,如果 UDF 需要使用 data/ 子目录 (data/hello.txt) 中的文件,请运行 jar 命令,将此文件添加到 JAR 文件:

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. 指定 JAR 文件是依赖项,该依赖项将文件上传到服务器并将文件添加到类路径。请参阅 为 UDF 指定依赖项

    例如:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
    Copy
  3. 在 UDF 中,调用 Class.getResourceAsStream,在类路径中找到文件并读取文件。

    为避免添加 this 的依赖项,您可以使用 classOf[com.snowflake.snowpark.DataFrame]`(而不是 :code:`getClass)来获取 Class 对象。

    例如,读取 data/hello.txt 文件:

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    
    Copy

    在此示例中,资源名称以 / 开头,表示这是 JAR 文件中文件的完整路径。(在这种情况下,文件的位置与类的包无关。)

备注

如果您不希望文件的内容在两次调用 UDF 之间发生变化,请将文件读取到 lazy val 之中。这会导致文件加载代码仅在第一次调用 UDF 时执行,而不会在后续调用时执行。

以下示例定义对象 (UDFCode),包含将用作 UDF (readFileFunc) 的函数。该函数读取文件 data/hello.txt,该文件应包含字符串 hello,。该函数将此字符串预置到作为实参传入的字符串之前。

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}
Copy

该示例的下一部分将函数注册为匿名 UDF。该示例调用 DataFrame 的 NAME 列中的 UDF。假设 data/hello.txt 文件打包在 JAR 文件 myJar.jar 中。

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
Copy

创建用户定义的表函数 (UDTFs)

要在 Snowpark 中创建和注册 UDTF,您必须执行以下操作:

接下来的部分将更详细地介绍这些步骤。

有关调用 UDTF 的信息,请参阅 调用 UDTF

定义 UDTF 类

定义一个类,该类继承自 ` com.snowflake.snowpark.udtf package _ 中的 :samp:`UDTF{n} 类之一(例如 UDTF0UDTF1 等)其中 n 指定 UDTF 的输入实参的数量。例如,如果 UDTF 传入 2 个输入实参,则扩展 UDTF2 类。

在类中替换以下方法:

  • :ref:` outputSchema() <label-snowpark_udtf_define_class_output_schema>`,返回 types.StructType 对象,该对象描述返回行中字段的名称和类型(输出的“架构”)。

  • :ref:` process() <label-snowpark_udtf_define_class_process>`,对 :emph:` 输入分区` 中的每一行调用一次(请参阅下面的注释)。

  • :ref:` endPartition() <label-snowpark_udtf_define_class_end_partition>`,在所有行都传递到 process() 后,为每个分区调用一次。

在调用 UDTF 时,行在传递到 UDTF 之前划分成分区:

  • 如果调用 UDTF 的语句指定 PARTITION 子句(显式分区),则该子句将确定行的分区方式。

  • 如果语句未指定 PARTITION 子句(隐式分区),则 Snowflake 将确定如何最好地对行进行分区。

有关分区的说明,请参阅 表函数和分区

有关类的 UDTF 示例,请参阅 UDTF 类示例

替换 outputSchema() 方法

替换 outputSchema() 方法,定义 process()endPartition() 方法返回的行的字段(“输出架构”)的名称和数据类型。

def outputSchema(): StructType
Copy

在此方法中,构造并返回 ` StructType _ 对象,该对象使用 ` StructField `_ 对象的 :code:`Array 来指定返回行中每个字段的 Snowflake 数据类型。Snowflake 针对 UDTF 的输出架构支持以下类型对象:

SQL 数据类型

Scala 类型

com.snowflake.snowpark.types 类型

NUMBER

ShortOption[Short]

ShortType

NUMBER

IntOption[Int]

IntType

NUMBER

LongOption[Long]

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

FloatOption[Float]

FloatType

DOUBLE

DoubleOption[Double]

DoubleType

VARCHAR

Stringjava.lang.String

StringType

BOOLEAN

BooleanOption[Boolean]

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

Array[Byte]

BinaryType

VARIANT

` com.snowflake.snowpark.types.Variant `_

VariantType

ARRAY

Array[String]

ArrayType(StringType)

ARRAY

Array[Variant]

ArrayType(VariantType)

OBJECT

Map[String, String]

MapType(StringType, StringType)

OBJECT

Map[String, Variant]

MapType(StringType, VariantType)

例如,如果 UDTF 返回包含单个整数字段的行:

override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
Copy

替换 process() 方法

在 UDTF 类中,替换 process() 方法:

def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
Copy

其中 n 是传递给 UDTF 的实参数量。

签名中的实参数量对应于您扩展的类。例如,如果 UDTF 传入 2 个输入实参,并且您要扩展 UDTF2 类,则 process() 方法具有以下签名:

def process(arg0: A0, arg1: A1): Iterable[Row]
Copy

此方法会针对输入分区中的每一行调用一次。

选择实参类型

对于 process() 方法中每个实参的类型,请使用与传递到 UDTF 的实参的 Snowflake 数据类型相对应的 Scala 类型。

Snowflake 针对 UDTF 的实参支持以下数据类型:

SQL 数据类型

Scala 数据类型

备注

NUMBER

支持以下类型:

  • ShortOption[Short]

  • IntOption[Int]

  • LongOption[Long]

  • java.math.BigDecimal

FLOAT

FloatOption[Float]

DOUBLE

DoubleOption[Double]

VARCHAR

Stringjava.lang.String

BOOLEAN

BooleanOption[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

` com.snowflake.snowpark.types.Variant `_

ARRAY

Array[String]Array[Variant]

OBJECT

Map[String, String]Map[String, Variant]

支持以下类型的可变映射:

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

返回行

process() 方法中,构造并返回 Row 对象的 Iterable,其包含 UDTF 针对给定输入值要返回的数据。行中的字段必须使用 outputSchema 方法中指定的类型。(请参阅 替换 outputSchema() 方法。)

例如,如果 UDTF 生成行,请为生成的行构造并返回 Row 对象的 Iterable

override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
Copy

替换 endPartition() 方法

替换 endPartition 方法,并且添加在输入分区中的所有行传递到 process 方法后应执行的代码。每个输入分区调用 endPartition 方法一次。

def endPartition(): Iterable[Row]
Copy

如果您需要在处理完分区中的所有行后执行任何工作,您可以使用此方法。例如,您可以:

  • 根据在每个 process 方法调用中获取的状态信息返回行。

  • 返回未绑定到特定输入行的行。

  • 返回汇总 process 方法生成的输出行的行。

返回的行中的字段必须与您在 outputSchema 方法中指定的类型匹配。(请参阅 替换 outputSchema() 方法。)

如果不需要在每个分区的末尾返回其他行,请返回一个空的 Row 对象 Iterable。例如:

override def endPartition(): Iterable[Row] = Array.empty[Row]
Copy

备注

虽然 Snowflake 支持大型分区,会调整超时以成功处理分区,但特别大的分区可能导致处理超时(例如 endPartition 需要太长时间才能完成)。如果您需要针对特定使用场景调整超时阈值,请联系 ` Snowflake 支持部门 `_。

UDTF 类示例

下面是生成一系列行的 UDTF 类的示例。

  • 由于 UDTF 传入 2 个实参,因此该类扩展了 UDTF2

  • 实参 startcount 指定起始行数和要生成的行数。

class MyRangeUdtf extends UDTF2[Int, Int] {
  override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
  override def endPartition(): Iterable[Row] = Array.empty[Row]
  override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Copy

注册 UDTF

接下来,创建新类的实例,并调用其中一种 ` UDTFRegistration _ 方法以注册该类。您可以注册 :ref:`临时 <label-snowpark_udtf_register_name_temporary>永久 UDTF

注册临时 UDTF

要注册临时 UDTF,请调用 UDTFRegistration.registerTemporary

  • 如果您不需要按名称调用 UDTF,您可以传入类的实例来注册匿名 UDTF :

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, lit(10), lit(5)).show
    
    Copy
  • 如果您需要按名称调用 UDTF,请同时传入 UDTF 的名称:

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
    // Call the UDTF by name.
    session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
    
    Copy

注册永久 UDTF

如果您需要在后续会话中使用 UDTF,请调用 UDTFRegistration.registerPermanent 以注册永久 UDTF。

在注册永久 UDTF 时,您必须指定暂存区,在此处注册方法将上传 UDTF 及其依赖项的 JAR 文件。例如:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage")
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

调用 UDTF

在注册 UDTF 后,您可以将返回的 TableFunction 对象传递到 Session 对象的 tableFunction 方法,从而调用 UDTF :

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, lit(10), lit(5)).show()
Copy

要按名称调用 UDTF,请构造具有该名称的 TableFunction 对象,并将其传递到 tableFunction 方法:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

您还可以直接通过 SELECT 语句调用 UDTF :

session.sql("select * from table(myUdtf(10, 5))")
Copy
语言: 中文