为使用 SQL 创建的存储过程编写 Scala 处理程序¶
您可以使用处理程序代码读取文件内容。该文件必须位于可供处理程序使用的 Snowflake 暂存区中。例如,您可能希望读取文件以在处理程序中处理非结构化数据。
若要读取暂存文件的内容,处理程序可以调用 SnowflakeFile
类或 InputStream
类中的方法。如果需要在计算过程中动态访问文件,则可以执行此操作。有关更多信息,请参阅 使用 SnowflakeFile 读取动态指定的文件 或 使用 InputStream 读取动态指定的文件 (本主题内容)。
SnowflakeFile
提供不适用于 InputStream
的功能,如下表所述。
类 |
输入 |
备注 |
---|---|---|
|
URL 格式:
文件必须位于指定的内部暂存区或外部暂存区。 |
轻松访问其他文件属性,例如文件大小。 |
|
URL 格式:
文件必须位于指定的内部暂存区或外部暂存区。 |
备注
对于所有者权限存储过程,该过程的所有者必须有权访问任何非作用域 URLs 的文件。对于调用方权限过程,调用方必须有权访问任何非作用域 URLs 的文件。无论哪种情况,都可以通过让处理程序代码使用新 requireScopedUrl
参数的 boolean
值调用 SnowflakeFile.newInstance
方法来读取暂存文件。
以下示例在指定不需要作用域 SnowflakeFile.newInstance
时使用 URL。
var filename = "@my_stage/filename.txt"
var sfFile = SnowflakeFile.newInstance(filename, false)
使用 SnowflakeFile
读取动态指定的文件¶
以下示例中的代码具有一个处理程序函数 execute
,该函数接受 String
并返回 String
以及文件的内容。在运行时,Snowflake 从过程的 input
变量中的传入文件路径初始化处理程序的 fileName
变量。处理程序代码使用 SnowflakeFile
实例来读取文件。
CREATE OR REPLACE PROCEDURE file_reader_scala_proc_snowflakefile(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.types.SnowflakeFile
import com.snowflake.snowpark_java.Session
object FileReader {
def execute(session: Session, fileName: String): String = {
var input: InputStream = SnowflakeFile.newInstance(fileName).getInputStream()
return new String(input.readAllBytes(), StandardCharsets.UTF_8)
}
}
$$;
以下 CALL 示例中的代码创建一个指向该文件的作用域文件 URL。此为编码 URL,允许临时访问暂存文件,而无需授予对暂存区本身的权限。
CALL file_reader_scala_proc_snowflakefile(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
使用 InputStream
读取动态指定的文件¶
以下示例中的代码定义一个处理程序函数 execute
,该函数接受 InputStream
并返回 String
以及文件的内容。在运行时,Snowflake 从过程的 input
变量中的传入文件路径初始化处理程序的 stream
变量。处理程序代码使用 InputStream
来读取文件。
CREATE OR REPLACE PROCEDURE file_reader_scala_proc_input(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.Session
object FileReader {
def execute(session: Session, stream: InputStream): String = {
val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
return contents
}
}
$$;
以下 CALL 示例中的代码创建一个指向该文件的编码作用域文件 URL。编码 URL 允许临时访问暂存文件,而无需对暂存区本身授予权限。
CALL file_reader_scala_proc_input(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));