矢量化 Python UDTFs

本主题介绍矢量化 Python UDTFs。

本主题内容:

概述

矢量化 Python UDTFs(用户定义的表函数)提供了一种批量操作行的方法。

Snowflake 支持两种矢量化 UDTFs:

  • 带有矢量化 end_partition 方法的 UDTFs

  • 带有矢量化 process 方法的 UDTFs

您必须选择一种,因为 UDTF 不能同时具有矢量化进程和矢量化 end_partition 方法。

带有矢量化 end_partition 方法的 UDTFs

UDTFs with a vectorized end_partition method enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) and returning results as pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or lists of pandas arrays (https://pandas.pydata.org/docs/reference/api/pandas.array.html) or pandas Series (https://pandas.pydata.org/docs/reference/series.html). This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.

当您想要进行以下操作时,可使用矢量化 end_partition 方法:

  • 逐个分区处理数据,而不是逐行处理数据。

  • 为每个分区返回多行或多列。

  • 使用在 Pandas DataFrames 上操作的库来进行数据分析。

采用矢量化 process 方法的 UDTFs

带有矢量化 process 方法的 UDTFs 提供一种批量操作行的方法,并假设操作执行一对一的映射。换句话说,该方法会为每个输入行返回一个输出行。列数不受限制。

当您想要进行以下操作时,可使用矢量化 process 方法:

  • 批量应用存在多列结果的一对一转换。

  • 使用需要 pandas.DataFrame 的库。

  • 批量处理行,无需明确分区。

  • 利用 to_pandas() API 将查询结果直接转换为 Pandas DataFrame。

先决条件

需要 Snowpark Library for Python 版本 1.14.0 或更高版本。

开始使用带有矢量化 end_partition 方法的 UDTFs

要创建带有矢量化 end_partition 方法的 UDTF,请执行以下操作:

  • 或者,使用 __init__ 方法定义一个处理程序类,该方法将在处理每个分区之前调用。

  • 不要定义 process 方法。

  • 定义一个 end_partition 方法,该方法接受一个 DataFrame 实参并返回或产生一个 pandas.DataFrame,或者 pandas.Seriespandas.arrays 的元组,其中每个数组都是一列。结果的列类型必须与 UDTF 定义中的列类型匹配。

  • 使用 @vectorized 装饰器或 _sf_vectorized_input 函数属性将 end_partition 方法标记为矢量化。有关更多信息,请参阅 矢量化 Python UDFs@vectorized 装饰器只能在 Snowflake 中执行 Python UDTF 时使用,例如使用 SQL 工作表时。使用客户端或 Python 工作表执行时,必须使用该函数属性。

备注

带有矢量化 end_partition 方法的 UDTF 输入 DataFrame 的默认列名与 SQL 函数的签名相符。列名称遵循 SQL 标识符要求。也就是说,如果标识符未加引号,则该标识符将大写,如果该标识符使用了双引号,则将保持原样。

以下是使用 @vectorized 装饰器创建带有矢量化 end_partition 方法的 UDTF 的示例。

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

以下是使用函数属性创建带有矢量化 end_partition 方法的 UDTF 的示例。

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

备注

必须使用 PARTITION BY 子句,调用带有矢量化 end_partition 方法的 UDTF 来构建分区。

要使用同一分区中的所有数据调用 UDTF,请执行以下操作:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

要使用按列 x 分区的数据调用 UDTF,请执行以下操作:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

示例:使用常规 UDTF 与带有矢量化 end_partition 方法的 UDTF 的行集合

以下是如何通过使用常规 UDTF 进行行收集的示例。

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

以下是如何通过带有矢量化 end_partition 方法的 UDTF 进行行收集的示例。

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

示例:计算分区中每列的汇总统计数据

以下是一个示例,说明如何使用 Pandas describe() 方法计算分区中每列的汇总统计数据。

首先,创建一个表并生成 3 个分区,每个分区 5 行。

create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);

-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Copy

看一下数据。

select * from test_values;

-----------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
-----------------------------------------------------
|x     |8.0     |99.4    |714.6   |168.7   |397.2   |
|x     |106.4   |237.1   |971.7   |828.4   |988.2   |
|x     |741.3   |207.9   |32.6    |640.6   |63.2    |
|x     |541.3   |828.6   |844.9   |77.3    |403.1   |
|x     |4.3     |723.3   |924.3   |282.5   |158.1   |
|y     |976.1   |562.4   |968.7   |934.3   |977.3   |
|y     |390.0   |244.3   |952.6   |101.7   |24.9    |
|y     |599.7   |191.8   |90.2    |788.2   |761.2   |
|y     |589.5   |201.0   |863.4   |415.1   |696.1   |
|y     |46.7    |659.7   |571.1   |938.0   |513.7   |
|z     |313.9   |188.5   |964.6   |435.4   |519.6   |
|z     |328.3   |643.1   |766.4   |148.1   |596.4   |
|z     |929.0   |255.4   |915.9   |857.2   |425.5   |
|z     |612.8   |816.4   |220.2   |879.5   |331.4   |
|z     |487.1   |704.5   |471.5   |378.9   |481.2   |
-----------------------------------------------------
Copy

接下来,创建函数。

create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas

class handler:
    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
      # using describe function to get the summary statistics
      result = df.describe().transpose()
      # add a column at the beginning for column ids
      result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
      return result
$$;
Copy

调用函数并按 id 进行分区。

-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

或者,调用函数并将整个表视为一个分区。

-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

开始使用带有矢量化 process 方法的 UDTFs

要创建带有矢量化 process 方法的 UDTF,请执行以下操作:

  • 定义处理程序类,类似于常规 UDTFs,可选择 __init__end_partition 方法。

  • 定义 process 方法,该方法接受一个 DataFrame 实参并返回一个 pandas.DataFrame 或者 pandas.Seriespandas.arrays 的元组,其中每个数组都是一列。结果的列类型必须与 UDTF 定义中的列类型相匹配。返回的结果必须只有一个 DataFrame 或元组。这与可以产生或返回列表的矢量化 end_partition 方法不同。

  • 使用 @vectorized 装饰器或 _sf_vectorized_input 函数属性将 process 方法标记为矢量化。有关更多信息,请参阅 矢量化 Python UDFs@vectorized 装饰器只能在 Snowflake 中执行 Python UDTF 时使用,例如使用 SQL 工作表时。使用客户端或 Python 工作表执行时,必须使用函数属性。如果您的 Python 处理程序函数超出了执行时间限制,您也可以 设置目标批次大小

备注

带有矢量化 process 方法的 UDTF 输入 DataFrame 的默认列名与 SQL 函数的签名相符。列名称遵循 SQL 标识符要求。也就是说,如果标识符未加引号,则该标识符将大写,如果该标识符使用了双引号,则将保持原样。

对于带有矢量化 process 方法的 UDTF,可以将其处理程序实现为分区感知的方式处理批次,或简单地逐批处理。有关详细信息,请参阅 有状态和无状态处理

示例:带有矢量化 process 方法的 UDTF 应用独热编码

下面是一个示例,说明了如何使用带有矢量化 process 方法的 UDTF 为包含十个类别的表应用独热编码。

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame

class one_hot_encode:
  def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
      return pd.get_dummies(df)
  process._sf_vectorized_input = pd.DataFrame


one_hot_encode_udtf = session.udtf.register(
  one_hot_encode,
  output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
  input_names=['"categ"']
)

df_table = session.table("categories")
df_table.show()
Copy

结果如下。

-----------
|"CATEG"  |
-----------
|categ1   |
|categ6   |
|categ8   |
|categ5   |
|categ7   |
|categ5   |
|categ1   |
|categ2   |
|categ2   |
|categ4   |
-----------

准备打印表。

res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Copy

结果如下。

    CATEG  CATEG0  CATEG1  CATEG2  CATEG3  CATEG4  CATEG5  CATEG6  CATEG7  CATEG8  CATEG9
0  categ0       1       0       0       0       0       0       0       0       0       0
1  categ0       1       0       0       0       0       0       0       0       0       0
2  categ5       0       0       0       0       0       1       0       0       0       0
3  categ3       0       0       0       1       0       0       0       0       0       0
4  categ8       0       0       0       0       0       0       0       0       1       0

虽然不太方便,但是作为替代,您也可以使用矢量化 UDF 获得相同结果。您需要将结果打包成一列,然后解包该列以将结果恢复为可用的 Pandas DataFrame。下面是使用矢量化 UDF 的示例。

def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
  return pd.get_dummies(df).to_dict('records')

one_hot_encode._sf_vectorized_input = pd.DataFrame

one_hot_encode_udf = session.udf.register(
  one_hot_encode,
  output_schema=["encoding"],
)

df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0  {\n  "categ0": false,\n  "categ1": false,\n  "...
1  {\n  "categ0": false,\n  "categ1": true,\n  "c...
2  {\n  "categ0": false,\n  "categ1": false,\n  "...
3  {\n  "categ0": false,\n  "categ1": false,\n  "...
4  {\n  "categ0": true,\n  "categ1": false,\n  "c...
Copy

类型支持

矢量化 UDTFs 支持与矢量化 UDFs 相同的 SQL 类型。但是,对于矢量化 UDTFs,适合 64 位或更小整数类型的小数位数为 0 的 SQL NUMBER 实参将始终映射到 Int16Int32Int64。与标量 UDFs 不同,如果 UDTF 的实参不可为空,则不会将其转换为 int16int32int64

要查看显示 SQL 类型如何映射到 Pandas 数据类型的表,请参阅矢量化 Python UDFs 主题中的 类型支持表

最佳实践

本部分介绍最佳实践。

  1. 如果必须随每一行返回一个标量,请构建一个重复值列表,而不是解包 numpy 数组来创建元组。例如,对于 2 列结果,不执行以下操作:

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    执行以下操作:

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  2. 要提高性能,请将半结构化数据解包到列中。例如,如果有一个变体列 obj,其中包含元素 x(int)y(float)z(string),那么不定义具有类似以下签名的 UDTF :

    create function vec_udtf(variant obj)
    
    Copy

    并使用 vec_udtf(obj) 调用它,而是定义具有以下签名的 UDTF :

    create function vec_udtf(int, float, string)
    
    Copy

    并使用 vec_udtf(obj:x, obj:y, obj:z) 调用它。

  3. 默认情况下,Snowflake 将输入编码为支持 NULL 值的 Pandas 数据类型(例如 Int64 (https://pandas.pydata.org/docs/reference/api/pandas.Int64Dtype.html))。如果使用需要基本类型(如 numpy)的库,并且输入没有 NULL 值,则应该在使用库之前将列转换为基本类型。例如:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    有关更多信息,请参阅 类型支持

  4. 采用带有矢量化 end_partition 方法的 UDTFs 时,为了提高性能并防止超时,请避免使用 pandas.concat 累积部分结果。相反,只要准备好了,就会产生部分结果。例如,不执行以下操作:

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    执行以下操作:

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy
语言: 中文