Snowpark Checkpoints 库:收集器¶
Snowpark Checkpoints Python 包提供了一系列功能,以支持对迁移的工作负载进行验证。本节概述了包中包含的主要特性和功能,以及有关如何有效使用这些特性和功能的指导。
收集有关 PySpark 代码的信息¶
snowpark-checkpoints-collectors
包提供了从 PySpark DataFrames 中提取信息的函数。然后,我们可以使用该数据来验证转换后的 Snowpark DataFrames,以确保行为一致性。
使用以下函数插入新的检查点收集点:
函数签名:
def collect_dataframe_checkpoint(df: SparkDataFrame,
checkpoint_name: str,
sample: Optional[float],
mode: Optional[CheckpointMode],
output_path: Optional[str]) -> None:
函数参数:
df:PySpark DataFrame。
checkpoint_name:检查点名称。以字母(A-Z,a-z)或下划线 (_) 开头,仅包含字母、下划线和十进制数字 (0-9)。
sample:(可选)样本大小。默认值为 1.0(整个 PySpark DataFrame),范围为 0 到 1.0。
mode:(可选)执行模式。选项是
SCHEMA
和DATAFRAME
。默认值为SCHEMA
。output_path:(可选)保存检查点的输出路径。默认值为当前工作目录。
收集过程生成一个名为 checkpoint_collection_result.json
的输出文件,其中包含每个收集点的结果信息。这是一个 JSON 文件,包含以下信息:
收集点开始的时间戳。
收集点所在文件的相对路径。
收集点所在文件的代码行。
收集点检查点的名称。
收集点的结果(失败或通过)。
架构推断收集数据模式(架构)¶
这是默认模式,利用 Pandera 架构推断来获取将要对指定 DataFrame 进行评估的元数据和检查项。此模式还会根据 PySpark 类型从 DataFrame 的列收集自定义数据。
根据列的 PySpark 类型收集列数据和检查项(请参阅下表)。对于任何列,无论其类型如何,收集的自定义数据将包括列的名称、列的类型、是否可为空、行数、非空行数和空行数。
列类型 |
收集的自定义数据 |
---|---|
数字( |
最小值。最大值。平均值。小数精度(对于整数类型,值为零)。标准差。 |
日期 |
最小值。最大值。日期格式:%Y-%m-%d |
DayTimeIntervalType 和 YearMonthIntervalType |
最小值。最大值。 |
时间戳 |
最小值。最大值。日期格式:%Y-%m-%dH:%M:%S |
Timestamp ntz |
最小值。最大值。日期格式:%Y-%m-%dT%H:%M:%S%z |
字符串 |
最小长度值。最大长度值。 |
Char |
PySpark 将任何字面量作为字符串类型处理,因此 char 不是有效类型。 |
Varchar |
PySpark 将任何字面量作为字符串类型处理,因此 Varchar 不是有效类型。 |
小数 |
最小值。最大值。平均值。小数精度。 |
数组 |
值的类型。如果允许,将 null 作为元素。空值比例。最大数组长度。最小数组长度。数组平均长度。如果所有数组大小相同。 |
二进制 |
最大大小。最小大小。平均大小。如果所有元素大小相同。 |
映射 |
键类型。值的类型。如果允许,将 null 作为值。空值比例。最大映射大小。最小映射大小。平均映射大小。如果所有映射大小相同。 |
Null |
NullType 表示无,因为无法确定类型数据;因此无法从该类型获取信息。 |
结构体 |
结构的元数据,它适用于每个 structField: |
它还为每种数据类型定义了一组预定义的验证检查项,详见下表:
类型 |
Pandera 检查 |
附加检查 |
---|---|---|
布尔 |
每个值为 True 或 False。 |
True 和 False 值的计数。 |
数字( |
每个值都在最小值和最大值范围内。 |
小数精度。平均值。标准差。 |
日期 |
不适用 |
最小值和最大值 |
时间戳 |
每个值都在最小值和最大值范围内。 |
值的格式。 |
Timestamp ntz |
每个值都在最小值和最大值范围内。 |
值的格式。 |
字符串 |
每个值长度都在最小和最大长度范围内。 |
无 |
Char |
PySpark 将任何字面量作为字符串类型处理,因此 |
|
Varchar |
PySpark 将任何字面量作为字符串类型处理,因此 |
|
小数 |
不适用 |
不适用 |
数组 |
不适用 |
无 |
二进制 |
不适用 |
无 |
映射 |
不适用 |
无 |
Null |
不适用 |
不适用 |
结构体 |
不适用 |
无 |
此模式允许用户定义要收集的 DataFrame 样本,但它是可选的。默认情况下,在整个 DataFrame 中进行收集。样本的大小必须在统计上代表总体。
Pandera 只能推断 Pandas DataFrame 的架构,这意味着必须将 PySpark DataFrame 转换为 Pandas DataFrame,这会影响列的类型解析。特别是,Pandera 推断以下 PySpark 类型为对象类型:string
、array
、map
、null
、struct
和 binary
。
此模式的输出是每个收集的 DataFrame 的 JSON 文件,其中文件名与检查点相同。此文件包含与架构相关的信息,包含两个部分:
Pandera 架构部分包含 Pandera 推断出的数据(如名称、类型 (Pandas))、列是否允许空值、每列的其他信息,并检查该列是否基于 PySpark 类型。它是 Pandera 的
DataFrameSchema
对象。自定义数据部分是每个列根据 PySpark 类型收集的自定义数据的数组。
备注
处理大型 PySpark DataFrames 时,集合包可能存在内存问题。要解决这个问题,您可以将集合函数中的示例参数设置为介于 0.0 到 1.0 之间的值,以便处理数据的子集,而不是整个 PySpark DataFrame。
DataFrame 收集数据模式 (DataFrame)¶
此模式收集 PySpark DataFrame 的数据。在这种情况下,该机制会以 Parquet 格式保存给定 DataFrame 的所有数据。使用默认用户 Snowflake 连接,它会尝试将 Parquet 文件上传到 Snowflake 临时暂存区,并根据暂存区中的信息创建表。文件和表的名称与检查点相同。
此模式的输出是保存的 DataFrame 的 Parquet 文件结果,以及默认 Snowflake 配置连接中包含 DataFrame 数据的表。