使用 Scala 编写存储过程

可以使用 Scala 编写存储过程。您可以在存储过程中使用 Snowpark 库对 Snowflake 中的表执行查询、更新和其他操作。

本主题说明如何编写存储过程的逻辑。获得逻辑后,可以使用 SQL 创建并调用过程。有关更多信息,请参阅 创建存储过程调用存储过程

本主题内容:

简介

可以使用 Snowflake 仓库作为计算框架,在 Snowflake 中构建和运行数据管道。对于数据管道的代码,您可以使用 Snowpark API for Scala 编写存储过程。若要计划这些存储过程的执行,请使用 任务

您可以在处理程序代码执行时获取日志和跟踪数据。有关更多信息,请参阅 日志记录和跟踪概述

备注

要创建和调用匿名过程,请使用 CALL(使用匿名过程)。创建和调用匿名过程不需要具有 CREATE PROCEDURE 架构权限的角色。

先决条件

您必须使用版本 1.1.0 或更新版本的 Snowpark 库。

如果要编写处理程序代码将被复制到暂存区的存储过程,则必须编译类才能在 Java 版本 11.x 中运行。

为 Snowpark 设置开发环境

设置开发环境以使用 Snowpark 库。请参阅 为 Snowpark Scala 设置开发环境

构建处理程序代码

您可以使处理程序源代码与创建过程的 SQL 保持一致,或者将处理程序编译的结果保留在单独的位置,并从 SQL 中引用它。有关更多信息,请参阅 将处理程序代码保持内联或保留在暂存区

有关构建用于过程的处理程序源代码的更多信息,请参阅 打包处理程序代码

创建和调用过程

编写过程的处理程序后,可以使用 SQL 创建并调用它。

限制

Snowpark Stored Procedures 具有以下限制:

  • 不支持并发。例如,在代码中,不能从多个线程提交查询。同时发出多个查询的代码将产生错误。

  • 如果从任务中执行存储过程,则必须在创建任务时指定仓库。(您不能使用无服务器计算资源来运行任务。)

  • 在存储过程中使用某些 Snowpark APIs 时,请注意以下限制。

    • 使用 :ref:` 执行 PUT 和 GET 命令的 APIs <label-snowpark_dataframe_stages_file_operation>` (包括 Session.sql("PUT ...")Session.sql("GET ..."))时,只能写入为调用该过程的查询提供的内存支持文件系统中的 /tmp 目录。

    • 请勿使用 :ref:` 创建新会话的 APIs<label-snowpark_creating_session>` (例如, Session.builder().configs(...).create())。

    • 不支持使用 session.jdbcConnection (以及从中返回的连接),因为它可能会导致不安全的行为。

  • 所有者权限存储过程不支持创建指定的临时对象。所有者权限存储过程是以存储过程所有者的权限运行的存储过程。有关更多信息,请参阅 调用方的权限或所有者的权限

为存储过程编写处理程序代码

对于过程的逻辑,您可以编写在调用过程时执行的处理程序代码。本节介绍处理程序的设计。

您可以将此代码包含在创建过程的 SQL 语句中,或者将代码复制到暂存区并在创建过程时引用该代码。有关更多信息,请参阅 将处理程序代码保持内联或保留在暂存区

计划编写存储过程

  • 限制消耗的内存量。

    Snowflake 根据所需的内存量对方法进行了限制。有关如何避免消耗过多内存的更多信息,请参阅 设计保持在 Snowflake 施加的约束范围内的处理程序

  • 编写线程安全代码。

    确保您的处理程序方法或函数是线程安全的。

  • 了解安全限制。

    处理程序代码在受限引擎中运行,因此请务必遵循 UDFs 和过程的安全实践 中所述的规则。

  • 决定使用所有者权限还是调用方权限。

    在计划编写存储过程时,请考虑希望存储过程使用 调用方权限还是所有者权限 运行。

  • 请记住存储过程的超时行为。

    存储过程执行将超时,除非代码活动重置计时器。具体而言,超时计时器通过代码与数据之间的交互(包括文件操作、查询和迭代结果集)重置。

编写类或对象

您定义的方法或函数应该是类或对象的一部分。

编写类或对象时,请注意以下几点:

  • 类(或对象)和方法不能处于受保护或私有状态。

  • 如果该方法不是静态的,并且您想要定义构造函数,请为类定义一个零实参构造函数。Snowflake 在初始化时调用此零实参构造函数来为类创建实例。

  • 可以为同一个类或对象中的不同存储过程定义不同的方法。

编写方法或函数

为存储过程编写方法或函数时,请注意以下事项:

  • 指定 Snowpark Session 对象作为方法或函数的第一个实参。

    调用存储过程时,Snowflake 会自动创建一个 Session 对象并将其传递给存储过程。(您不能自行创建 Session 对象。)

  • 对于其余实参和返回值,请使用与 Snowflake 数据类型

  • 方法或函数必须返回一个值。对于用 Scala 编写的存储过程,需要返回值。

  • 存储过程执行将超时,除非代码活动重置计时器。具体而言,超时计时器通过代码与数据之间的交互(包括文件操作、查询和迭代结果集)重置。

  • 当您运行来自过程处理程序中的 异步子作业 时,不支持“发后即忘”。

    换句话说,如果处理程序发出的子查询在父存储过程作业完成时仍在运行,子作业就会自动取消。

为代码提供依赖项

如果处理程序代码依赖在处理程序之外(如 JAR 文件中的类)定义的代码或依赖资源文件,则可以通过将这些依赖项上传到暂存区来为代码提供这些依赖项。创建过程 时,可以使用 IMPORTS 子句引用这些依赖项。

有关更多信息,请参阅 为代码提供依赖项

在 Snowflake 过程中访问数据

要访问 Snowflake 中的数据,请使用 Snowpark 库 APIs。

处理对 Scala 存储过程的调用时,Snowflake 会创建一个 Snowpark Session 对象,并将该对象传递给存储过程的方法或函数。

与使用其他语言的存储过程一样,会话的上下文(例如权限、当前数据库和架构等)取决于存储过程是以调用方权限还是所有者权限运行。有关详细信息,请参阅 访问和设置会话状态

您可以使用此 Session 对象调用 ` Snowpark 库 <https://docs.snowflake.cn/en/developer-guide/snowpark/reference/scala/com/snowflake/snowpark/index.html>`_ 中的 APIs。例如,您可以 :doc:` 为表创建 DataFrame </developer-guide/snowpark/scala/working-with-dataframes>` ,或执行 SQL 语句。

有关更多信息,请参阅 Snowpark Scala 开发者指南

备注

有关限制(包括访问数据的限制)的信息,请参阅 限制

数据访问示例

以下是 Scala 方法的示例,该方法将指定数量的行从一个表复制到另一个表。该方法采用以下实参:

  • Snowpark Session 对象

  • 要从中复制行的表的名称

  • 要将行保存到的表的名称

  • 要复制的行数

此示例中的方法返回一个字符串。

object MyObject
{
  def myProcedure(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    return "Success"
  }
}
Copy

以下示例定义了一个函数而不是一个方法:

object MyObject
{
  val myProcedure = (session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =>
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    "Success"
  }
}
Copy

使用 Scala 过程读取文件

您可以使用处理程序代码读取文件内容。该文件必须位于可供处理程序使用的 Snowflake 暂存区中。例如,您可能希望读取文件以在处理程序中处理非结构化数据。

若要读取暂存文件的内容,处理程序可以调用 SnowflakeFile 类或 InputStream 类中的方法。如果需要在计算过程中动态访问文件,则可以执行此操作。有关更多信息,请参阅 使用 SnowflakeFile 读取动态指定的文件使用 InputStream 读取动态指定的文件 (本主题内容)。

SnowflakeFile 提供不适用于 InputStream 的功能,如下表所述。

输入

备注

SnowflakeFile

URL 格式:

  • 带有作用域的 URL 可在函数的调用方不是其所有者时降低文件注入攻击的风险。

  • 文件 URL 或 UDF 所有者有权访问的文件的字符串路径。

文件必须位于指定的内部暂存区或外部暂存区。

轻松访问其他文件属性,例如文件大小。

InputStream

URL 格式:

  • 带有作用域的 URL 可在函数的调用方不是其所有者时降低文件注入攻击的风险。

文件必须位于指定的内部暂存区或外部暂存区。

备注

对于所有者权限存储过程,该过程的所有者必须有权访问任何非作用域 URLs 的文件。对于调用方权限过程,调用方必须有权访问任何非作用域 URLs 的文件。无论哪种情况,都可以通过让处理程序代码使用新 requireScopedUrl 参数的 boolean 值调用 SnowflakeFile.newInstance 方法来读取暂存文件。

以下示例在指定不需要作用域 URL 时使用 SnowflakeFile.newInstance

var filename = "@my_stage/filename.txt"
var sfFile = SnowflakeFile.newInstance(filename, false)
Copy

使用 SnowflakeFile 读取动态指定的文件

以下示例中的代码具有一个处理程序函数 execute,该函数接受 String 并返回 String 以及文件的内容。在运行时,Snowflake 从过程的 input 变量中的传入文件路径初始化处理程序的 fileName 变量。处理程序代码使用 SnowflakeFile 实例来读取文件。

CREATE OR REPLACE PROCEDURE file_reader_scala_proc_snowflakefile(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.types.SnowflakeFile
import com.snowflake.snowpark_java.Session

object FileReader {
  def execute(session: Session, fileName: String): String = {
    var input: InputStream = SnowflakeFile.newInstance(fileName).getInputStream()
    return new String(input.readAllBytes(), StandardCharsets.UTF_8)
  }
}
$$;
Copy

以下 CALL 示例中的代码创建一个指向该文件的作用域文件 URL。此为编码 URL,允许临时访问暂存文件,而无需授予对暂存区本身的权限。

CALL file_reader_scala_proc_snowflakefile(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
Copy

使用 InputStream 读取动态指定的文件

以下示例中的代码具有一个处理程序函数 execute,该函数接受 InputStream 并返回 String 以及文件的内容。在运行时,Snowflake 从过程的 input 变量中的传入文件路径初始化处理程序的 stream 变量。处理程序代码使用 InputStream 来读取文件。

CREATE OR REPLACE PROCEDURE file_reader_scala_proc_input(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.Session

object FileReader {
  def execute(session: Session, stream: InputStream): String = {
    val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
    return contents
  }
}
$$;
Copy

以下 CALL 示例中的代码创建一个指向该文件的作用域文件 URL。此为编码 URL,允许临时访问暂存文件,而无需授予对暂存区本身的权限。

CALL file_reader_scala_proc_input(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
Copy

返回表格数据

您可以编写一个过程,以表格形式返回数据。若要编写返回表格数据的过程,请执行以下操作:

  • CREATE PROCEDURE 语句中指定 TABLE(...) 作为过程的返回类型。

    您可以指定返回数据的列名和 :doc:` 类型 </sql-reference-data-types>` (如果您知道)作为 TABLE 参数。如果在定义过程时不知道返回的列(例如在运行时指定列时),则可以省略参数 TABLE。如果执行此操作,过程的返回值列将通过其处理程序返回的 DataFrame 中的列进行转换。列数据类型将根据 SQL-Scala 数据类型映射 中指定的映射转换为 SQL。

  • 编写处理程序,使其在 Snowpark DataFrame 中返回表格结果。

    有关 DataFrame 的更多信息,请参阅 在 Snowpark Scala 中使用 DataFrames

备注

如果满足以下任一条件,则过程将在运行时生成错误:

  • 它声明 TABLE 为其返回类型,但其处理程序不返回 DataFrame。

  • 其处理程序返回 DataFrame,但过程未声明 TABLE 为其返回类型。

示例

本节中的示例演示如何通过筛选列与字符串匹配的行的过程返回表格值。

定义数据

以下示例中的代码创建一个员工表。

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Copy

声明筛选行的过程

以下两个示例中的代码创建一个存储过程,该存储过程将表名称和角色作为实参,并返回表中角色列值与指定为实参的角色匹配的行。

指定返回列名称和类型

此示例在 RETURNS TABLE() 语句中指定列名称和类型。

CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._

object Filter {
    def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
        val table = session.table(tableName)
        val filteredRows = table.filter(col("role") === role)
        return filteredRows
    }
}
$$;
Copy

备注

目前,在 RETURNS TABLE(...) 子句中,您无法指定 GEOGRAPHY 为列类型。这在创建存储过程和匿名过程中均适用。

CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
  RETURNS TABLE(g GEOGRAPHY)
  ...
Copy
WITH test_return_geography_table_1() AS PROCEDURE
  RETURNS TABLE(g GEOGRAPHY)
  ...
CALL test_return_geography_table_1();
Copy

如果尝试指定 GEOGRAPHY 为列类型,则调用存储过程会导致该错误:

Stored procedure execution error: data type of returned table does not match expected returned table type
Copy

要解决这个问题,可以省略 RETURNS TABLE() 中的列实参和类型。

CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
  RETURNS TABLE()
  ...
Copy
WITH test_return_geography_table_1() AS PROCEDURE
  RETURNS TABLE()
  ...
CALL test_return_geography_table_1();
Copy
省略返回列名称和类型

以下示例中的代码声明一个过程,该过程允许通过处理程序返回值中的列推断返回值列名称和类型。它会忽略 RETURNS TABLE() 语句中的列名称和类型。

CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._

object Filter {
    def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
        val table = session.table(tableName)
        val filteredRows = table.filter(col("role") === role)
        return filteredRows
    }
}
$$;
Copy

调用过程

下面的示例调用存储过程:

CALL filter_by_role('employees', 'dev');
Copy

过程调用生成以下输出:

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+

准备具有暂存处理程序的存储过程

如果计划创建以下存储过程:其处理程序将被编译并复制到暂存区(而不是作为源代码内联保留),则必须在 JAR 文件中编译和打包类,并且必须将 JAR 文件上传到暂存区。

  1. 编译和打包处理程序代码

    若要更轻松地设置存储过程,请构建一个包含存储过程所需的所有依赖项的 JAR 文件。稍后,您需要将 JAR 文件上传到暂存区,并指向 CREATE PROCEDURE 语句中的文件 JAR。如果要上传和指向的 JAR 文件较少,则此过程会更简单。

    • 使用 sbt 构建具有依赖项的 JAR 文件。

      如果您使用 SBT 构建和打包代码,则可以使用 ` sbt-assembly 插件 <https://github.com/sbt/sbt-assembly/blob/develop/README.md (https://github.com/sbt/sbt-assembly/blob/develop/README.md)>`_ 来创建包含所有依赖项的 JAR 文件。有关更多信息,请参阅 使用 sbt 打包 Scala 处理程序代码

    • 使用 Maven 构建具有依赖项的 JAR 文件。

      如果您使用 Maven 构建和打包代码,则可以使用 ` Maven Assembly 插件 <https://maven.apache.org/plugins/maven-assembly-plugin/index.html (https://maven.apache.org/plugins/maven-assembly-plugin/index.html)>`_ 创建包含所有依赖项的 JAR 文件。有关更多信息,请参阅 使用 Maven 打包 Java 或 Scala 处理程序代码

    • 使用其他工具构建具有依赖项的 JAR 文件。

      如果您未使用 SBT 或 Maven,请参阅构建工具的文档,以了解如何构建一个包含所有依赖项的 JAR 文件。

      例如,如果您使用的是 IntelliJ IDEA 项目(不是 IntelliJ 中的 SBT 项目),请参阅 ` 关于设置构件配置的说明 <https://www.jetbrains.com/help/idea/compiling-applications.html#configure_artifact (https://www.jetbrains.com/help/idea/compiling-applications.html#configure_artifact)>`_。

  2. 将文件上传到暂存区

    若要使过程的逻辑(和其他依赖项,如果有)可用于过程,您需要将所需的文件上传到暂存区。有关更多信息,请参阅 为代码提供依赖项

示例

将 Snowpark APIs 用于异步处理

以下示例说明了如何使用 Snowpark APIs 启动异步子作业,以及这些作业在不同条件下的行为方式。

在以下示例中,asyncWait 过程会执行等待 10 秒的异步子作业。

CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    while(!asyncJob.isDone()) {
      Thread.sleep(1000)
    }
    "Done"
  }
}
$$;

call asyncScalaTest();
Copy

在以下示例中,cancelJob 过程使用 SQL 开始需要 10 秒才能完成的作业。然后,它会在子作业完成之前将其取消。

CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    asyncJob.cancel()
    "Done"
  }
}
$$;
Copy

在以下示例中,checkStatus 过程会执行等待 10 秒的异步子作业。然后,该过程会在作业完成之前检查作业的状态,因此检查将返回 False

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val connection = session.jdbcConnection
    val stmt = connection.createStatement()
    val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
    val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
    s"""status:    ${status}"""
  }
}
$$;
Copy
语言: 中文