矢量化 Python UDFs

本主题介绍矢量化 Python UDFs。

本主题内容:

概述

通过矢量化 Python UDFs,您可以定义 Python 函数,该函数接收批量输入行作为 Pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html),并返回批量结果作为 Pandas 数组 (https://pandas.pydata.org/docs/reference/api/pandas.array.html) 或 Series (https://pandas.pydata.org/docs/reference/series.html)。调用矢量化 Python UDFs 的方式与调用其他 Python UDFs 的方式相同。

与默认的逐行处理模式相比,使用矢量化 Python UDFs 的好处包括:

  • 如果 Python 代码针对行批次高效运行,就有可能实现更高的性能。

  • 如果要调用基于 Pandas DataFrames 或 Pandas 数组运行的库,则需要较少的转换逻辑。

使用矢量化 Python UDFs 时:

  • 不需要更改使用 Python UDFs 编写查询的方式。所有批处理都由 UDF 框架(而非您自己的代码)处理。

  • 与非矢量化 UDFs 一样,不能保证处理程序代码的哪些实例会看到输入的哪些批次。

矢量化 Python UDFs 入门

要创建矢量化 Python UDF,请使用受支持的机制之一来注释处理程序函数。

使用 vectorized 装饰器

_snowflake 模块向在 Snowflake 中执行的 Python UDFs 公开。在 Python 代码中,导入 _snowflake 模块,并使用 vectorized 装饰器,通过将 input 参数设置为 pandas.DataFrame 来指定处理程序预期会接收 Pandas DataFrame。

create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
from _snowflake import vectorized

@vectorized(input=pandas.DataFrame)
def add_one_to_inputs(df):
  return df[0] + df[1] + 1
$$;
Copy

使用函数属性

您可以对处理程序函数设置特殊 _sf_vectorized_input 属性,而不必导入 _snowflake 模块并使用 vectorized 装饰器。

create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas

def add_one_to_inputs(df):
  return df[0] + df[1] + 1

add_one_to_inputs._sf_vectorized_input = pandas.DataFrame
$$;
Copy

设置目标批次大小

对 Python 处理程序函数的调用必须在 180 秒的时间限制内执行,目前,作为输入传递给处理程序函数的每个 DataFrame 最多可以包含数千行。为了不超出时间限制,您可能希望为处理程序函数设置目标批次大小,这会限制每个输入 DataFrame 的最大行数。请注意,设置较大的值并不能保证 Snowflake 将使用指定行数对批次进行编码。您可以使用 vectorized 装饰器或函数属性来设置目标批次大小。

备注

使用 max_batch_size 只是一种限制 UDF 在每一个批次中可以处理的行数的机制。例如,如果 UDF 写入的方式一次最多只能处理 100 行,则 max_batch_size 应设置为 100。不应将设置 max_batch_size 用作指定任意大的批次大小的机制。如果 UDF 能处理任何大小的批次,建议不设置此参数。

使用 vectorized 装饰器

若要使用 vectorized 装饰器设置目标批次大小,请为名为 max_batch_size 的实参传递一个正整数值。

例如,此语句创建一个矢量化 Python UDF,并将每个 Dataframe 限制为最多 100 行:

create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
from _snowflake import vectorized

@vectorized(input=pandas.DataFrame, max_batch_size=100)
def add_one_to_inputs(df):
  return df[0] + df[1] + 1
$$;
Copy

使用函数属性

若要使用函数属性设置目标批次大小,请为处理程序函数的 _sf_max_batch_size 属性设置一个正整数值。

例如,此语句创建一个矢量化 Python UDF,并将每个 DataFrame 限制为最多 100 行:

create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas

def add_one_to_inputs(df):
  return df[0] + df[1] + 1

add_one_to_inputs._sf_vectorized_input = pandas.DataFrame
add_one_to_inputs._sf_max_batch_size = 100
$$;
Copy

DataFrame 编码

UDF 的成批实参在输入 Pandas DataFrames 中被编码为数组,每个 DataFrame 中的行数可能会有所不同。有关更多信息,请参阅 设置目标批次大小。在 DataFrame 中,实参可以通过其索引访问,即第一个实参的索引为 0,第二个实参的索引为 1,依此类推。UDF 处理程序返回的 Pandas 数组或系列必须与输入 DataFrame 的长度相同。

为了说明这一点,假设您定义了一个矢量化 Python UDF,如下所示:

create or replace function add_inputs(x int, y float)
returns float
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_inputs'
as $$
import pandas
from _snowflake import vectorized

@vectorized(input=pandas.DataFrame)
def add_inputs(df):
  return df[0] + df[1]
$$;
Copy

这个 UDF 为第一个实参使用 df[0] 来访问 Pandas 数组,并为第二个实参使用 df[1]df[0] + df[1] 会获得一个 Pandas 数组,其中包含来自两个数组的对应元素的成对和。创建 UDF 后,您可以使用一些输入行来调用它:

select add_inputs(x, y)
from (
  select 1 as x, 3.14::float as y union all
  select 2, 1.59 union all
  select 3, -0.5
);
+------------------+
| ADD_INPUTS(X, Y) |
|------------------|
|             4.14 |
|             3.59 |
|             2.5  |
+------------------+
Copy

在这里,add_inputs Python 函数接收一个 DataFrame,类似于使用以下 Python 代码创建的 DataFrame:

>>> import pandas
>>> df = pandas.DataFrame({0: pandas.array([1, 2, 3]), 1: pandas.array([3.14, 1.59, -0.5])})
>>> df
   0     1
0  1  3.14
1  2  1.59
2  3 -0.50
Copy

处理程序函数中的 return df[0] + df[1] 行将生成类似于以下 Python 代码的数组:

>>> df[0] + df[1]
0    4.14
1    3.59
2    2.50
dtype: float64
Copy

类型支持

矢量化 Python UDFs 支持以下 SQL 类型 的实参和返回值。该表反映了如何将每个 SQL 实参编码为特定 dtype (https://pandas.pydata.org/docs/user_guide/basics.html#basics-dtypes) 的 Pandas 数组。

SQL 类型

Pandas dtype

备注

NUMBER

Int16Int32Int64 用于小数位数为 0 并且适合 64 位或更小整数类型的 NUMBER 实参。如果实参不可为 null,则改用 int16int32int64。(对于 UDTFs,始终使用 Int16Int32Int64. . object 用于小数位数不是 0 的实参,或者不适合 64 位整数的实参,其中数组元素编码为 decimal.Decimal 值。. . 为确保 16 位 dtype,请使用 4 作为最大 NUMBER 精度。为确保 32 位 dtype,请使用 9 作为最大 NUMBER 精度。为确保 64 位 dtype,请使用 18 作为最大 NUMBER 精度。

为确保 UDF 的某个输入实参解析为不可为 null,请从使用 NOT NULL 列约束创建的表中传递列,或为实参使用 IFNULL 等函数。

FLOAT

float64

NULL 值被编码为 NaN 值。在输出中, NaN 值被解析为 NULLs。

BOOLEAN

boolean (用于可为 null 的实参)或 :code:`bool`(用于不可为 null 的实参)。

VARCHAR

string

Snowflake SQL 和 Pandas 都使用 UTF-8 编码来表示字符串。

BINARY

bytes

DATE

datetime64

每个值都编码为不带时间分量的 datetime64。NULL 值编码为 numpy.timedelta('NaT')

VARIANT

object . . 每个值都编码为 dictlistintfloatstrbool

对于实参,每个 Variant 行都会动态转换为 Python 类型,对于返回值则会反过来转换。以下类型将转换为字符串而不是原生 Python 类型:decimalbinarydatetimetimestamp_ltztimestamp_ntztimestamp_tz

OBJECT

object . . 每个元素都编码为字典。

ARRAY

object . . 每个元素都编码为列表。

TIME

timedelta64

每个值都编码为相对于午夜的偏移量。NULL 值编码为 numpy.timedelta64('NaT')。用作返回类型时,输出的元素可以是 [00:00:00, 23:59:59.999999999] 范围内的 numpy.timedelta64datetime.time 值。

TIMESTAMP_LTZ

datetime64

使用本地时区,将每个值编码为相对于 UTC Unix 纪元的纳秒尺度的 numpy.datetime64。NULL 值编码为 numpy.datetime64('NaT')。当用作返回类型时,输出的元素可能是 numpy.datetime64 或 时区无感知 datetime.datetimepandas.Timestamp 值。

TIMESTAMP_NTZ

datetime64

将每个值编码为纳秒尺度的 numpy.datetime64。NULL 值编码为 numpy.datetime64('NaT')。当用作返回类型时,输出的元素可能是 numpy.datetime64 或 时区无感知 datetime.datetimepandas.Timestamp 值。

TIMESTAMP_TZ

object

将每个值编码为纳秒尺度的 pandas.Timestamp。NULL 值编码为 pandas.NA。当用作返回类型时,输出的元素可以是时区感知 datetime.datetimepandas.Timestamp 值。

GEOGRAPHY

object

将每个值格式化为 GeoJSON,然后将其转换为 Python dict

接受以下类型作为输出:Pandas Seriesarray、NumPy array、常规 Python list,以及包含 label-udf_python_batch_types`中所述预期类型的任何可迭代序列。在 dtype 是 :code:`boolbooleanint16int32int64Int16Int32Int64float64 时,使用 Pandas Seriesarray 和 NumPy array 是有效的,因为它们将其内容公开为 memoryviews。这意味着可以复制内容,而不必按顺序读取每个值。

语言: 中文