Snowpark Checkpoints library: Collectors
The snowpark-checkpoints-collectors package offers a function for extracting information from PySpark DataFrames. You can then use that data to validate against the converted Snowpark DataFrames to ensure behavioral equivalence.
- To insert a new checkpoint collection point, use the following function (a usage sketch follows this list):
  - Function signature:
    - def collect_dataframe_checkpoint(df: SparkDataFrame, checkpoint_name: str, sample: Optional[float], mode: Optional[CheckpointMode], output_path: Optional[str]) -> None
  - Function parameters:
    - df: The PySpark DataFrame.
    - checkpoint_name: The name of the checkpoint. It must start with a letter (A-Z, a-z) or an underscore (_) and contain only letters, underscores, and numbers (0-9).
    - sample: (Optional) The sample size. The default value is 1.0 (the entire PySpark DataFrame), in a range from 0 to 1.0.
    - mode: (Optional) The execution mode. The options are SCHEMA (default) and DATAFRAME.
    - output_path: (Optional) The path to the file where the checkpoint is saved. The default value is the current working directory.
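A minimal usage sketch, assuming the function is imported from the snowpark-checkpoints-collectors package (the exact import path is an assumption and may differ between package versions) and using a placeholder DataFrame and checkpoint name:

```python
from pyspark.sql import SparkSession

# Import path is an assumption based on the snowpark-checkpoints-collectors
# package; verify it against the version you have installed.
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# Collect metadata for this DataFrame using the default SCHEMA mode.
# "demo_initial_checkpoint" is a placeholder checkpoint name.
collect_dataframe_checkpoint(df, checkpoint_name="demo_initial_checkpoint")
```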
 
The collection process generates a JSON output file, called checkpoint_collection_result.json, that contains the following information about the result for each collection point:
- A timestamp for when the collection point started 
- The relative path of the file where the collection point is 
- The line of code of the file where the collection point is 
- The name of the collection point checkpoint 
- The result of the collection point (fail or pass) 
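As a quick sanity check, you can load and inspect that file directly. The sketch below only assumes the default output location (the current working directory) and does not assume any particular field names inside the file:

```python
import json
from pathlib import Path

# checkpoint_collection_result.json is written to the output path,
# which defaults to the current working directory.
result_file = Path("checkpoint_collection_result.json")
results = json.loads(result_file.read_text(encoding="utf-8"))

# Pretty-print the collected results for inspection.
print(json.dumps(results, indent=2))
```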
Schema inference collected data mode (Schema)
This is the default mode. It leverages Pandera schema inference to obtain the metadata and checks that will be evaluated for the specified DataFrame. This mode also collects custom data from the columns of the DataFrame based on their PySpark types.
The column data and checks are collected based on the PySpark type of the column (see the following tables). For any column, regardless of its type, the custom data collected includes the name of the column, the type of the column, whether it is nullable, the count of rows, the count of non-null rows, and the count of null rows.
| Column type | Custom data collected |
|---|---|
| Numeric (byte, short, integer, long, float, and double) | Minimum value; maximum value; mean value; decimal precision (in the case of integer types, the value is zero); standard deviation |
| Date | Minimum value; maximum value; date format (%Y-%m-%d) |
| DayTimeIntervalType and YearMonthIntervalType | Minimum value; maximum value |
| Timestamp | Minimum value; maximum value; date format (%Y-%m-%dH:%M:%S) |
| Timestamp ntz | Minimum value; maximum value; date format (%Y-%m-%dT%H:%M:%S%z) |
| String | Minimum length value; maximum length value |
| Char | PySpark handles any literal as a string type; therefore, char is not a valid type. | 
| Varchar | PySpark handles any literal as a string type; therefore, Varchar is not a valid type. | 
| Decimal | Minimum value; maximum value; mean value; decimal precision |
| Array | Type of the value; whether null is allowed as an element; proportion of null values; maximum array size; minimum array size; mean size of arrays; whether all arrays have the same size |
| Binary | Maximum size; minimum size; mean size; whether all elements have the same size |
| Map | Type of the key; type of the value; whether null is allowed as a value; proportion of null values; maximum map size; minimum map size; mean map size; whether all maps have the same size |
| Null | NullType represents None because the type data cannot be determined; therefore, it is not possible to get information from this type. | 
| Struct | The metadata of the struct for each structField |
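To make the numeric-column statistics above concrete, the following sketch computes comparable values directly with PySpark aggregations. It is purely illustrative and is not the collector's internal implementation:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.5,), (2.0,), (None,)], schema="amount double")

# Statistics comparable to the custom data collected for a numeric column:
# minimum, maximum, mean, standard deviation, and row/null counts.
stats = df.select(
    F.min("amount").alias("min"),
    F.max("amount").alias("max"),
    F.mean("amount").alias("mean"),
    F.stddev("amount").alias("stddev"),
    F.count(F.lit(1)).alias("rows"),
    F.count("amount").alias("not_null_rows"),
).collect()[0]

print(stats.asDict())
```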
The Schema mode also defines a set of predefined validation checks for each data type, as detailed in the following table:
| Type | Pandera checks | Additional checks |
|---|---|---|
| Boolean | Each value is True or False. | The count of True and False values |
| Numeric (byte, short, integer, long, float, and double) | Each value is within the range of the minimum and maximum values. | Decimal precision; mean value; standard deviation |
| Date | N/A | Minimum and maximum values |
| Timestamp | Each value is within the range of the minimum and maximum values. | The format of the value |
| Timestamp ntz | Each value is within the range of the minimum and maximum values. | The format of the value |
| String | The length of each value is within the range of the minimum and maximum lengths. | None |
| Char | PySpark handles any literal as a string type; therefore, char is not a valid type. | |
| Varchar | PySpark handles any literal as a string type; therefore, Varchar is not a valid type. | |
| Decimal | N/A | N/A |
| Array | N/A | None |
| Binary | N/A | None |
| Map | N/A | None |
| Null | N/A | N/A |
| Struct | N/A | None |
This mode lets the user define a sample of the DataFrame to collect, but doing so is optional. By default, collection runs over the entire DataFrame. The size of the sample must be statistically representative of the population.
Pandera can only infer the schema of a pandas DataFrame, so the PySpark DataFrame must be converted into a pandas DataFrame, which can affect how column types are resolved. In particular, pandera infers the following PySpark types as object types: string, array, map, null, struct, and binary.
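The following sketch illustrates that conversion path: the PySpark DataFrame is converted with toPandas() and pandera's infer_schema is applied to the result, where the string column typically resolves to the pandas object dtype. The column names and data are placeholders:

```python
import pandera as pa
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# pandera can only infer schemas for pandas DataFrames, so convert first.
pandas_df = spark_df.toPandas()

# Infer a DataFrameSchema; the string column resolves to the "object" dtype.
schema = pa.infer_schema(pandas_df)
print(schema)
```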
The output of this mode is a JSON file for each collected DataFrame, and the name of the file is the same as the checkpoint. This file contains information related to the schema and has two sections:
- The Pandera schema section contains the data inferred by pandera for each column, such as the name, the type (pandas), whether the column allows null values, and other information, as well as the checks of the column based on the PySpark type. It is a pandera DataFrameSchema object.
- The custom data section is an array of the custom data collected for each column based on the PySpark type.
Note
The collection package might have memory issues when processing large PySpark DataFrames. To work with a subset of the data instead of the entire PySpark DataFrame, you can set the sample parameter in the collection function to a value between 0.0 and 1.0.
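For example, a sketch of collecting from roughly 10% of a large DataFrame (the import path and names are the same assumptions as in the earlier sketch):

```python
from pyspark.sql import SparkSession

# Import path is an assumption; verify it against the installed package.
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint

spark = SparkSession.builder.getOrCreate()
large_df = spark.range(1_000_000).withColumnRenamed("id", "value")

# sample=0.1 asks the collector to work with roughly 10% of the rows,
# which keeps memory usage down for large DataFrames.
collect_dataframe_checkpoint(large_df, checkpoint_name="demo_large_df_checkpoint", sample=0.1)
```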
DataFrame collected data mode (DataFrame)
This mode collects the data of the PySpark DataFrame. In this case, the mechanism saves all the data of the given DataFrame in Parquet format. Using the default user Snowflake connection, it tries to upload the Parquet files into a temporary Snowflake stage and create a table based on the information in the stage. The file and the table have the same name as the checkpoint.
The output of this mode is a Parquet file with the saved DataFrame data and a table containing that data in the default Snowflake configuration connection.
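As a sketch, collecting in DATAFRAME mode might look like the following. The import path and the CheckpointMode member name are assumptions based on the parameter description above, and the Snowflake connection comes from the default user configuration as described:

```python
from pyspark.sql import SparkSession

# Import path and enum location are assumptions; check the installed package.
from snowflake.snowpark_checkpoints_collector import (
    CheckpointMode,
    collect_dataframe_checkpoint,
)

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# Save the full DataFrame as Parquet, upload it to a temporary Snowflake stage,
# and create a table named after the checkpoint using the default connection.
collect_dataframe_checkpoint(
    df,
    checkpoint_name="demo_full_data_checkpoint",
    mode=CheckpointMode.DATAFRAME,
)
```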