矢量化 Python UDTFs

本主题介绍矢量化 Python UDTFs。

本主题内容:

概述

矢量化 Python UDTFs(用户定义的表函数)提供了一种批量操作行的方法。

Snowflake 支持两种矢量化 UDTFs:

  • 带有矢量化 end_partition 方法的 UDTFs

  • 带有矢量化 process 方法的 UDTFs

您必须选择一种,因为 UDTF 不能同时具有矢量化 process 方法和矢量化 end_partition 方法。

带有矢量化 end_partition 方法的 UDTFs

UDTFs with a vectorized end_partition method enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) and returning results as pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or lists of pandas arrays (https://pandas.pydata.org/docs/reference/api/pandas.array.html) or pandas Series (https://pandas.pydata.org/docs/reference/series.html). This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.

将矢量化 end_partition 方法用于以下任务:

  • 逐个分区处理数据,而不是逐行处理数据。

  • 为每个分区返回多行或多列。

  • 使用在 Pandas DataFrames 上操作的库来进行数据分析。

采用矢量化 process 方法的 UDTFs

带有矢量化 process 方法的 UDTFs 提供一种批量操作行的方法,并假设操作执行一对一的映射。换句话说,该方法会为每个输入行返回一个输出行。列数不受限制。

将矢量化 process 方法用于以下任务:

  • 批量应用存在多列结果的一对一转换。

  • 使用需要 pandas.DataFrame 的库。

  • 批量处理行,无需明确分区。

  • 利用 to_pandas() API 将查询结果直接转换为 Pandas DataFrame。

先决条件

需要 Snowpark Library for Python 版本 1.14.0 或更高版本。

创建使用矢量化 end_partition 方法的 UDTF

  1. 可选:使用 __init__ 方法定义一个处理程序类,该方法将在处理每个分区之前调用。

    注意:请勿定义 process 方法。

  2. 定义一个 end_partition 方法,该方法接受一个 DataFrame 实参并返回或产生一个 pandas.DataFrame,或者 pandas.Seriespandas.arrays 的元组,其中每个数组都是一列。

    结果的列类型必须与 UDTF 定义中的列类型匹配。

  3. 使用 @vectorized 装饰器或 _sf_vectorized_input 函数属性将 end_partition 方法标记为矢量化。

    有关更多信息,请参阅 矢量化 Python UDFs.:code:@vectorized 装饰器只能在 Python UDTF 在 Snowflake 中执行时使用;例如,当使用 SQL 工作表时。使用客户端或 Python 工作表执行时,必须使用该函数属性。

备注

带有矢量化 end_partition 方法的 UDTF 输入 DataFrame 的默认列名与 SQL 函数的签名相符。列名称遵循 SQL 标识符要求。也就是说,如果标识符未加引号,则该标识符将大写,如果该标识符使用了双引号,则将保持原样。

以下代码块是使用 @vectorized 装饰器创建使用矢量化 end_partition 方法的 UDTF 的示例。

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

以下代码块是使用函数属性创建使用矢量化 end_partition 方法的 UDTF 的示例。

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

备注

必须使用 PARTITION BY 子句,调用使用矢量化 end_partition 方法的 UDTF 来构建分区。

要使用同一分区中的所有数据调用 UDTF,请执行以下操作:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

要使用按列 x 分区的数据调用 UDTF,请执行以下操作:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

示例:使用常规 UDTF 与使用矢量化 end_partition 方法的 UDTF 的行集合

使用常规 UDTF 的行集合:

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

使用矢量化 end_partition 方法使用 UDTF 的行集合:

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

示例:计算分区中每列的汇总统计数据

以下是一个示例,说明如何使用 Pandas describe() 方法计算分区中每列的汇总统计数据。

  1. 创建一个表并生成 3 个分区,每个分区 5 行:

    create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
    
    -- generate 3 partitions of 5 rows each
    insert into test_values
    select 'x',
    uniform(1.5,1000.5,random(1))::float col1,
    uniform(1.5,1000.5,random(2))::float col2,
    uniform(1.5,1000.5,random(3))::float col3,
    uniform(1.5,1000.5,random(4))::float col4,
    uniform(1.5,1000.5,random(5))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'y',
    uniform(1.5,1000.5,random(10))::float col1,
    uniform(1.5,1000.5,random(20))::float col2,
    uniform(1.5,1000.5,random(30))::float col3,
    uniform(1.5,1000.5,random(40))::float col4,
    uniform(1.5,1000.5,random(50))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'z',
    uniform(1.5,1000.5,random(100))::float col1,
    uniform(1.5,1000.5,random(200))::float col2,
    uniform(1.5,1000.5,random(300))::float col3,
    uniform(1.5,1000.5,random(400))::float col4,
    uniform(1.5,1000.5,random(500))::float col5
    from table(generator(rowcount => 5));
    
    Copy
  2. 查看数据:

    select * from test_values;
    
    -----------------------------------------------------
    |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
    -----------------------------------------------------
    |x     |8.0     |99.4    |714.6   |168.7   |397.2   |
    |x     |106.4   |237.1   |971.7   |828.4   |988.2   |
    |x     |741.3   |207.9   |32.6    |640.6   |63.2    |
    |x     |541.3   |828.6   |844.9   |77.3    |403.1   |
    |x     |4.3     |723.3   |924.3   |282.5   |158.1   |
    |y     |976.1   |562.4   |968.7   |934.3   |977.3   |
    |y     |390.0   |244.3   |952.6   |101.7   |24.9    |
    |y     |599.7   |191.8   |90.2    |788.2   |761.2   |
    |y     |589.5   |201.0   |863.4   |415.1   |696.1   |
    |y     |46.7    |659.7   |571.1   |938.0   |513.7   |
    |z     |313.9   |188.5   |964.6   |435.4   |519.6   |
    |z     |328.3   |643.1   |766.4   |148.1   |596.4   |
    |z     |929.0   |255.4   |915.9   |857.2   |425.5   |
    |z     |612.8   |816.4   |220.2   |879.5   |331.4   |
    |z     |487.1   |704.5   |471.5   |378.9   |481.2   |
    -----------------------------------------------------
    
    Copy
  3. 创建函数:

    create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
    returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
    language python
    RUNTIME_VERSION = 3.9
    packages=('pandas')
    handler='handler'
    as $$
    from _snowflake import vectorized
    import pandas
    
    class handler:
        @vectorized(input=pandas.DataFrame)
        def end_partition(self, df):
          # using describe function to get the summary statistics
          result = df.describe().transpose()
          # add a column at the beginning for column ids
          result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
          return result
    $$;
    
    Copy
  4. 执行以下步骤之一:

    • 调用函数并按 id 进行分区:

      -- partition by id
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by id))
      order by id, column_name;
      
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy
    • 调用函数并将整个表视为一个分区:

      -- treat the whole table as one partition
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by 1))
      order by id, column_name;
      
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy

采用矢量化 process 方法创建 UDTF

  1. 定义处理程序类,类似于常规 UDTFs,可选择 __init__end_partition 方法。

  2. 定义 process 方法,该方法接受一个 DataFrame 实参并返回一个 pandas.DataFrame 或者 pandas.Seriespandas.arrays 的元组,其中每个数组都是一列。

    结果的列类型必须与 UDTF 定义中的列类型匹配。返回的结果必须正好为 DataFrame 或 tuple。这与矢量化 end_partition 方法不同,您可以在其中生成或返回一个列表。

  3. 使用 @vectorized 装饰器或 _sf_vectorized_input 函数属性将 process 方法标记为矢量化。

    有关更多信息,请参阅 矢量化 Python UDFs.:code:@vectorized 装饰器只能在 Python UDTF 在 Snowflake 中执行时使用;例如,当使用 SQL 工作表时。使用客户端或 Python 工作表执行时,必须使用该函数属性。

  4. 可选:如果您的 Python 处理程序函数超过执行时间限制,请 设置目标批量大小

备注

带有矢量化 process 方法的 UDTF 输入 DataFrame 的默认列名与 SQL 函数的签名相符。列名称遵循 SQL 标识符要求。也就是说,如果标识符未加引号,则该标识符将大写,如果该标识符使用了双引号,则将保持原样。

对于带有矢量化 process 方法的 UDTF,可以将其处理程序实现为分区感知的方式处理批次,或简单地逐批处理。有关详细信息,请参阅 有状态和无状态处理

示例:带有矢量化 process 方法的 UDTF 应用独热编码

使用带有矢量化 process 方法的 UDTF 为包含十个类别的表应用独热编码。

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame

class one_hot_encode:
  def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
      return pd.get_dummies(df)
  process._sf_vectorized_input = pd.DataFrame


one_hot_encode_udtf = session.udtf.register(
  one_hot_encode,
  output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
  input_names=['"categ"']
)

df_table = session.table("categories")
df_table.show()
Copy

示例结果:

-----------
|"CATEG"  |
-----------
|categ1   |
|categ6   |
|categ8   |
|categ5   |
|categ7   |
|categ5   |
|categ1   |
|categ2   |
|categ2   |
|categ4   |
-----------

准备打印表:

res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Copy

示例结果:

    CATEG  CATEG0  CATEG1  CATEG2  CATEG3  CATEG4  CATEG5  CATEG6  CATEG7  CATEG8  CATEG9
0  categ0       1       0       0       0       0       0       0       0       0       0
1  categ0       1       0       0       0       0       0       0       0       0       0
2  categ5       0       0       0       0       0       1       0       0       0       0
3  categ3       0       0       0       1       0       0       0       0       0       0
4  categ8       0       0       0       0       0       0       0       0       1       0

您可以使用矢量化 UDF 获取相同结果,但不太方便。您需要将结果打包成一列,然后解包该列以将结果恢复为可用的 Pandas DataFrame。

使用矢量化 UDF 的示例:

def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
  return pd.get_dummies(df).to_dict('records')

one_hot_encode._sf_vectorized_input = pd.DataFrame

one_hot_encode_udf = session.udf.register(
  one_hot_encode,
  output_schema=["encoding"],
)

df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0  {\n  "categ0": false,\n  "categ1": false,\n  "...
1  {\n  "categ0": false,\n  "categ1": true,\n  "c...
2  {\n  "categ0": false,\n  "categ1": false,\n  "...
3  {\n  "categ0": false,\n  "categ1": false,\n  "...
4  {\n  "categ0": true,\n  "categ1": false,\n  "c...
Copy

类型支持

矢量化 UDTFs 支持与矢量化 UDFs 相同的 SQL 类型。但是,对于矢量化 UDTFs,适合 64 位或更小整数类型的小数位数为 0 的 SQL NUMBER 实参将始终映射到 Int16Int32Int64。与标量 UDFs 不同,如果 UDTF 的实参不可为空,则不会将其转换为 int16int32int64

要查看显示 SQL 类型如何映射到 Pandas 数据类型的表,请参阅矢量化 Python UDFs 主题中的 类型支持表

最佳实践

  • 如果必须随每一行返回一个标量,请构建一个重复值列表,而不是解包 numpy 数组来创建元组。例如,对于两列结果,不执行以下操作:

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    使用此:

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  • 要提高性能,请将半结构化数据解包到列中。

    例如,如果有一个变体列 obj,其中包含元素 x(int)y(float)z(string),那么不定义具有类似以下签名的 UDTF,并使用 vec_udtf(obj) 进行调用:

    create function vec_udtf(variant obj)
    
    Copy

    定义具有类似以下签名的 UDTF,并使用 vec_udtf(obj:x, obj:y, obj:z) 进行调用:

    create function vec_udtf(int, float, string)
    
    Copy
  • 默认情况下,Snowflake 将输入编码为支持 NULL 值的 Pandas 数据类型(例如 Int64 (https://pandas.pydata.org/docs/reference/api/pandas.Int64Dtype.html))。如果使用需要基本类型(如 numpy)的库,并且输入没有 NULL 值,则应该在使用库之前将列转换为基本类型。例如:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    有关更多信息,请参阅 类型支持

  • 采用带有矢量化 end_partition 方法的 UDTFs 时,为了提高性能并防止超时,请避免使用 pandas.concat 累积部分结果。相反,只要准备好了,就会产生部分结果。

    例如,不执行以下操作:

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    执行以下操作:

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy
语言: 中文