在 Scala 中为 DataFrames 创建用户定义的函数 (UDFs)¶
Snowpark API 提供了一些方法,您可以使用这些方法在 Scala 中通过 Lambda 或函数创建用户定义函数。本主题说明如何创建这些类型的函数。
本主题内容:
简介¶
您可以调用 Snowpark APIs,在 Scala 中为自定义 lambda 和函数创建用户定义的函数 (UDFs),并且可以调用这些 UDFs 函数来处理 DataFrame。
当您使用 Snowpark API 创建 UDF 时,Snowpark 库会序列化 UDF 的代码并将其上传到内部暂存区。在调用 UDF 时,Snowpark 库将在数据所在的服务器上执行函数。因此,不需要将数据传输到客户端,函数就可以处理数据。
在自定义代码中,还可以调用 JAR 文件中打包的代码(例如,第三方库的 Java 类)。
可以通过以下两种方式之一为自定义代码创建 UDF :
您可以 创建匿名 UDF 并将该函数赋值给变量。只要此变量在作用域中,您就可以使用此变量调用 UDF。
// Create and register an anonymous UDF (doubleUdf). val doubleUdf = udf((x: Int) => x + x) // Call the anonymous UDF. val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
您可以 创建命名 UDF,并按名称调用 UDF。例如,如果需要按名称调用 UDF 或在后续会话中使用 UDF,您可以使用此方式。
// Create and register a permanent named UDF ("doubleUdf"). session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage") // Call the named UDF. val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
以下部分将提供有关在 Snowpark 中创建 UDFs 的重要信息:
本主题的其余部分将介绍如何创建 UDFs。
实参和返回值支持的数据类型¶
要为 Scala 函数或 lambda 创建 UDF,必须对函数或 lambda 的实参和返回值使用下列受支持的数据类型:
SQL 数据类型 |
Scala 数据类型 |
备注 |
---|---|---|
支持以下类型:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
` com.snowflake.snowpark.types.Variant `_ |
||
|
||
|
支持以下类型的可变映射:
|
|
` com.snowflake.snowpark.types.Geography `_ |
使用 App Trait 在对象中创建 UDFs 的注意事项¶
Scala 提供了一个 ` App <https://www.scala-lang.org/api/2.12.11/scala/App.html (https://www.scala-lang.org/api/2.12.11/scala/App.html)>`_ Trait,您可以扩展该 Trait,以便将 Scala 对象转换为可执行程序。该 App
Trait 提供了一个 main
方法,用于自动执行对象定义正文中的所有代码。(对象定义中的代码实际上会成为 main
方法。)
扩展 App
Trait 的效果之一是,在调用 main
方法之前,不会初始化对象中的字段。如果对象扩展了 App
,并且您定义了一个 UDF 使用之前初始化的对象字段,则上传到服务器的 UDF 定义将不包括该对象字段的初始化值。
例如,假设您定义并初始化了对象中名为 myConst
的字段,并在一个 UDF 中使用了这个字段:
object Main extends App {
...
// Initialize a field.
val myConst = "Prefix "
// Use the field in a UDF.
// Because the App trait delays the initialization of the object fields,
// myConst in the UDF definition resolves to null.
val myUdf = udf((s : String) => myConst + s )
...
}
当 Snowpark 序列化 UDF 定义并将其上传到 Snowflake 时, myConst
未初始化,并且会解析为 null
。因此,调用 UDF 时会为 myConst
返回 null
。
若要避免此类情况,请更改对象,使其不扩展 App
Trait,并为代码实现单独的 main
方法:
object Main {
...
def main(args: Array[String]): Unit = {
... // Your code ...
}
...
}
为 UDF 指定依赖项¶
要通过 Snowpark API 定义 UDF,您必须为以下文件调用 Session.addDependency()
:包含 UDF 依赖的任何类和资源的任何文件(例如 JAR 文件、资源文件等)。(有关从 UDF 读取资源的更多信息,请参阅 通过 UDF 读取文件。)
Snowpark 库将这些文件上传到内部暂存区,并在执行 UDF 时将文件添加到类路径。
小技巧
如果不希望在您每次运行应用程序时该库都上传文件,请将文件上传到暂存区。在调用 addDependency
时,将路径传递到暂存区中的文件。
如果您使用的是 Scala REPL,则 :emph:` 必须 ` 将 :doc:` REPL 生成的类目录 <quickstart-scala-repl>` 添加为依赖项。例如,如果使用 run.sh
脚本启动 REPL,则调用以下方法,以便添加该脚本创建的 repl_classes
目录:
// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
以下示例演示如何将暂存区中的 JAR 文件添加为依赖项:
// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
以下示例演示如何为 JAR 文件和资源文件添加依赖项:
// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")
// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")
// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
您不应需要指定以下依赖项:
您的 Scala 运行时库。
这些库已在执行 UDFs 的服务器上的运行时环境中可用。
Snowpark JAR 文件。
Snowpark 库会自动尝试检测 Snowpark JAR 文件并将其上传到服务器。
防止库重复将 Snowpark JAR 文件上传到服务器:
将 Snowpark JAR 文件上传到暂存区。
例如,以下命令将 Snowpark JAR 文件上传到暂存区
@mystage
。PUT 命令会压缩 JAR 文件并将生成的文件命名为 snowpark-1.12.1.jar.gz。-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark-1.12.1.jar @mystage调用
addDependency
,将暂存区中的 Snowpark JAR 文件添加为依赖项。例如,添加上一个命令上传的 Snowpark JAR 文件:
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark-1.12.1.jar.gz")请注意, JAR 文件的指定路径包括
.gz
文件扩展名,该扩展名由 PUT 命令添加。包含当前正在运行的应用程序的 JAR 文件或目录。
Snowpark 库会自动尝试检测并上传这些依赖项。
如果 Snowpark 库无法自动检测这些依赖项,库会报告错误,您必须调用
addDependency
,以手动添加这些依赖项。
如果将依赖项上传到暂存区时花费的时间过长,Snowpark 库会报告超时异常。要配置 Snowpark 库应等待的最长时间,请在创建会话时设置 :ref:` snowpark_request_timeout_in_seconds <label-snowpark_request_timeout_in_seconds>` 属性。
创建匿名 UDF¶
要创建匿名 UDF,您可以执行以下任一操作:
调用
com.snowflake.snowpark.functions
对象中的udf
函数,传入匿名函数的定义。调用
UDFRegistration
类中的registerTemporary
方法,传入匿名函数的定义。由于您正在注册匿名 UDF,因此必须使用不带name
参数的方法签名。
备注
在编写多线程代码时(例如,使用并行集合时),请使用 registerTemporary
方法来注册 UDFs,而不是使用 udf
函数。这可以防止找不到默认 Snowflake Session
对象的错误。
这些方法会返回 UserDefinedFunction
对象,用于调用 UDF。(请参阅 调用标量用户定义的函数 (UDFs)。)
以下示例创建匿名 UDF:
// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
备注
如果要在 Jupyter Notebook 中创建 UDF,则必须将笔记本设置为使用 Snowpark(请参阅 为 Snowpark Scala 设置 Jupyter Notebook),并遵循在 Notebook 中编写 UDFs 的准则(请参阅 在 Jupyter Notebook 中创建 UDFs)。
以下示例创建匿名 UDF,它传入一个 String
值的 Array
,并为每个值追加字符串 x
:
// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
以下示例创建匿名 UDF,该匿名函数使用自定义类(LanguageDetector
,用于检测文本中使用的语言)。该示例调用匿名 UDF 来检测 DataFrame 中 text_data
列中的语言,并创建新的 DataFrame,其中包含所用语言的附加 lang
列。
// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._
// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector
// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")
// Create a detector
val detector = new LanguageDetector()
// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
Option(detector.detect(s)).getOrElse("UNKNOWN"))
// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
dfEmails.withColumn("lang", langUdf(col("text_data")))
创建和注册命名的 UDF¶
如果要按名称调用 UDF(例如,通过在 functions
对象中使用 callUDF
函数),或者需要在后续会话中使用 UDF,则可以创建并注册命名的 UDF。为此,请使用 UDFRegistration
类中的下列其中一种方法:
registerTemporary
,如果您只计划在当前会话中使用 UDFregisterPermanent
,如果您计划在后续会话中使用 UDF
要访问 UDFRegistration
类的对象,请调用 Session
类的 udf
方法。
registerTemporary
会创建可在当前会话中使用的临时 UDF。
// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
registerPermanent
会创建可在当前和后续会话中使用的 UDF。在调用 registerPermanent
时,您还必须在内部暂存区位置中指定一个位置,用于上传 UDF 及其依赖项的 JAR 文件。
备注
registerPermanent
不支持外部暂存区。
例如:
// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
备注
如果要在 Jupyter Notebook 中创建 UDF,则必须将笔记本设置为使用 Snowpark(请参阅 为 Snowpark Scala 设置 Jupyter Notebook),并遵循在 Notebook 中编写 UDFs 的准则(请参阅 在 Jupyter Notebook 中创建 UDFs)。
在 Jupyter Notebook 中创建 UDFs¶
如果要在 ` Jupyter Notebook <https://jupyter-notebook.readthedocs.io/en/stable/notebook.html (https://jupyter-notebook.readthedocs.io/en/stable/notebook.html)>`_ 中创建 UDFs,则必须执行以下额外的步骤:
为 Snowpark Scala 设置 Jupyter Notebook (如果您尚未将 Notebook 设置为与 Snowpark 配合使用)
编写 UDF 的实现¶
在扩展 Serializable
的类中定义函数实现。例如:
// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
val myUserDefinedFunc = (s: String) => {
...
}
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
访问在另外一个单元格中定义的变量¶
如果需要使用在 UDF 的另外一个单元格中定义的变量,则必须将该变量作为实参传递给类构造函数。例如,假设您在单元格 1 中定义了一个变量:
In [1]:
val prefix = "Hello"
并且希望在单元格 2 中定义的 UDF 中使用该变量。在 UDF 的类构造函数中,为此变量添加一个实参。然后,在调用类构造函数以创建 UDF 时,传入在单元格 1 中定义的变量:
In [2]:
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
val prependPrefixFunc = (s: String) => {
s"$prefix $s"
}
}
// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
使用不可序列化的对象¶
在为 lambda 或函数创建 UDF 时,Snowpark 库会序列化 lambda 闭包并将其发送到服务器进行执行。
如果 lambda 闭包获取的对象不可序列化,则 Snowpark 库将引发 java.io.NotSerializableException
异常。
Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
如果发生这种情况,您可以采取以下方案之一:
使对象可序列化,或
将对象声明为
lazy val
或使用@transient
注释来避免序列化对象。例如:
// Declare the detector object as lazy. lazy val detector = new LanguageDetector("en") // The detector object is not serialized but is instead reconstructed on the server. val langUdf = udf((s: String) => Option(detector.detect(s)).getOrElse("UNKNOWN"))
为 UDF 编写初始化代码¶
如果 UDF 需要初始化代码或上下文,您可以通过作为 UDF 闭包的一部分获取的值来提供此代码或上下文。
以下示例使用单独的类来初始化三个 UDFs 所需的上下文。
第一个 UDF 在 lambda 内创建类的新实例,因此每次调用 UDF 时都会执行初始化。
第二个 UDF 获取在客户端程序中生成的类的实例。客户端上生成的上下文序列化并由 UDF 使用。请注意,上下文类必须可序列化,此方法才能正常工作。
第三个 UDF 获取
lazy val
,因此上下文在第一个 UDF 调用时会延迟实例化,并在后续调用中重复利用。即使上下文不可序列化,这种方法依然有效。但不能保证数据帧内的 ALL UDF 调用会使用相同的延迟生成上下文。
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random
// Context needed for a UDF.
class Context {
val randomInt = Random.nextInt
}
// Serializable context needed for the UDF.
class SerContext extends Serializable {
val randomInt = Random.nextInt
}
object TestUdf {
def main(args: Array[String]): Unit = {
// Create the session.
val session = Session.builder.configFile("/<path>/profile.properties").create
import session.implicits._
session.range(1, 10, 2).show()
// Create a DataFrame with two columns ("c" and "d").
val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
dummy.show()
// Initialize the context once per invocation.
val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
dummy.select(udfRepeatedInit('c)).show()
// Initialize the serializable context only once,
// regardless of the number of times that the UDF is invoked.
val sC = new SerContext
val udfOnceInit = udf((i: Int) => sC.randomInt)
dummy.select(udfOnceInit('c)).show()
// Initialize the non-serializable context only once,
// regardless of the number of times that the UDF is invoked.
lazy val unserC = new Context
val udfOnceInitU = udf((i: Int) => unserC.randomInt)
dummy.select(udfOnceInitU('c)).show()
}
}
通过 UDF 读取文件¶
如前所述,Snowpark 库在服务器上上传和执行 UDFs。如果 UDF 需要从文件中读取数据,您必须确保使用 UDF 上传文件。
此外,如果文件的内容在两次调用 UDF 之间保持不变,您可以编写代码,在第一次调用期间加载文件一次,而不在后续调用中加载。这样可以提高 UDF 调用的性能。
设置 UDF 以读取文件:
将文件添加到 JAR 文件。
例如,如果 UDF 需要使用
data/
子目录 (data/hello.txt
) 中的文件,请运行jar
命令,将此文件添加到 JAR 文件:# Create a new JAR file containing data/hello.txt. $ jar cvf <path>/myJar.jar data/hello.txt
指定 JAR 文件是依赖项,该依赖项将文件上传到服务器并将文件添加到类路径。请参阅 为 UDF 指定依赖项。
例如:
// Specify that myJar.jar contains files that your UDF depends on. session.addDependency("<path>/myJar.jar")
在 UDF 中,调用
Class.getResourceAsStream
,在类路径中找到文件并读取文件。为避免添加
this
的依赖项,您可以使用classOf[com.snowflake.snowpark.DataFrame]`(而不是 :code:`getClass
)来获取Class
对象。例如,读取
data/hello.txt
文件:// Read data/hello.txt from myJar.jar. val resourceName = "/data/hello.txt" val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
在此示例中,资源名称以
/
开头,表示这是 JAR 文件中文件的完整路径。(在这种情况下,文件的位置与类的包无关。)
备注
如果您不希望文件的内容在两次调用 UDF 之间发生变化,请将文件读取到
lazy val
之中。这会导致文件加载代码仅在第一次调用 UDF 时执行,而不会在后续调用时执行。
以下示例定义对象 (UDFCode
),包含将用作 UDF (readFileFunc
) 的函数。该函数读取文件 data/hello.txt
,该文件应包含字符串 hello,
。该函数将此字符串预置到作为实参传入的字符串之前。
// Create a function object that reads a file.
object UDFCode extends Serializable {
// The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
// the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
// first accessed.
lazy val prefix = {
import java.io._
val resourceName = "/data/hello.txt"
val inputStream = classOf[com.snowflake.snowpark.DataFrame]
.getResourceAsStream(resourceName)
if (inputStream == null) {
throw new Exception("Can't find file " + resourceName)
}
scala.io.Source.fromInputStream(inputStream).mkString
}
val readFileFunc = (s: String) => prefix + " : " + s
}
该示例的下一部分将函数注册为匿名 UDF。该示例调用 DataFrame 的 NAME
列中的 UDF。假设 data/hello.txt
文件打包在 JAR 文件 myJar.jar
中。
// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")
// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")
// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)
// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
创建用户定义的表函数 (UDTFs)¶
要在 Snowpark 中创建和注册 UDTF,您必须执行以下操作:
接下来的部分将更详细地介绍这些步骤。
有关调用 UDTF 的信息,请参阅 调用 UDTF。
定义 UDTF 类¶
定义一个类,该类继承自 ` com.snowflake.snowpark.udtf package _ 中的 :samp:`UDTF{n} 类之一(例如 UDTF0
、UDTF1
等)其中 n
指定 UDTF 的输入实参的数量。例如,如果 UDTF 传入 2 个输入实参,则扩展 UDTF2
类。
在类中替换以下方法:
:ref:` outputSchema() <label-snowpark_udtf_define_class_output_schema>`,返回
types.StructType
对象,该对象描述返回行中字段的名称和类型(输出的“架构”)。:ref:` process() <label-snowpark_udtf_define_class_process>`,对 :emph:` 输入分区` 中的每一行调用一次(请参阅下面的注释)。
:ref:` endPartition() <label-snowpark_udtf_define_class_end_partition>`,在所有行都传递到
process()
后,为每个分区调用一次。
在调用 UDTF 时,行在传递到 UDTF 之前划分成分区:
如果调用 UDTF 的语句指定 PARTITION 子句(显式分区),则该子句将确定行的分区方式。
如果语句未指定 PARTITION 子句(隐式分区),则 Snowflake 将确定如何最好地对行进行分区。
有关分区的说明,请参阅 表函数和分区。
有关类的 UDTF 示例,请参阅 UDTF 类示例。
替换 outputSchema() 方法¶
替换 outputSchema()
方法,定义 process()
和 endPartition()
方法返回的行的字段(“输出架构”)的名称和数据类型。
def outputSchema(): StructType
在此方法中,构造并返回 ` StructType _ 对象,该对象使用 ` StructField `_ 对象的 :code:`Array 来指定返回行中每个字段的 Snowflake 数据类型。Snowflake 针对 UDTF 的输出架构支持以下类型对象:
SQL 数据类型 |
Scala 类型 |
|
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
` com.snowflake.snowpark.types.Variant `_ |
|
|
|
|
|
|
|
|
|
|
|
|
|
例如,如果 UDTF 返回包含单个整数字段的行:
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
替换 process() 方法¶
在 UDTF 类中,替换 process()
方法:
def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
其中 n
是传递给 UDTF 的实参数量。
签名中的实参数量对应于您扩展的类。例如,如果 UDTF 传入 2 个输入实参,并且您要扩展 UDTF2
类,则 process()
方法具有以下签名:
def process(arg0: A0, arg1: A1): Iterable[Row]
此方法会针对输入分区中的每一行调用一次。
选择实参类型¶
对于 process()
方法中每个实参的类型,请使用与传递到 UDTF 的实参的 Snowflake 数据类型相对应的 Scala 类型。
Snowflake 针对 UDTF 的实参支持以下数据类型:
SQL 数据类型 |
Scala 数据类型 |
备注 |
---|---|---|
支持以下类型:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
` com.snowflake.snowpark.types.Variant `_ |
||
|
||
|
支持以下类型的可变映射:
|
返回行¶
在 process()
方法中,构造并返回 Row
对象的 Iterable
,其包含 UDTF 针对给定输入值要返回的数据。行中的字段必须使用 outputSchema
方法中指定的类型。(请参阅 替换 outputSchema() 方法。)
例如,如果 UDTF 生成行,请为生成的行构造并返回 Row
对象的 Iterable
:
override def process(start: Int, count: Int): Iterable[Row] = (start until (start + count)).map(Row(_))
替换 endPartition() 方法¶
替换 endPartition
方法,并且添加在输入分区中的所有行传递到 process
方法后应执行的代码。每个输入分区调用 endPartition
方法一次。
def endPartition(): Iterable[Row]
如果您需要在处理完分区中的所有行后执行任何工作,您可以使用此方法。例如,您可以:
根据在每个
process
方法调用中获取的状态信息返回行。返回未绑定到特定输入行的行。
返回汇总
process
方法生成的输出行的行。
返回的行中的字段必须与您在 outputSchema
方法中指定的类型匹配。(请参阅 替换 outputSchema() 方法。)
如果不需要在每个分区的末尾返回其他行,请返回一个空的 Row
对象 Iterable
。例如:
override def endPartition(): Iterable[Row] = Array.empty[Row]
备注
虽然 Snowflake 支持大型分区,会调整超时以成功处理分区,但特别大的分区可能导致处理超时(例如 endPartition
需要太长时间才能完成)。如果您需要针对特定使用场景调整超时阈值,请联系 ` Snowflake 支持部门 `_。
UDTF 类示例¶
下面是生成一系列行的 UDTF 类的示例。
由于 UDTF 传入 2 个实参,因此该类扩展了
UDTF2
。实参
start
和count
指定起始行数和要生成的行数。
class MyRangeUdtf extends UDTF2[Int, Int] {
override def process(start: Int, count: Int): Iterable[Row] =
(start until (start + count)).map(Row(_))
override def endPartition(): Iterable[Row] = Array.empty[Row]
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
注册 UDTF¶
接下来,创建新类的实例,并调用其中一种 ` UDTFRegistration _ 方法以注册该类。您可以注册 :ref:`临时 <label-snowpark_udtf_register_name_temporary> 或 永久 UDTF。
注册临时 UDTF¶
要注册临时 UDTF,请调用 UDTFRegistration.registerTemporary
:
如果您不需要按名称调用 UDTF,您可以传入类的实例来注册匿名 UDTF :
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show
如果您需要按名称调用 UDTF,请同时传入 UDTF 的名称:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
注册永久 UDTF¶
如果您需要在后续会话中使用 UDTF,请调用 UDTFRegistration.registerPermanent
以注册永久 UDTF。
在注册永久 UDTF 时,您必须指定暂存区,在此处注册方法将上传 UDTF 及其依赖项的 JAR 文件。例如:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage") // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
调用 UDTF¶
在注册 UDTF 后,您可以将返回的 TableFunction
对象传递到 Session
对象的 tableFunction
方法,从而调用 UDTF :
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show()
要按名称调用 UDTF,请构造具有该名称的 TableFunction
对象,并将其传递到 tableFunction
方法:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
您还可以直接通过 SELECT 语句调用 UDTF :
session.sql("select * from table(myUdtf(10, 5))")