Scala UDF 处理程序示例¶
本主题包括用 Scala 编写的 UDF 处理程序代码的简单示例。
有关使用 Scala 创建标量 UDF 处理程序的信息,请参阅 使用 Scala 编写标量 UDF。有关一般编码准则,请参阅 一般 Scala UDF 处理程序编码准则。
创建和调用简单的内联 Scala UDF¶
以下语句创建并调用内联 Scala UDF。此代码返回传递给它的 VARCHAR。
此函数使用可选 CALLED ON NULL INPUT
子句声明,以表示即使输入的值为 NULL,也会调用该函数。(不管有没有这个子句,这个函数都会返回 NULL,但是您可以用另一种方式修改代码以处理 NULL,例如,返回一个空字符串。)
创建 UDF¶
CREATE OR REPLACE FUNCTION echo_varchar(x VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
CALLED ON NULL INPUT
RUNTIME_VERSION = 2.12
HANDLER='Echo.echoVarchar'
AS
$$
class Echo {
def echoVarchar(x : String): String = {
return x
}
}
$$;
调用 UDF¶
SELECT echo_varchar('Hello');
将 NULL 传递给内联 Scala UDF¶
这使用了上面定义的 echo_varchar()
UDF。此 SQL NULL
值被隐式转换为 Scala Null (https://www.scala-lang.org/api/2.12.17/scala/Null.html),该 Scala Null
被返回并隐式转换回 SQL NULL
:
调用 UDF:
SELECT echo_varchar(NULL);
从内联 UDF 显式返回 NULL¶
以下代码显示如何显式返回 NULL 值。将 Java 值 Null
转换为 SQL NULL
。
创建 UDF¶
CREATE OR REPLACE FUNCTION return_a_null()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER='TemporaryTestLibrary.returnNull'
AS
$$
class TemporaryTestLibrary {
def returnNull(): String = {
return null
}
}
$$;
调用 UDF¶
SELECT return_a_null();
将 OBJECT 传递给内联 Scala UDF¶
以下示例使用 SQL OBJECT 数据类型和相应的 Scala 数据类型 (Map[String, String]
),并从 OBJECT 中提取一个值。此示例还表明您可以向 Scala UDF 传递多个参数。
创建并加载一个表,它包含以下 OBJECT 类型的列:
CREATE TABLE objectives (o OBJECT);
INSERT INTO objectives SELECT PARSE_JSON('{"outer_key" : {"inner_key" : "inner_value"} }');
创建 UDF¶
CREATE OR REPLACE FUNCTION extract_from_object(x OBJECT, key VARCHAR)
RETURNS VARIANT
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER='VariantLibrary.extract'
AS
$$
import scala.collection.immutable.Map
class VariantLibrary {
def extract(m: Map[String, String], key: String): String = {
return m(key)
}
}
$$;
调用 UDF¶
SELECT extract_from_object(o, 'outer_key'),
extract_from_object(o, 'outer_key')['inner_key'] FROM OBJECTIVES;
将 ARRAY 传递给内联 Scala UDF¶
以下示例使用 SQL ARRAY 数据类型。
创建 UDF¶
CREATE OR REPLACE FUNCTION generate_greeting(greeting_words ARRAY)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER='StringHandler.handleStrings'
AS
$$
class StringHandler {
def handleStrings(strings: Array[String]): String = {
return concatenate(strings)
}
private def concatenate(strings: Array[String]): String = {
var concatenated : String = ""
for (newString <- strings) {
concatenated = concatenated + " " + newString
}
return concatenated
}
}
$$;
使用 Scala UDF 读取文件¶
您可以使用处理程序代码读取文件内容。例如,您可能需要读取文件,以使用处理程序处理非结构化数据。
该文件必须位于可供处理程序使用的 Snowflake 暂存区中。
要读取暂存文件的内容,处理程序可以通过调用 SnowflakeFile
类或 InputStream
类的方法读取动态指定的文件。
如果您需要访问调用方指定的文件,您可以这样做。有关更多信息,请参阅本主题中的以下内容:
SnowflakeFile
提供不适用于 InputStream
的功能,如下表所述。
类 |
输入 |
备注 |
---|---|---|
|
URL 格式:
文件必须位于指定的内部暂存区或外部暂存区。 |
轻松访问其他文件属性,例如文件大小。 |
|
URL 格式:
文件必须位于指定的内部暂存区或外部暂存区。 |
备注
UDF 所有者必须有权访问其位置不具有作用域 URLs 的任何文件。可以通过让处理程序代码使用新 requireScopedUrl
参数的 boolean
值调用 SnowflakeFile.newInstance
的方法来读取这些暂存文件。
以下示例在指定不需要作用域 URL 时使用 SnowflakeFile.newInstance
。
var filename = "@my_stage/filename.txt"
var sfFile = SnowflakeFile.newInstance(filename, false)
使用 SnowflakeFile
读取动态指定的文件¶
使用 SnowflakeFile
类的方法,您可以使用处理程序代码从暂存区读取文件。SnowflakeFile
类包含在 Snowflake 上 Scala UDF 处理程序可用的类路径中。
备注
为了使您的代码能够抵御文件注入攻击,在将文件的位置传递给 UDF 时,尤其是在函数的调用方不是其所有者时,请务必使用带有作用域的 URL。您可以使用内置函数 BUILD_SCOPED_FILE_URL 在 SQL 中创建带有作用域的 URL。有关 BUILD_SCOPED_FILE_URL 用途的更多信息,请参阅 非结构化数据简介。
要在本地开发 UDF 代码,请将包含 SnowflakeFile
的 Snowpark JAR 添加到代码的类路径中。有关 snowpark.jar
的信息,请参阅 为 Snowpark Scala 设置开发环境。请注意,Snowpark 客户端应用程序不能使用此类。
使用 SnowflakeFile
时,不必在创建 UDF 时使用 IMPORTS 子句指定暂存文件或包含 SnowflakeFile
的 JAR,如同在 SQL 中使用 CREATE FUNCTION 语句。
创建 UDF¶
以下示例中的代码使用 SnowflakeFile
从指定的暂存区位置读取文件。它使用来自 getInputStream
方法的 InputStream
,将文件的内容读入 String
变量。
CREATE OR REPLACE FUNCTION sum_total_sales_snowflake_file(file string)
RETURNS INTEGER
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES=('com.snowflake:snowpark:latest')
HANDLER='SalesSum.sumTotalSales'
AS
$$
import java.io.InputStream
import java.io.IOException
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.types.SnowflakeFile
object SalesSum {
@throws(classOf[IOException])
def sumTotalSales(filePath: String): Int = {
var total = -1
// Use a SnowflakeFile instance to read sales data from a stage.
val file = SnowflakeFile.newInstance(filePath)
val stream = file.getInputStream()
val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
// Omitted for brevity: code to retrieve sales data from JSON and assign it to the total variable.
return total
}
}
$$;
调用 UDF¶
SELECT sum_total_sales_input_stream(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
使用 InputStream
读取动态指定的文件¶
通过将处理程序函数的实参设为 InputStream
变量,可以将文件内容直接读入 java.io.InputStream
。当函数的调用方想要将文件路径作为实参传递时,这可能很有用。
备注
为了使您的代码能够抵御文件注入攻击,在向 UDF 传递文件位置时,请始终使用作用域 URLs。您可以使用内置函数 BUILD_SCOPED_FILE_URL 在 SQL 中创建带有作用域的 URL。有关 BUILD_SCOPED_FILE_URL 用途的更多信息,请参阅 非结构化数据简介。
创建 UDF¶
以下示例中的代码具有一个处理程序函数 sumTotalSales
,该函数接受 InputStream
并返回 Int
。在运行时,Snowflake 会自动将 file
变量路径处的文件内容分配给 stream
实参变量。
CREATE OR REPLACE FUNCTION sum_total_sales_input_stream(file STRING)
RETURNS NUMBER
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'SalesSum.sumTotalSales'
PACKAGES = ('com.snowflake:snowpark:latest')
AS $$
import com.snowflake.snowpark.types.Variant
import java.io.InputStream
import java.io.IOException
import java.nio.charset.StandardCharsets
object SalesSum {
@throws(classOf[IOException])
def sumTotalSales(stream: InputStream): Int = {
val total = -1
val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
// Omitted for brevity: code to retrieve sales data from JSON and assign it to the total variable.
return total
}
}
$$;
调用 UDF¶
SELECT sum_total_sales_input_stream(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));