在 Scala 中为 DataFrames 创建用户定义的函数 (UDFs)¶
Snowpark API 提供了一些方法,您可以使用这些方法在 Scala 中通过 Lambda 或函数创建用户定义函数。本主题说明如何创建这些类型的函数。
简介¶
您可以调用 Snowpark APIs,在 Scala 中为自定义 lambda 和函数创建用户定义的函数 (UDFs),并且可以调用这些 UDFs 函数来处理 DataFrame。
当您使用 Snowpark API 创建 UDF 时,Snowpark 库会序列化 UDF 的代码并将其上传到内部暂存区。在调用 UDF 时,Snowpark 库将在数据所在的服务器上执行函数。因此,不需要将数据传输到客户端,函数就可以处理数据。
在自定义代码中,还可以调用 JAR 文件中打包的代码(例如,第三方库的 Java 类)。
可以通过以下两种方式之一为自定义代码创建 UDF :
您可以 创建匿名 UDF 并将该函数赋值给变量。只要此变量在作用域中,您就可以使用此变量调用 UDF。
// Create and register an anonymous UDF (doubleUdf). val doubleUdf = udf((x: Int) => x + x) // Call the anonymous UDF. val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
您可以 创建命名 UDF,并按名称调用 UDF。例如,如果需要按名称调用 UDF 或在后续会话中使用 UDF,您可以使用此方式。
// Create and register a permanent named UDF ("doubleUdf"). session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage") // Call the named UDF. val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
以下部分将提供有关在 Snowpark 中创建 UDFs 的重要信息:
本主题的其余部分将介绍如何创建 UDFs。
实参和返回值支持的数据类型¶
要为 Scala 函数或 lambda 创建 UDF,必须对函数或 lambda 的实参和返回值使用下列受支持的数据类型:
SQL 数据类型 |
Scala 数据类型 |
备注 |
|---|---|---|
支持以下类型:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
` com.snowflake.snowpark.types.Variant `_ |
||
|
||
|
支持以下类型的可变映射:
|
|
` com.snowflake.snowpark.types.Geography `_ |
使用 App Trait 在对象中创建 UDFs 的注意事项¶
Scala 提供了一个 ` App <https://www.scala-lang.org/api/2.12.11/scala/App.html (https://www.scala-lang.org/api/2.12.11/scala/App.html)>`_ Trait,您可以扩展该 Trait,以便将 Scala 对象转换为可执行程序。该 App Trait 提供了一个 main 方法,用于自动执行对象定义正文中的所有代码。(对象定义中的代码实际上会成为 main 方法。)
扩展 App Trait 的效果之一是,在调用 main 方法之前,不会初始化对象中的字段。如果对象扩展了 App,并且您定义了一个 UDF 使用之前初始化的对象字段,则上传到服务器的 UDF 定义将不包括该对象字段的初始化值。
例如,假设您定义并初始化了对象中名为 myConst 的字段,并在一个 UDF 中使用了这个字段:
object Main extends App {
...
// Initialize a field.
val myConst = "Prefix "
// Use the field in a UDF.
// Because the App trait delays the initialization of the object fields,
// myConst in the UDF definition resolves to null.
val myUdf = udf((s : String) => myConst + s )
...
}
当 Snowpark 序列化 UDF 定义并将其上传到 Snowflake 时, myConst 未初始化,并且会解析为 null。因此,调用 UDF 时会为 myConst 返回 null。
若要避免此类情况,请更改对象,使其不扩展 App Trait,并为代码实现单独的 main 方法:
object Main {
...
def main(args: Array[String]): Unit = {
... // Your code ...
}
...
}
为 UDF 指定依赖项¶
要通过 Snowpark API 定义 UDF,您必须为以下文件调用 Session.addDependency() :包含 UDF 依赖的任何类和资源的任何文件(例如 JAR 文件、资源文件等)。(有关从 UDF 读取资源的更多信息,请参阅 通过 UDF 读取文件。)
Snowpark 库将这些文件上传到内部暂存区,并在执行 UDF 时将文件添加到类路径。
小技巧
如果不希望在您每次运行应用程序时该库都上传文件,请将文件上传到暂存区。在调用 addDependency 时,将路径传递到暂存区中的文件。
如果您使用的是 Scala REPL,则 :emph:` 必须 ` 将 :doc:` REPL 生成的类目录 <quickstart-scala-repl>` 添加为依赖项。例如,如果使用 run.sh 脚本启动 REPL,则调用以下方法,以便添加该脚本创建的 repl_classes 目录:
// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
以下示例演示如何将暂存区中的 JAR 文件添加为依赖项:
// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
以下示例演示如何为 JAR 文件和资源文件添加依赖项:
// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")
// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")
// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
您不应需要指定以下依赖项:
您的 Scala 运行时库。
这些库已在执行 UDFs 的服务器上的运行时环境中可用。
Snowpark JAR 文件。
Snowpark 库会自动尝试检测 Snowpark JAR 文件并将其上传到服务器。
防止库重复将 Snowpark JAR 文件上传到服务器:
将 Snowpark JAR 文件上传到暂存区。
For example, the following command uploads the Snowpark JAR file to the stage
@mystage. The PUT command compresses the JAR file and names the resulting file snowpark_2.12-1.18.0.jar.gz.-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark_2.12-1.18.0.jar @mystage调用
addDependency,将暂存区中的 Snowpark JAR 文件添加为依赖项。例如,添加上一个命令上传的 Snowpark JAR 文件:
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark_2.12-1.18.0.jar.gz")请注意, JAR 文件的指定路径包括
.gz文件扩展名,该扩展名由 PUT 命令添加。包含当前正在运行的应用程序的 JAR 文件或目录。
Snowpark 库会自动尝试检测并上传这些依赖项。
如果 Snowpark 库无法自动检测这些依赖项,库会报告错误,您必须调用
addDependency,以手动添加这些依赖项。
如果将依赖项上传到暂存区时花费的时间过长,Snowpark 库会报告超时异常。要配置 Snowpark 库应等待的最长时间,请在创建会话时设置 :ref:` snowpark_request_timeout_in_seconds <label-snowpark_request_timeout_in_seconds>` 属性。
创建匿名 UDF¶
要创建匿名 UDF,您可以执行以下任一操作:
调用
com.snowflake.snowpark.functions对象中的udf函数,传入匿名函数的定义。调用
UDFRegistration类中的registerTemporary方法,传入匿名函数的定义。由于您正在注册匿名 UDF,因此必须使用不带name参数的方法签名。
备注
在编写多线程代码时(例如,使用并行集合时),请使用 registerTemporary 方法来注册 UDFs,而不是使用 udf 函数。这可以防止找不到默认 Snowflake Session 对象的错误。
这些方法会返回 UserDefinedFunction 对象,用于调用 UDF。(请参阅 调用标量用户定义的函数 (UDFs)。)
以下示例创建匿名 UDF:
// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
备注
如果要在 Jupyter Notebook 中创建 UDF,则必须将笔记本设置为使用 Snowpark(请参阅 为 Snowpark Scala 设置 Jupyter Notebook),并遵循在 Notebook 中编写 UDFs 的准则(请参阅 在 Jupyter Notebook 中创建 UDFs)。
以下示例创建匿名 UDF,它传入一个 String 值的 Array,并为每个值追加字符串 x :
// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
以下示例创建匿名 UDF,该匿名函数使用自定义类(LanguageDetector,用于检测文本中使用的语言)。该示例调用匿名 UDF 来检测 DataFrame 中 text_data 列中的语言,并创建新的 DataFrame,其中包含所用语言的附加 lang 列。
// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._
// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector
// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")
// Create a detector
val detector = new LanguageDetector()
// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
Option(detector.detect(s)).getOrElse("UNKNOWN"))
// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
dfEmails.withColumn("lang", langUdf(col("text_data")))
创建和注册命名的 UDF¶
如果要按名称调用 UDF(例如,通过在 functions 对象中使用 callUDF 函数),或者需要在后续会话中使用 UDF,则可以创建并注册命名的 UDF。为此,请使用 UDFRegistration 类中的下列其中一种方法:
registerTemporary,如果您只计划在当前会话中使用 UDFregisterPermanent,如果您计划在后续会话中使用 UDF
要访问 UDFRegistration 类的对象,请调用 Session 类的 udf 方法。
registerTemporary 会创建可在当前会话中使用的临时 UDF。
// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
registerPermanent 会创建可在当前和后续会话中使用的 UDF。在调用 registerPermanent 时,您还必须在内部暂存区位置中指定一个位置,用于上传 UDF 及其依赖项的 JAR 文件。
备注
registerPermanent 不支持外部暂存区。
例如:
// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
备注
如果要在 Jupyter Notebook 中创建 UDF,则必须将笔记本设置为使用 Snowpark(请参阅 为 Snowpark Scala 设置 Jupyter Notebook),并遵循在 Notebook 中编写 UDFs 的准则(请参阅 在 Jupyter Notebook 中创建 UDFs)。
在 Jupyter Notebook 中创建 UDFs¶
如果要在 ` Jupyter Notebook <https://jupyter-notebook.readthedocs.io/en/stable/notebook.html (https://jupyter-notebook.readthedocs.io/en/stable/notebook.html)>`_ 中创建 UDFs,则必须执行以下额外的步骤:
为 Snowpark Scala 设置 Jupyter Notebook (如果您尚未将 Notebook 设置为与 Snowpark 配合使用)
编写 UDF 的实现¶
在扩展 Serializable 的类中定义函数实现。例如:
// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
val myUserDefinedFunc = (s: String) => {
...
}
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
访问在另外一个单元格中定义的变量¶
如果需要使用在 UDF 的另外一个单元格中定义的变量,则必须将该变量作为实参传递给类构造函数。例如,假设您在单元格 1 中定义了一个变量:
In [1]:
val prefix = "Hello"
并且希望在单元格 2 中定义的 UDF 中使用该变量。在 UDF 的类构造函数中,为此变量添加一个实参。然后,在调用类构造函数以创建 UDF 时,传入在单元格 1 中定义的变量:
In [2]:
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
val prependPrefixFunc = (s: String) => {
s"$prefix $s"
}
}
// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
使用不可序列化的对象¶
在为 lambda 或函数创建 UDF 时,Snowpark 库会序列化 lambda 闭包并将其发送到服务器进行执行。
如果 lambda 闭包获取的对象不可序列化,则 Snowpark 库将引发 java.io.NotSerializableException 异常。
Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
如果发生这种情况,您可以采取以下方案之一:
使对象可序列化,或
将对象声明为
lazy val或使用@transient注释来避免序列化对象。例如:
// Declare the detector object as lazy. lazy val detector = new LanguageDetector("en") // The detector object is not serialized but is instead reconstructed on the server. val langUdf = udf((s: String) => Option(detector.detect(s)).getOrElse("UNKNOWN"))
为 UDF 编写初始化代码¶
如果 UDF 需要初始化代码或上下文,您可以通过作为 UDF 闭包的一部分获取的值来提供此代码或上下文。
以下示例使用单独的类来初始化三个 UDFs 所需的上下文。
第一个 UDF 在 lambda 内创建类的新实例,因此每次调用 UDF 时都会执行初始化。
第二个 UDF 获取在客户端程序中生成的类的实例。客户端上生成的上下文序列化并由 UDF 使用。请注意,上下文类必须可序列化,此方法才能正常工作。
第三个 UDF 获取
lazy val,因此上下文在第一个 UDF 调用时会延迟实例化,并在后续调用中重复利用。即使上下文不可序列化,这种方法依然有效。但不能保证数据帧内的 ALL UDF 调用会使用相同的延迟生成上下文。
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random
// Context needed for a UDF.
class Context {
val randomInt = Random.nextInt
}
// Serializable context needed for the UDF.
class SerContext extends Serializable {
val randomInt = Random.nextInt
}
object TestUdf {
def main(args: Array[String]): Unit = {
// Create the session.
val session = Session.builder.configFile("/<path>/profile.properties").create
import session.implicits._
session.range(1, 10, 2).show()
// Create a DataFrame with two columns ("c" and "d").
val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
dummy.show()
// Initialize the context once per invocation.
val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
dummy.select(udfRepeatedInit('c)).show()
// Initialize the serializable context only once,
// regardless of the number of times that the UDF is invoked.
val sC = new SerContext
val udfOnceInit = udf((i: Int) => sC.randomInt)
dummy.select(udfOnceInit('c)).show()
// Initialize the non-serializable context only once,
// regardless of the number of times that the UDF is invoked.
lazy val unserC = new Context
val udfOnceInitU = udf((i: Int) => unserC.randomInt)
dummy.select(udfOnceInitU('c)).show()
}
}
通过 UDF 读取文件¶
如前所述,Snowpark 库在服务器上上传和执行 UDFs。如果 UDF 需要从文件中读取数据,您必须确保使用 UDF 上传文件。
此外,如果文件的内容在两次调用 UDF 之间保持不变,您可以编写代码,在第一次调用期间加载文件一次,而不在后续调用中加载。这样可以提高 UDF 调用的性能。
设置 UDF 以读取文件:
将文件添加到 JAR 文件。
例如,如果 UDF 需要使用
data/子目录 (data/hello.txt) 中的文件,请运行jar命令,将此文件添加到 JAR 文件:# Create a new JAR file containing data/hello.txt. $ jar cvf <path>/myJar.jar data/hello.txt
指定 JAR 文件是依赖项,该依赖项将文件上传到服务器并将文件添加到类路径。请参阅 为 UDF 指定依赖项。
例如:
// Specify that myJar.jar contains files that your UDF depends on. session.addDependency("<path>/myJar.jar")
在 UDF 中,调用
Class.getResourceAsStream,在类路径中找到文件并读取文件。为避免添加
this的依赖项,您可以使用classOf[com.snowflake.snowpark.DataFrame]`(而不是 :code:`getClass)来获取Class对象。例如,读取
data/hello.txt文件:// Read data/hello.txt from myJar.jar. val resourceName = "/data/hello.txt" val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
在此示例中,资源名称以
/开头,表示这是 JAR 文件中文件的完整路径。(在这种情况下,文件的位置与类的包无关。)
备注
如果您不希望文件的内容在两次调用 UDF 之间发生变化,请将文件读取到
lazy val之中。这会导致文件加载代码仅在第一次调用 UDF 时执行,而不会在后续调用时执行。
以下示例定义对象 (UDFCode),包含将用作 UDF (readFileFunc) 的函数。该函数读取文件 data/hello.txt,该文件应包含字符串 hello,。该函数将此字符串预置到作为实参传入的字符串之前。
// Create a function object that reads a file.
object UDFCode extends Serializable {
// The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
// the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
// first accessed.
lazy val prefix = {
import java.io._
val resourceName = "/data/hello.txt"
val inputStream = classOf[com.snowflake.snowpark.DataFrame]
.getResourceAsStream(resourceName)
if (inputStream == null) {
throw new Exception("Can't find file " + resourceName)
}
scala.io.Source.fromInputStream(inputStream).mkString
}
val readFileFunc = (s: String) => prefix + " : " + s
}
该示例的下一部分将函数注册为匿名 UDF。该示例调用 DataFrame 的 NAME 列中的 UDF。假设 data/hello.txt 文件打包在 JAR 文件 myJar.jar 中。
// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")
// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")
// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)
// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
创建用户定义的表函数 (UDTFs)¶
要在 Snowpark 中创建和注册 UDTF,您必须执行以下操作:
接下来的部分将更详细地介绍这些步骤。
有关调用 UDTF 的信息,请参阅 调用 UDTF。
定义 UDTF 类¶
定义一个类,该类继承自 ` com.snowflake.snowpark.udtf package _ 中的 :samp:`UDTF{n} 类之一(例如 UDTF0、UDTF1 等)其中 n 指定 UDTF 的输入实参的数量。例如,如果 UDTF 传入 2 个输入实参,则扩展 UDTF2 类。
在类中替换以下方法:
:ref:` outputSchema() <label-snowpark_udtf_define_class_output_schema>`,返回
types.StructType对象,该对象描述返回行中字段的名称和类型(输出的“架构”)。:ref:` process() <label-snowpark_udtf_define_class_process>`,对 :emph:` 输入分区` 中的每一行调用一次(请参阅下面的注释)。
:ref:` endPartition() <label-snowpark_udtf_define_class_end_partition>`,在所有行都传递到
process()后,为每个分区调用一次。
在调用 UDTF 时,行在传递到 UDTF 之前划分成分区:
如果调用 UDTF 的语句指定 PARTITION 子句(显式分区),则该子句将确定行的分区方式。
如果语句未指定 PARTITION 子句(隐式分区),则 Snowflake 将确定如何最好地对行进行分区。
有关分区的说明,请参阅 表函数和分区。
有关类的 UDTF 示例,请参阅 UDTF 类示例。
替换 outputSchema() 方法¶
替换 outputSchema() 方法,定义 process() 和 endPartition() 方法返回的行的字段(“输出架构”)的名称和数据类型。
def outputSchema(): StructType
在此方法中,构造并返回 ` StructType _ 对象,该对象使用 ` StructField `_ 对象的 :code:`Array 来指定返回行中每个字段的 Snowflake 数据类型。Snowflake 针对 UDTF 的输出架构支持以下类型对象:
SQL 数据类型 |
Scala 类型 |
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
` com.snowflake.snowpark.types.Variant `_ |
|
|
|
|
|
|
|
|
|
|
|
|
|
例如,如果 UDTF 返回包含单个整数字段的行:
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
替换 process() 方法¶
在 UDTF 类中,替换 process() 方法:
def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
其中 n 是传递给 UDTF 的实参数量。
签名中的实参数量对应于您扩展的类。例如,如果 UDTF 传入 2 个输入实参,并且您要扩展 UDTF2 类,则 process() 方法具有以下签名:
def process(arg0: A0, arg1: A1): Iterable[Row]
此方法会针对输入分区中的每一行调用一次。
选择实参类型¶
对于 process() 方法中每个实参的类型,请使用与传递到 UDTF 的实参的 Snowflake 数据类型相对应的 Scala 类型。
Snowflake 针对 UDTF 的实参支持以下数据类型:
SQL 数据类型 |
Scala 数据类型 |
备注 |
|---|---|---|
支持以下类型:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
` com.snowflake.snowpark.types.Variant `_ |
||
|
||
|
支持以下类型的可变映射:
|
返回行¶
在 process() 方法中,构造并返回 Row 对象的 Iterable,其包含 UDTF 针对给定输入值要返回的数据。行中的字段必须使用 outputSchema 方法中指定的类型。(请参阅 替换 outputSchema() 方法。)
例如,如果 UDTF 生成行,请为生成的行构造并返回 Row 对象的 Iterable :
override def process(start: Int, count: Int): Iterable[Row] = (start until (start + count)).map(Row(_))
替换 endPartition() 方法¶
替换 endPartition 方法,并且添加在输入分区中的所有行传递到 process 方法后应执行的代码。每个输入分区调用 endPartition 方法一次。
def endPartition(): Iterable[Row]
如果您需要在处理完分区中的所有行后执行任何工作,您可以使用此方法。例如,您可以:
根据在每个
process方法调用中获取的状态信息返回行。返回未绑定到特定输入行的行。
返回汇总
process方法生成的输出行的行。
返回的行中的字段必须与您在 outputSchema 方法中指定的类型匹配。(请参阅 替换 outputSchema() 方法。)
如果不需要在每个分区的末尾返回其他行,请返回一个空的 Row 对象 Iterable。例如:
override def endPartition(): Iterable[Row] = Array.empty[Row]
备注
虽然 Snowflake 支持大型分区,会调整超时以成功处理分区,但特别大的分区可能导致处理超时(例如 endPartition 需要太长时间才能完成)。如果您需要针对特定使用场景调整超时阈值,请联系 ` Snowflake 支持部门 `_。
UDTF 类示例¶
下面是生成一系列行的 UDTF 类的示例。
由于 UDTF 传入 2 个实参,因此该类扩展了
UDTF2。实参
start和count指定起始行数和要生成的行数。
class MyRangeUdtf extends UDTF2[Int, Int] {
override def process(start: Int, count: Int): Iterable[Row] =
(start until (start + count)).map(Row(_))
override def endPartition(): Iterable[Row] = Array.empty[Row]
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
注册 UDTF¶
接下来,创建新类的实例,并调用其中一种 ` UDTFRegistration _ 方法以注册该类。您可以注册 :ref:`临时 <label-snowpark_udtf_register_name_temporary> 或 永久 UDTF。
注册临时 UDTF¶
要注册临时 UDTF,请调用 UDTFRegistration.registerTemporary:
如果您不需要按名称调用 UDTF,您可以传入类的实例来注册匿名 UDTF :
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show
如果您需要按名称调用 UDTF,请同时传入 UDTF 的名称:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
注册永久 UDTF¶
如果您需要在后续会话中使用 UDTF,请调用 UDTFRegistration.registerPermanent 以注册永久 UDTF。
在注册永久 UDTF 时,您必须指定暂存区,在此处注册方法将上传 UDTF 及其依赖项的 JAR 文件。例如:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage") // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
调用 UDTF¶
在注册 UDTF 后,您可以将返回的 TableFunction 对象传递到 Session 对象的 tableFunction 方法,从而调用 UDTF :
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show()
要按名称调用 UDTF,请构造具有该名称的 TableFunction 对象,并将其传递到 tableFunction 方法:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
您还可以直接通过 SELECT 语句调用 UDTF :
session.sql("select * from table(myUdtf(10, 5))")