用 Python 编写 UDTF

本主题内容:

您可以用 Python 实现用户定义的 表函数 (UDTF) 处理程序。此处理程序代码在调用 UDTF 时执行。本主题介绍如何使用 Python 实现处理程序并创建 UDTF。

UDTF 是返回表格结果的用户定义函数 (UDF)。有关使用 Python 实现的 UDF 处理程序的更多信息,请参阅 创建 Python UDFs。有关 UDFs 的更多一般信息,请参阅 用户定义函数概述

在 UDTF 的处理程序中,您可以处理输入行(请参阅本主题中的 处理行)。您还可以具有针对每个输入分区执行的逻辑(请参阅本主题中的 处理分区)。

创建 Python UDTF 时,请执行以下操作:

  1. 使用在调用 UDTF 时 Snowflake 将调用的方法实现类。

    有关更多详细信息,请参阅 实施处理程序 (本主题内容)。

  2. 在 SQL 中使用 CREATE FUNCTION 命令创建 UDTF,将类指定为处理程序。创建 UDTF 时,请指定:

    • UDTF 输入参数的数据类型。

    • UDTF 返回的列的数据类型。

    • 调用 UDTF 时作为处理程序执行的代码。

    • 用于实现处理程序的语言。

    有关语法的更多信息,请参阅 使用 CREATE FUNCTION 创建 UDTF (本主题内容)。

您可以调用 UDF 或 UDTF,如 调用 UDF 中所述。

备注

表函数 (UDTFs) 限制输入实参为 500 个,输出列为 500 个。

Snowflake 目前支持使用以下版本的 Python 编写 UDTFs :

  • 3.8

  • 3.9

  • 3.10

  • 3.11

在 CREATE FUNCTION 语句中,将 runtime_version 设置为所需的版本。

实施处理程序

您实现处理程序类,将 UDTF 实参值处理为表格结果并处理分区输入。有关处理程序类示例,请参阅 处理程序类示例 (本主题内容)。

使用 CREATE FUNCTION 创建 UDTF 时,您将此类指定为 UDTF 的处理程序。有关用于创建函数的 SQL 的更多信息,请参阅 使用 CREATE FUNCTION 创建 UDTF (本主题内容)。

处理程序类实现在调用 UDTF 时 Snowflake 将调用的方法。此类包含 UDTF 的逻辑。

方法

要求

描述

__init__ 方法

可选

初始化输入分区的有状态处理的状态。有关更多信息,请参阅 初始化处理程序 (本主题内容)。

process 方法

必填

处理每个输入行,以元组形式返回表格值。Snowflake 调用此方法,从 UDTF 的实参传递输入。有关更多信息,请参阅 定义 process 方法 (本主题内容)。

end_partition 方法

可选

完成输入分区的处理,以元组形式返回表格值。有关更多信息,请参阅 完成分区处理 (本主题内容)。

请注意,从处理程序类中的任何方法引发异常都会导致处理停止。调用 UDTF 的查询失败,并显示错误消息。

备注

如果代码不满足此处所述的要求,UDTF 创建或执行可能会失败。Snowflake 将在 CREATE FUNCTION 语句执行时检测违规行为。

处理程序类示例

以下示例中的代码会创建 UDTF,其处理程序类处理分区中的行。process 方法处理每个输入行,并返回包含股票销售总成本的一行。处理分区中的行后,它(从其 end_partition 方法)返回分区中包含的所有销售的总额。

create or replace function stock_sale_sum(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSaleSum'
as $$
class StockSaleSum:
    def __init__(self):
        self._cost_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      cost = quantity * price
      self._cost_total += cost
      yield (symbol, cost)

    def end_partition(self):
      yield (self._symbol, self._cost_total)
$$;
Copy

以下示例中的代码调用前面的 UDF,从 stocks_table 表中的 symbolquantityprice 列传递值。有关调用 UDTF 的更多信息,请参阅 调用 UDF

select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Copy

初始化处理程序

您可以选择在处理程序类中实现 __init__ 方法,Snowflake 将在处理程序开始处理行之前调用该方法。例如,可以使用此方法为处理程序建立某些分区范围的状态。__init__ 方法可能不会生成输出行。

方法的签名必须采用以下形式:

def __init__(self):
Copy

例如,您可能希望:

  • 初始化分区的状态,然后在 processend_partition 方法中使用此状态。

  • 执行长时间运行的初始化,每个分区只需执行一次,而不是每行执行一次。

备注

您还可以在处理程序类之外包含该代码,从而在分区处理开始之前(例如在类声明之前)执行逻辑。

有关处理分区的更多信息,请参阅 处理分区 (本主题内容)。

如果您使用 __init__ 方法,请记住 __init__ 具有以下特征:

  • 只能将 self 视作实参。

  • 无法生成输出行。请为其使用 process 方法实现。

  • 针对每个分区调用一次,并且在调用 process 方法之前。

处理行

实现 Snowflake 将为每个输入行调用的 process 方法。

定义 process 方法

定义 process 方法,该方法接收从 SQL 类型转换的 UDTF 实参作为值,返回 Snowflake 将用于创建 UDTF 的表格返回值的数据。

方法的签名必须采用以下形式:

def process(self, *args):
Copy

process 方法必须满足以下条件:

  • self 参数。

  • 声明与 UDTF 参数对应的方法参数。

    方法参数名称不必与 UDTF 参数名称匹配,但方法参数的声明顺序必须与 UDTF 参数的声明 顺序相同

    将 UDTF 实参值传递到方法时,Snowflake 会将值从 SQL 类型转换为您在方法中使用的 Python 类型。有关 Snowflake 如何在 SQL 和 Python 数据类型之间映射的信息,请参阅 SQL-Python 数据类型映射

  • 生成一个或多个元组(或返回包含元组的可迭代对象),其中元组的序列对应 UDTF 返回值列的序列。

    元组元素的显示顺序必须与 UDTF 返回值列的声明 顺序相同。有关更多信息,请参阅 返回值 (本主题内容)。

    Snowflake 会将值从 Python 类型转换为 UDTF 声明所需的 SQL 类型。有关 Snowflake 如何在 SQL 和 Python 数据类型之间映射的信息,请参阅 SQL-Python 数据类型映射

如果处理程序类中的方法引发异常,则处理将停止。调用 UDTF 的查询将失败,并显示错误消息。如果 process 方法返回 None,则处理将停止。(即使 process 方法返回 None,仍会调用 end_partition 方法。)

process 方法示例

以下示例中的代码显示 StockSale 处理程序类,该类具有 process 方法,用于处理三个 UDTF 实参(symbolquantityprice),返回单个行,其中包含两列(symboltotal)。请注意, process 方法参数的声明顺序与 stock_sale 参数的声明顺序相同。process 方法的 yield 语句中的实参与 stock_sale RETURNS TABLE 子句中声明的列采用相同的顺序。

create or replace function stock_sale(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSale'
as $$
class StockSale:
    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
$$;
Copy

以下示例中的代码调用前面的 UDF,从 stocks_table 表中的 symbolquantityprice 列传递值。

select stock_sale.symbol, total
  from stocks_table, table(stock_sale(symbol, quantity, price) over (partition by symbol));
Copy

返回值

返回输出行时,您可以使用 yieldreturn (但不能同时使用两者)返回具有表格值的元组。如果方法返回或生成 None,则针对当前行的处理将停止。

  • 使用 yield 时,针对每个输出行执行单独的 yield 语句。这是最佳实践,因为附带 yield 的延迟评估可以实现更高效的处理,并有助于避免超时。

    元组中的每个元素成为 UDTF 返回的结果中的列值。yield 实参的顺序必须匹配针对 CREATE FUNCTION 的 RETURNS TABLE 子句中返回值声明的列的顺序。

    以下示例中的代码返回表示两行的值。

    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
      yield (symbol, cost)
    
    Copy

    请注意,由于 yield 实参是元组,因此在元组中传递单个值时,您必须包含尾随逗号,如以下示例所示。

    yield (cost,)
    
    Copy
  • 使用 return 时,返回带元组的可迭代对象。

    元组中的每个值成为 UDTF 返回的结果中的列值。元组中列值的顺序必须匹配针对 CREATE FUNCTION 的 RETURNS TABLE 子句中返回值声明的列的顺序。

    以下示例中的代码返回两行,每行包含两列:symbol 和 total。

    def process(self, symbol, quantity, price):
      cost = quantity * price
      return [(symbol, cost), (symbol, cost)]
    
    Copy

跳过行

要跳过输入行并处理下一行(例如,在验证输入行时),请让 process 方法返回以下其中一项:

  • 使用 return 时,返回 None、包含 None 的列表或空列表,以跳过该行。

  • 使用 yield 时,返回 None,以跳过一行。

    请注意,如果多次调用 yield,则 Snowflake 会忽略返回 None 的调用后的任何调用。

以下示例中的代码仅返回其 number 为正整数的行。如果 number 不是正数,则该方法返回 None,以跳过当前行并继续处理下一行。

def process(self, number):
  if number < 1:
    yield None
  else:
    yield (number)
Copy

有状态处理和无状态处理

您可以实现处理程序以分区感知方式处理行,或者简单地逐行处理。

  • 分区感知处理 中,处理程序包括用于管理分区范围状态的代码。这包括在分区处理开始时执行的 __init__ 方法,以及 Snowflake 在处理分区的最后一行后调用的 end_partition 方法。有关更多信息,请参阅 处理分区 (本主题内容)。

  • 分区无感知处理 中,处理程序无状态执行,忽略分区边界。

    要以这种方式执行处理程序,请不要包含 __init__end_partition 方法。

处理分区

您可以使用按分区执行的代码(例如,管理状态)以及为分区中的每一行执行的代码来处理输入中的分区。

备注

有关在调用 UDTF 时指定分区的更多信息,请参阅 表函数和分区

当查询包含分区时,它会使用指定的值(如列的值)汇总行。据说,处理程序接收的汇总行按该值分区。代码可以处理这些分区及其行,以便针对每个分区的处理都包含分区范围状态。

以下 SQL 示例中的代码查询股票销售信息。它执行 stock_sale_sum UDTF,其输入按 symbol 列的值进行分区。

select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Copy

请记住,即使对传入的行进行分区,代码也可能忽略分区分离,仅处理这些行。例如,您可以省略旨在处理分区范围状态的代码(例如处理程序类 __init__ 方法和 end_partition 方法),仅实现 process 方法。有关更多信息,请参阅 有状态处理和无状态处理 (本主题内容)。

要将每个分区作为一个单元进行处理,您需要:

  • 实现处理程序类 __init__ 方法,用于初始化分区的处理。

    有关更多信息,请参阅 初始化处理程序 (本主题内容)。

  • 使用 process 方法处理每一行时,包含分区感知代码。

    有关处理行的更多信息,请参阅 处理行 (本主题内容)。

  • 实现 end_partition 方法以完成分区处理。

    有关更多信息,请参阅 完成分区处理 (本主题内容)。

下面介绍在包含旨在按分区执行的代码时处理程序调用的顺序。

  1. 在分区处理开始之前,Snowflake 使用处理程序类的 __init__ 方法创建该类的实例。

    在此处,您可以建立分区范围状态。例如,您可以初始化实例变量,以保存从分区中的行计算得出的值。

  2. 对于分区中的每一行,Snowflake 都会调用 process 方法。

    每次执行该方法时,它都可以对状态值进行更改。例如,您可以让 process 方法更新实例变量的值。

  3. 代码处理分区中的最后一行后,Snowflake 会调用 end_partition 方法。

    通过此方法,您可以返回包含要返回的分区级值的输出行。例如,您可以返回在处理分区中的行时一直更新的实例变量的值。

    end_partition 方法不会从 Snowflake 接收任何实参。Snowflake 只是在您处理分区中的最后一行后调用该方法。

完成分区处理

您可以选择在处理程序类中实现 end_partition 方法。Snowflake 在您处理分区中的所有行后调用该方法。通过此方法,您可以在处理完分区的所有行后执行分区的代码。end_partition 方法可能会生成输出行,例如返回分区范围的计算结果。有关更多信息,请参阅 处理分区 (本主题内容)。

方法的签名必须采用以下形式:

def end_partition(self):
Copy

Snowflake 需要以下 end_partition 方法实现:

  • 它不能是静态的。

  • self 外,它可能没有任何参数。

  • 作为返回表格值的替代方法,它可能会生成空列表或 None

备注

虽然 Snowflake 支持大型分区,会调整超时以成功处理分区,但特别大的分区可能导致处理超时(例如 end_partition 需要太长时间才能完成)。如果您需要针对特定使用场景调整超时阈值,请联系 ` Snowflake 支持部门 `_。

分区处理示例

以下示例中的代码首先计算每次购买的费用,然后将购买费用相加(在 process 方法中),从而计算股票购买所支付的总费用。代码返回 end_partition 方法中的总数。

有关包含此处理程序的 UDTF 示例,以及调用 UDTF,请参阅 处理程序类示例

class StockSaleSum:
  def __init__(self):
    self._cost_total = 0
    self._symbol = ""

  def process(self, symbol, quantity, price):
    self._symbol = symbol
    cost = quantity * price
    self._cost_total += cost
    yield (symbol, cost)

  def end_partition(self):
    yield (self._symbol, self._cost_total)
Copy

处理分区时,请记住以下几点:

  • 代码可能会处理未在 UDTF 调用中明确指定的分区。即使 UDTF 调用不包含 PARTITION BY 子句,Snowflake 也会明确对数据进行分区。

  • process 方法将按照分区的 ORDER BY 子句指定的顺序(如果有)接收行数据。

示例

使用导入的包

您可以使用 Snowflake 中提供的 Anaconda 第三方包精选列表所包含的 Python 包。要将这些包指定为 UDTF 中的依赖项,请使用 CREATE FUNCTION 中的 PACKAGES 子句。

您可以在 Snowflake 中执行以下 SQL,从而发现包含的包列表:

select * from information_schema.packages where language = 'python';
Copy

有关更多信息,请参阅 使用第三方包创建 Python UDFs

以下示例中的代码使用 NumPy (Numerical Python) (https://numpy.org/doc/stable/reference/index.html) 包中的函数,以根据一系列股票购买(其中每股具有不同的价格)计算每股的平均价。

create or replace function stock_sale_average(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
packages = ('numpy')
handler='StockSaleAverage'
as $$
import numpy as np

class StockSaleAverage:
    def __init__(self):
      self._price_array = []
      self._quantity_total = 0
      self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      self._price_array.append(float(price))
      cost = quantity * price
      yield (symbol, cost)

    def end_partition(self):
      np_array = np.array(self._price_array)
      avg = np.average(np_array)
      yield (self._symbol, avg)
$$;
Copy

以下示例中的代码调用前面的 UDF,从 stocks_table 表中的 symbolquantityprice 列传递值。有关调用 UDTF 的更多信息,请参阅 调用 UDF

select stock_sale_average.symbol, total
  from stocks_table,
  table(stock_sale_average(symbol, quantity, price)
    over (partition by symbol));
Copy

使用工作进程运行并发任务

您可以使用 Python 工作进程运行并发任务。当您需要运行的并行任务利用仓库节点上的多个 CPU 核心时,您可能会发现此操作很有用。

备注

Snowflake 建议您不要使用内置的 Python 多处理模块。

要解决 ` Python 全局解释器锁 <https://wiki.python.org/moin/GlobalInterpreterLock (https://wiki.python.org/moin/GlobalInterpreterLock)>`_ 阻止多任务处理方法扩展到所有 CPU 核心,您可以使用单独的工作进程而不是线程来执行并发任务。

您可以使用 joblib 库的 Parallel 类对 Snowflake 仓库执行此操作,如以下示例所示。

CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
  RETURNS TABLE (result INT)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'JoblibMultiprocessing'
  PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt

class JoblibMultiprocessing:
  def process(self, i):
    pass

  def end_partition(self):
    result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
    for r in result:
      yield (r, )
$$;
Copy

备注

Snowflake Standard 和 Snowpark-Optimized Warehouses 用于 joblib.Parallel 的默认后端有所不同。

  • Standard Warehouse 默认值:threading

  • Snowpark-Optimized Warehouse 默认值:loky (多处理)

您可以通过调用 joblib.parallel_backend 函数来替换默认后端设置,如下例所示。

import joblib
joblib.parallel_backend('loky')
Copy

使用 CREATE FUNCTION 创建 UDTF

您在 SQL 中使用 CREATE FUNCTION 命令创建 UDTF,将您编写的代码指定为处理程序。有关命令参考,请参阅 CREATE FUNCTION

在创建 UDTF 时使用以下语法

CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
  RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
  LANGUAGE PYTHON
  [ IMPORTS = ( '<imports>' ) ]
  RUNTIME_VERSION = 3.8
  [ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
  [ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
  HANDLER = '<handler_class>'
  [ AS '<python_code>' ]
Copy

要将您编写的处理程序代码与 UDTF 关联,请在执行 CREATE FUNCTION 时执行以下操作:

  • 在 RETURNS TABLE 中,以列名和类型对指定输出列。

  • 将 LANGUAGE 设置为 PYTHON。

  • 如果处理程序类位于外部位置(例如,在暂存区上),则将 IMPORTS 子句值设置为该类的路径和名称。

    有关更多信息,请参阅 创建 Python UDFs

  • 将 RUNTIME_VERSION 设置为代码所需 Python 运行时的版本。受支持的 Python 版本包括:

    • 3.8

    • 3.9

    • 3.10

    • 3.11

  • 将 PACKAGES 子句值设置为处理程序类所需的一个或多个包(如果有)的名称。

    有关更多信息,请参阅 使用第三方包创建 Python UDFs

  • 将 HANDLER 子句值设置为处理程序类的名称。

    将 Python 处理程序代码与 UDTF 关联时,您可以内联包含代码,也可以在 Snowflake 暂存区上的某个位置引用代码。HANDLER 值区分大小写,并且必须与 Python 类的名称匹配。

    有关更多信息,请参阅 UDFs 使用内联代码与,UDFs 使用从暂存区上传的代码

    重要

    对于 标量 Python UDF,HANDLER 子句值包含方法名称。

    对于 Python UDTF,HANDLER 子句值包含类名,但 包含方法名。

    造成差异的原因是,对于标量 Python UDF,处理程序方法的名称由用户选择,因此 Snowflake 事先不知道,但对于 Python UDTF,方法(例如 end_partition 方法)的名称是已知的,因为它们必须与 Snowflake 指定的名称匹配。

  • 如果处理程序代码是使用 CREATE FUNCTION 内联指定,则 AS '<python_code>' 子句是必需项。

语言: 中文