为使用 SQL 创建的存储过程编写 Scala 处理程序

您可以使用处理程序代码读取文件内容。该文件必须位于可供处理程序使用的 Snowflake 暂存区中。例如,您可能希望读取文件以在处理程序中处理非结构化数据。

若要读取暂存文件的内容,处理程序可以调用 SnowflakeFile 类或 InputStream 类中的方法。如果需要在计算过程中动态访问文件,则可以执行此操作。有关更多信息,请参阅 使用 SnowflakeFile 读取动态指定的文件使用 InputStream 读取动态指定的文件 (本主题内容)。

SnowflakeFile 提供不适用于 InputStream 的功能,如下表所述。

输入

备注

SnowflakeFile

URL 格式:

  • 带有作用域的 URL 可在函数的调用方不是其所有者时降低文件注入攻击的风险。

  • 文件 URL 或 UDF 所有者有权访问的文件的字符串路径。

文件必须位于指定的内部暂存区或外部暂存区。

轻松访问其他文件属性,例如文件大小。

InputStream

URL 格式:

  • 带有作用域的 URL 可在函数的调用方不是其所有者时降低文件注入攻击的风险。

文件必须位于指定的内部暂存区或外部暂存区。

备注

对于所有者权限存储过程,该过程的所有者必须有权访问任何非作用域 URLs 的文件。对于调用方权限过程,调用方必须有权访问任何非作用域 URLs 的文件。无论哪种情况,都可以通过让处理程序代码使用新 requireScopedUrl 参数的 boolean 值调用 SnowflakeFile.newInstance 方法来读取暂存文件。

以下示例在指定不需要作用域 SnowflakeFile.newInstance 时使用 URL。

var filename = "@my_stage/filename.txt"
var sfFile = SnowflakeFile.newInstance(filename, false)
Copy

使用 SnowflakeFile 读取动态指定的文件

以下示例中的代码具有一个处理程序函数 execute,该函数接受 String 并返回 String 以及文件的内容。在运行时,Snowflake 从过程的 input 变量中的传入文件路径初始化处理程序的 fileName 变量。处理程序代码使用 SnowflakeFile 实例来读取文件。

CREATE OR REPLACE PROCEDURE file_reader_scala_proc_snowflakefile(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.types.SnowflakeFile
import com.snowflake.snowpark_java.Session

object FileReader {
  def execute(session: Session, fileName: String): String = {
    var input: InputStream = SnowflakeFile.newInstance(fileName).getInputStream()
    return new String(input.readAllBytes(), StandardCharsets.UTF_8)
  }
}
$$;
Copy

以下 CALL 示例中的代码创建一个指向该文件的作用域文件 URL。此为编码 URL,允许临时访问暂存文件,而无需授予对暂存区本身的权限。

CALL file_reader_scala_proc_snowflakefile(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
Copy

使用 InputStream 读取动态指定的文件

以下示例中的代码定义一个处理程序函数 execute,该函数接受 InputStream 并返回 String 以及文件的内容。在运行时,Snowflake 从过程的 input 变量中的传入文件路径初始化处理程序的 stream 变量。处理程序代码使用 InputStream 来读取文件。

CREATE OR REPLACE PROCEDURE file_reader_scala_proc_input(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.Session

object FileReader {
  def execute(session: Session, stream: InputStream): String = {
    val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
    return contents
  }
}
$$;
Copy

以下 CALL 示例中的代码创建一个指向该文件的编码作用域文件 URL。编码 URL 允许临时访问暂存文件,而无需对暂存区本身授予权限。

CALL file_reader_scala_proc_input(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
Copy
语言: 中文