在 Scalar Scala UDFs 中控制全局状态

在设计需要访问共享状态的 UDF 和处理程序时,您需要考虑 Snowflake 执行 UDFs 处理行的方式。

大多数处理程序应遵循以下准则:

  • 如果需要初始化不跨行更改的共享状态,请在处理程序函数外部(例如在构造函数中)对其进行初始化。

  • 编写线程安全的处理程序方法。

  • 避免跨行存储和共享动态状态。

如果您的 UDF 无法遵循这些准则,或者如果您想更深入地了解这些准则的原因,请阅读接下来的几个小节。

跨调用共享状态

Snowflake 希望独立处理标量 UDFs。依赖调用间共享的状态可能会导致意外行为。这是因为系统可以按任何顺序处理行,并将这些调用分散到多个 JVMs 中(适用于用 Java 或 Scala 编写的处理程序)。

UDFs 应避免在对处理程序方法的调用之间依赖共享状态。但是,在以下两种情况下,可能希望 UDF 存储共享状态:

  • 包含不希望对每一行重复的昂贵初始化逻辑的代码。

  • 跨行(如缓存)利用共享状态的代码。

如果需要在多行中共享状态,而且状态不会随时间改变,那么可以使用构造函数通过设置实例级变量来创建共享状态。每个实例只执行一次构造函数,而处理程序则每行调用一次,因此当处理程序处理多行时,在构造函数中进行初始化的成本更低。此外,由于构造函数只被调用一次,因此无需编写线程安全的构造函数。

如果 UDF 存储的共享状态发生变化,那么您的代码就必须准备好处理对该状态的并发访问。

有关并行化和共享状态的更多信息,请参阅 了解并行化存储 JVM 状态信息 (本主题内容)。

了解并行化

为了提高性能,Snowflake 可在 JVMs 之间和内部实现并行化。

跨 JVMs 并行化

Snowflake 可在 仓库 中的工作线程之间实现并行化。每个工作线程运行一个(或多个) JVMs。这意味着没有全局共享状态。最多只能在单个 JVM 内共享状态。

JVMs 内部并行化

  • 每个 JVM 可以执行多个线程,这些线程可以并行调用同一实例的处理程序方法。这意味着每个处理程序方法都需要是线程安全的。

  • 如果 UDF 为 IMMUTABLE,并且 SQL 语句对同一行使用相同实参调用 UDF 多次,那么 UDF 对该行的每次调用都会返回相同的值。

    例如,如果 UDF 为 IMMUTABLE,以下调用会为每行返回两次相同的值。

    SELECT my_scala_udf(42), my_scala_udf(42) FROM table1;
    
    Copy

    如果您希望多次调用即使传递相同的实参也能返回独立的值,并且不想声明函数 VOLATILE,那么可以将多个独立的 UDFs 绑定到同一个处理程序方法上。

    您可以根据以下步骤执行此操作。

    1. 创建名为 @udf_libs/rand.jar 且包含以下代码的 JAR 文件:

      class MyClass {
      
          var x: Double = 0.0
      
          // Constructor
          def this() = {
              x = Math.random()
          }
      
          // Handler
          def myHandler(): Double = x
      }
      
      Copy
    2. 创建 Scala UDFs,如下所示。

      这些 UDFs 具有不同的名称,但使用相同的 JAR 文件和该 JAR 文件中相同的处理程序。

      CREATE FUNCTION my_scala_udf_1()
        RETURNS DOUBLE
        LANGUAGE SCALA
        IMPORTS = ('@udf_libs/rand.jar')
        HANDLER = 'MyClass.myHandler';
      
      CREATE FUNCTION my_scala_udf_2()
        RETURNS DOUBLE
        LANGUAGE SCALA
        IMPORTS = ('@udf_libs/rand.jar')
        HANDLER = 'MyClass.myHandler';
      
      Copy
    3. 使用以下代码调用这两个 UDFs。

      UDFs 指向同一 JAR 文件和处理程序。这些调用会创建同一类的两个实例。每个实例返回一个独立的值,因此下面的示例返回两个独立值,而不是两次返回相同的值:

      SELECT my_scala_udf_1(), my_scala_udf_2() FROM table1;
      
      Copy

存储 JVM 状态信息

避免依赖动态共享状态的一个原因是,行不一定按可预测的顺序进行处理。每次执行 SQL 语句时,Snowflake 都可以更改批处理数、批处理顺序以及批处理中的行顺序。如果将标量 UDF 设计为一行影响后续行的返回值,则每次执行 UDF 时, UDF 都会返回不同的结果。

语言: 中文