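Snowpark Checkpoints library: Validators¶
The Snowpark Checkpoints package provides a set of validations that can be applied to Snowpark code to ensure behavioral equivalence with the PySpark code.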
Functions provided by the framework¶
- check_with_spark: A decorator that samples any Snowpark DataFrame arguments passed to a function and converts the samples to PySpark DataFrames. The check then executes a provided Spark function that mirrors the functionality of the new Snowpark function and compares the outputs of the two implementations. Assuming the Spark and Snowpark functions are semantically identical, this decorator verifies them on real, sampled data.
 - Parameters:
- job_context(SnowparkJobContext): The job context that contains configuration and details for the validation
- spark_function(fn): The equivalent PySpark function to compare against the Snowpark implementation
- checkpoint_name(str): A name for the checkpoint; defaults to None
- sample_number(Optional[int], optional): The number of rows for validation; defaults to 100
- sampling_strategy(Optional[SamplingStrategy], optional): The strategy used for sampling data; defaults to SamplingStrategy.RANDOM_SAMPLE
- output_path(Optional[str], optional): The path to the file where the validation results are stored; defaults to None
 
 - Example:

    def original_spark_code_I_dont_understand(df):
        from pyspark.sql.functions import col, when

        ret = df.withColumn(
            "life_stage",
            when(col("byte") < 4, "child")
            .when(col("byte").between(4, 10), "teenager")
            .otherwise("adult"),
        )
        return ret


    @check_with_spark(
        job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
        from snowflake.snowpark.functions import col, lit, when

        ref = df.with_column(
            "life_stage",
            when(col("byte") < 4, lit("child"))
            .when(col("byte").between(4, 10), lit("teenager"))
            .otherwise(lit("adult")),
        )
        return ref


    df1 = new_snowpark_code_I_do_understand(df)

- validate_dataframe_checkpoint: This function validates a Snowpark DataFrame against a specific checkpoint schema file or an imported DataFrame, depending on the mode argument. It ensures that the information collected for that DataFrame and the DataFrame passed to the function are equivalent.
 - Parameters:
- df(SnowparkDataFrame): The DataFrame to validate
- checkpoint_name(str): The name of the checkpoint to validate against
- job_context(SnowparkJobContext, optional): The job context for the validation; required for PARQUET mode
- mode(CheckpointMode): The mode of validation (e.g., SCHEMA, PARQUET); defaults to SCHEMA
- custom_checks(Optional[dict[Any, Any]], optional): Custom checks to apply during validation
- skip_checks(Optional[dict[Any, Any]], optional): Checks to skip during validation
- sample_frac(Optional[float], optional): Fraction of the DataFrame to sample for validation; defaults to 0.1
- sample_number(Optional[int], optional): Number of rows to sample for validation
- sampling_strategy(Optional[SamplingStrategy], optional): Strategy to use for sampling
- output_path(Optional[str], optional): The path to the file where the validation results are stored
 
 - Example:

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
        df1,
        "demo_add_a_column_dataframe",
        job_context=job_context,
        mode=CheckpointMode.DATAFRAME,  # or CheckpointMode.SCHEMA
    )

 Depending on the mode selected, the validation uses either the collected schema file or a Parquet-loaded DataFrame in Snowflake to verify equivalence with the PySpark version.
- check_output_schema: This decorator validates the schema of a Snowpark function's output and ensures that the output DataFrame conforms to a specified Pandera schema. It is particularly useful for enforcing data integrity and consistency in Snowpark pipelines. The decorator takes several parameters, including the Pandera schema to validate against, the checkpoint name, sampling parameters, and an optional job context. It wraps the Snowpark function and validates the schema of the output DataFrame before returning the result.
 - Example:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8

    # Define the Pandera schema
    out_schema = DataFrameSchema(
        {
            "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
            "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
            "COLUMN3": Column(float, Check.less_than(10)),
        }
    )

    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
        return dataframe.with_column(
            "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
        )

    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
        {
            "COLUMN1": [1, 4, 0, 10, 9],
            "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
        }
    )
    sp_dataframe = session.create_dataframe(df)

    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)

- check_input_schema: This decorator validates the schema of a Snowpark function's input arguments and ensures that the input DataFrame conforms to a specified Pandera schema before the function is executed. It is particularly useful for enforcing data integrity and consistency in Snowpark pipelines. The decorator takes several parameters, including the Pandera schema to validate against, the checkpoint name, sampling parameters, and an optional job context. It wraps the Snowpark function and validates the schema of the input DataFrame before executing the function.
 - Example:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8

    # Define the Pandera schema
    input_schema = DataFrameSchema(
        {
            "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
            "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        }
    )

    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
        return dataframe.with_column(
            "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
        )

    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
        {
            "COLUMN1": [1, 4, 0, 10, 9],
            "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
        }
    )
    sp_dataframe = session.create_dataframe(df)

    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)

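The decorators described above share one pattern: wrap a function, run a check before or after it, and fail if the check does not hold. The following is a minimal pure-Python sketch of that pattern, not the library's implementation. It uses lists of dicts in place of DataFrames, and the names `compare_with_reference` and `check_input` are hypothetical stand-ins for illustration only.

```python
import functools
import random


def compare_with_reference(reference_fn, sample_number=100, seed=0):
    """Hypothetical decorator: run reference_fn and the decorated function
    on the same sample of the input and require identical results."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(rows):
            # Sample at most sample_number rows, reproducibly.
            k = min(sample_number, len(rows))
            sample = random.Random(seed).sample(rows, k)
            if reference_fn(sample) != fn(sample):
                raise AssertionError(
                    f"{fn.__name__} diverges from {reference_fn.__name__}"
                )
            # The check passed on the sample; run the new code on all rows.
            return fn(rows)
        return wrapper
    return decorator


def check_input(column_checks):
    """Hypothetical decorator: validate every input row before calling fn."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(rows):
            for row in rows:
                for column, predicate in column_checks.items():
                    if not predicate(row[column]):
                        raise ValueError(f"{column}={row[column]!r} failed validation")
            return fn(rows)
        return wrapper
    return decorator


def reference_life_stage(rows):
    """Stand-in for the original implementation."""
    def stage(b):
        return "child" if b < 4 else ("teenager" if b <= 10 else "adult")
    return [{**r, "life_stage": stage(r["byte"])} for r in rows]


@compare_with_reference(reference_life_stage, sample_number=3)
@check_input({"byte": lambda v: 0 <= v <= 127})
def new_life_stage(rows):
    """Stand-in for the migrated implementation."""
    out = []
    for r in rows:
        if r["byte"] < 4:
            stage = "child"
        elif r["byte"] <= 10:
            stage = "teenager"
        else:
            stage = "adult"
        out.append({**r, "life_stage": stage})
    return out


rows = [{"byte": b} for b in (1, 5, 11, 3, 10)]
result = new_life_stage(rows)
print([r["life_stage"] for r in result])
```

Comparing on a sample keeps the cost of the reference run bounded, which is the same trade-off the sample_number parameter expresses.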
Statistics checks¶
By default, when validation runs in Schema mode, statistics checks are applied to specific column types. Use skip_checks to skip these checks.
| Column type | Default check | 
|---|---|
| Numeric | between: Validates that the value lies between the min and the max, inclusive. decimal_precision: If the value is a decimal, checks the decimal precision. mean: Validates that the mean of the column falls within a specific range. | 
| Boolean | isin: Validates that the value is True or False. True_proportion: Validates that the proportion of True values falls within a specific range. False_proportion: Validates that the proportion of False values falls within a specific range. | 
| Date | between: Validates that the value lies between the min and the max, inclusive. | 
| Nullable: all supported types | Null_proportion: Validates the proportion of null values. | 
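As a rough illustration of what the checks in the table measure, the sketch below computes them on plain Python lists. It is not the library's code: the function names are hypothetical, and ignoring nulls when computing the range, the mean, and the True proportion is an assumption of this sketch.

```python
from statistics import mean


def between(values, lo, hi):
    """True if every non-null value lies in [lo, hi], bounds included."""
    return all(lo <= v <= hi for v in values if v is not None)


def mean_in_range(values, lo, hi):
    """True if the mean of the non-null values lies in [lo, hi]."""
    present = [v for v in values if v is not None]
    return lo <= mean(present) <= hi


def true_proportion(values):
    """Share of True among the non-null values."""
    present = [v for v in values if v is not None]
    return sum(1 for v in present if v is True) / len(present)


def null_proportion(values):
    """Share of null (None) values among all values."""
    return sum(1 for v in values if v is None) / len(values)


numbers = [1, 4, 0, 10, 9, None]
flags = [True, False, True, True]

print(between(numbers, 0, 10))           # all non-null values are within 0..10
print(mean_in_range(numbers, 4.0, 5.0))  # mean of 1, 4, 0, 10, 9 is 4.8
print(true_proportion(flags))            # 3 of 4 values are True
print(null_proportion(numbers))          # 1 of 6 values is null
```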
Skip checks¶
Checks can be controlled at a granular level: you can skip all validation for a column or only specific checks for that column. Use the skip_checks parameter to specify the column and the validation types to skip. To skip a check, refer to it by the name associated with the check:
- str_contains
- str_endswith
- str_length
- str_matches
- str_startswith
- in_range
- equal_to
- greater_than_or_equal_to
- greater_than
- less_than_or_equal_to
- less_than
- not_equal_to
- notin
- isin
Example:
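import pandas as pd
from numpy import int8
from pandera import Check, Column, DataFrameSchema
from snowflake.snowpark import Session

# check_dataframe_schema and SKIP_ALL come from the Snowpark Checkpoints package.

df = pd.DataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)
schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
        "COLUMN2": Column(
            float,
            [
                Check.greater_than(-20.5),
                Check.less_than(-1.0),
                Check(lambda x: x < -1.2),
            ],
        ),
    }
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Skip every check on COLUMN1, and only the two named checks on COLUMN2
check_dataframe_schema(
    sp_df,
    schema,
    skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)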
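To make the effect of skip_checks concrete, here is a minimal sketch of how a skip mapping could filter a set of named checks. It is an illustration under stated assumptions, not the library's logic: the local `SKIP_ALL` value is a stand-in for the library constant, `apply_skip_checks` is a hypothetical helper, and `custom_lambda` is an invented label for the unnamed lambda check.

```python
# Stand-in for the library's SKIP_ALL constant (its real value is not shown here).
SKIP_ALL = "__skip_all__"


def apply_skip_checks(column_checks, skip_checks):
    """Return a copy of column_checks without the checks named in skip_checks."""
    remaining = {}
    for column, checks in column_checks.items():
        skipped = skip_checks.get(column, [])
        if SKIP_ALL in skipped:
            # Skip the whole column: no checks remain.
            remaining[column] = []
        else:
            # Skip only the checks listed by name.
            remaining[column] = [name for name in checks if name not in skipped]
    return remaining


column_checks = {
    "COLUMN1": ["in_range"],
    "COLUMN2": ["greater_than", "less_than", "custom_lambda"],
}
skip = {"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]}

remaining = apply_skip_checks(column_checks, skip)
print(remaining)
```

With this mapping, COLUMN1 is left with no checks and COLUMN2 keeps only the check that was not named.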
Custom checks¶
You can add additional checks to the schema generated from the JSON file with the custom_checks property. This adds the check to the pandera schema.
Example:
import pandas as pd
from pandera import Check
from snowflake.snowpark import Session

# validate_dataframe_checkpoint comes from the Snowpark Checkpoints package.

df = pd.DataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# These checks are added to the schema generated from the JSON file
result = validate_dataframe_checkpoint(
    sp_df,
    "checkpoint-name",
    custom_checks={
        "COLUMN1": [
            Check(lambda x: x.shape[0] == 5),
            Check(lambda x: x.shape[1] == 2),
        ],
        "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
    },
)
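Conceptually, custom_checks extends the per-column checks of the generated schema rather than replacing them. The sketch below shows that merge with plain Python data. The helper names and the (name, predicate) representation are assumptions made for illustration; the library works with Pandera Check objects instead.

```python
def add_custom_checks(generated_checks, custom_checks):
    """Return a new mapping with the custom checks appended per column."""
    merged = {col: list(checks) for col, checks in generated_checks.items()}
    for col, checks in custom_checks.items():
        merged.setdefault(col, []).extend(checks)
    return merged


def run_checks(column_values, checks):
    """Return the names of the checks that fail for one column."""
    return [name for name, predicate in checks if not predicate(column_values)]


# Checks as they might come from a generated schema (hypothetical).
generated = {
    "COLUMN1": [("in_range", lambda vals: all(0 <= v <= 10 for v in vals))],
}
# Additional user-supplied checks.
custom = {
    "COLUMN1": [("has_five_rows", lambda vals: len(vals) == 5)],
    "COLUMN2": [("all_negative", lambda vals: all(v < 0 for v in vals))],
}

data = {
    "COLUMN1": [1, 4, 0, 10, 9],
    "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}

merged = add_custom_checks(generated, custom)
failures = {col: run_checks(data[col], checks) for col, checks in merged.items()}
print(failures)
```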
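Sampling strategies¶
The sampling process is designed to validate large DataFrames efficiently by taking a representative sample of the data. This approach makes it possible to perform schema validation without processing the entire dataset, which can be computationally expensive and time-consuming.
- Parameters:
- sample_frac: This parameter specifies the fraction of the DataFrame to sample. For example, if sample_frac is set to 0.1, then 10 percent of the DataFrame rows are sampled. This is useful when you want to validate a subset of the data to save computational resources.
- sample_number: This parameter specifies the exact number of rows to sample from the DataFrame. For example, if sample_number is set to 100, then 100 rows are sampled from the DataFrame. This is useful when you want to validate a fixed number of rows regardless of the DataFrame size.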
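The difference between the two parameters comes down to how the sample size is derived. The sketch below illustrates the arithmetic on a plain list. The `sample_rows` helper is hypothetical, and two of its behaviors are assumptions of this sketch rather than documented library behavior: sample_number takes precedence when both are given, and the fraction yields an exact row count.

```python
import random


def sample_rows(rows, sample_frac=None, sample_number=None, seed=42):
    """Sample rows by fixed count (sample_number) or by fraction (sample_frac)."""
    if sample_number is not None:
        # Fixed size, capped at the number of rows available.
        k = min(sample_number, len(rows))
    elif sample_frac is not None:
        # Size proportional to the input.
        k = round(len(rows) * sample_frac)
    else:
        # No sampling requested: use every row.
        k = len(rows)
    return random.Random(seed).sample(rows, k)


rows = list(range(1000))

by_frac = sample_rows(rows, sample_frac=0.1)                # 10% of 1000 rows
by_number = sample_rows(rows, sample_number=100)            # exactly 100 rows
small = sample_rows(list(range(30)), sample_number=100)     # capped at 30 rows

print(len(by_frac), len(by_number), len(small))
```

A fraction scales the validation cost with the data, while a fixed number keeps the cost constant no matter how large the DataFrame grows.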
 
Validation results¶
After any type of validation is executed, the result, whether it passes or fails, is saved into checkpoint_validation_results.json. This file is primarily used by the VS Code extension. It contains the status of the validation, the timestamp, the checkpoint name, the line number where the function is executed, and the file.
The result is also logged to the default Snowflake account in a table called SNOWPARK_CHECKPOINTS_REPORT, which contains the following information about the validation result:
- DATE: Execution timestamp of the validation
- JOB: Name of the SnowparkJobContext
- STATUS: Status of the validation
- CHECKPOINT: Name of the checkpoint validated
- MESSAGE: Error message
- DATA: Data from the validation execution
- EXECUTION_MODE: Validation mode executed