Snowpark Checkpoints 库:收集器

Snowpark Checkpoints Python 包提供了一系列功能,以支持对迁移的工作负载进行验证。本节概述了包中包含的主要特性和功能,以及有关如何有效使用这些特性和功能的指导。

收集有关 PySpark 代码的信息

snowpark-checkpoints-collectors 包提供了从 PySpark DataFrames 中提取信息的函数。然后,我们可以使用该数据来验证转换后的 Snowpark DataFrames,以确保行为一致性。

使用以下函数插入新的检查点收集点:

函数签名:

def collect_dataframe_checkpoint(df: SparkDataFrame,
  checkpoint_name: str,
  sample: Optional[float],
  mode: Optional[CheckpointMode],
  output_path: Optional[str]) -> None:
Copy

函数参数:

  • df:PySpark DataFrame。

  • checkpoint_name:检查点名称。以字母(A-Z,a-z)或下划线 (_) 开头,仅包含字母、下划线和十进制数字 (0-9)。

  • sample:(可选)样本大小。默认值为 1.0(整个 PySpark DataFrame),范围为 0 到 1.0。

  • mode:(可选)执行模式。选项是 SCHEMADATAFRAME。默认值为 SCHEMA

  • output_path:(可选)保存检查点的输出路径。默认值为当前工作目录。

收集过程生成一个名为 checkpoint_collection_result.json 的输出文件,其中包含每个收集点的结果信息。这是一个 JSON 文件,包含以下信息:

  • 收集点开始的时间戳。

  • 收集点所在文件的相对路径。

  • 收集点所在文件的代码行。

  • 收集点检查点的名称。

  • 收集点的结果(失败或通过)。

架构推断收集数据模式(架构)

这是默认模式,利用 Pandera 架构推断来获取将要对指定 DataFrame 进行评估的元数据和检查项。此模式还会根据 PySpark 类型从 DataFrame 的列收集自定义数据。

根据列的 PySpark 类型收集列数据和检查项(请参阅下表)。对于任何列,无论其类型如何,收集的自定义数据将包括列的名称、列的类型、是否可为空、行数、非空行数和空行数。

根据列的 PySpark 类型收集自定义数据

列类型

收集的自定义数据

数字(byteshortintegerlongfloatdouble

最小值。最大值。平均值。小数精度(对于整数类型,值为零)。标准差。

日期

最小值。最大值。日期格式:%Y-%m-%d

DayTimeIntervalType 和 YearMonthIntervalType

最小值。最大值。

时间戳

最小值。最大值。日期格式:%Y-%m-%dH:%M:%S

Timestamp ntz

最小值。最大值。日期格式:%Y-%m-%dT%H:%M:%S%z

字符串

最小长度值。最大长度值。

Char

PySpark 将任何字面量作为字符串类型处理,因此 char 不是有效类型。

Varchar

PySpark 将任何字面量作为字符串类型处理,因此 Varchar 不是有效类型。

小数

最小值。最大值。平均值。小数精度。

数组

值的类型。如果允许,将 null 作为元素。空值比例。最大数组长度。最小数组长度。数组平均长度。如果所有数组大小相同。

二进制

最大大小。最小大小。平均大小。如果所有元素大小相同。

映射

键类型。值的类型。如果允许,将 null 作为值。空值比例。最大映射大小。最小映射大小。平均映射大小。如果所有映射大小相同。

Null

NullType 表示无,因为无法确定类型数据;因此无法从该类型获取信息。

结构体

结构的元数据,它适用于每个 structField:nametypenullablerows countrows not null countrows null count。它是一个数组。

它还为每种数据类型定义了一组预定义的验证检查项,详见下表:

根据列的类型收集检查项

类型

Pandera 检查

附加检查

布尔

每个值为 True 或 False。

True 和 False 值的计数。

数字(byteshortintegerlongfloatdouble

每个值都在最小值和最大值范围内。

小数精度。平均值。标准差。

日期

不适用

最小值和最大值

时间戳

每个值都在最小值和最大值范围内。

值的格式。

Timestamp ntz

每个值都在最小值和最大值范围内。

值的格式。

字符串

每个值长度都在最小和最大长度范围内。

Char

PySpark 将任何字面量作为字符串类型处理,因此 char 不是有效类型。

Varchar

PySpark 将任何字面量作为字符串类型处理,因此 Varchar 不是有效类型。

小数

不适用

不适用

数组

不适用

二进制

不适用

映射

不适用

Null

不适用

不适用

结构体

不适用

此模式允许用户定义要收集的 DataFrame 样本,但它是可选的。默认情况下,在整个 DataFrame 中进行收集。样本的大小必须在统计上代表总体。

Pandera 只能推断 Pandas DataFrame 的架构,这意味着必须将 PySpark DataFrame 转换为 Pandas DataFrame,这会影响列的类型解析。特别是,Pandera 推断以下 PySpark 类型为对象类型:stringarraymapnullstructbinary

此模式的输出是每个收集的 DataFrame 的 JSON 文件,其中文件名与检查点相同。此文件包含与架构相关的信息,包含两个部分:

  1. Pandera 架构部分包含 Pandera 推断出的数据(如名称、类型 (Pandas))、列是否允许空值、每列的其他信息,并检查该列是否基于 PySpark 类型。它是 Pandera 的 DataFrameSchema 对象。

  2. 自定义数据部分是每个列根据 PySpark 类型收集的自定义数据的数组。

备注

处理大型 PySpark DataFrames 时,集合包可能存在内存问题。要解决这个问题,您可以将集合函数中的示例参数设置为介于 0.0 到 1.0 之间的值,以便处理数据的子集,而不是整个 PySpark DataFrame。

DataFrame 收集数据模式 (DataFrame)

此模式收集 PySpark DataFrame 的数据。在这种情况下,该机制会以 Parquet 格式保存给定 DataFrame 的所有数据。使用默认用户 Snowflake 连接,它会尝试将 Parquet 文件上传到 Snowflake 临时暂存区,并根据暂存区中的信息创建表。文件和表的名称与检查点相同。

此模式的输出是保存的 DataFrame 的 Parquet 文件结果,以及默认 Snowflake 配置连接中包含 DataFrame 数据的表。

语言: 中文