Snowpark Checkpoints library¶
Snowpark Checkpoints is a testing library that validates code migrated from Apache PySpark (https://spark.apache.org/) to Snowpark Python.
Prerequisites¶
To use Snowpark Checkpoints, set up a Python development environment. The supported versions of Python are:
3.9
3.10
3.11
Note
Python 3.9 depends on Snowpark client version 1.5.0. Python 3.10 depends on Snowpark client version 1.5.1. Python 3.11 depends on Snowpark client version 1.9.0.
You can create a Python virtual environment for a particular Python version using tools like Anaconda (https://www.anaconda.com/), Miniconda (https://docs.conda.io/en/latest/miniconda.html), or virtualenv (https://docs.python.org/3/tutorial/venv.html).
Install Snowpark Checkpoints¶
Install the Snowpark Checkpoints package into a Python virtual environment by using conda (https://anaconda.org/anaconda/conda) or pip (https://pypi.org/project/pip/).
Using conda:
conda install snowpark-checkpoints
Using pip:
pip install snowpark-checkpoints
If you prefer, you can also install the packages individually:
snowpark-checkpoints-collectors - Use this package to collect information about PySpark DataFrames.
Using conda:
conda install snowpark-checkpoints-collectors
Using pip:
pip install snowpark-checkpoints-collectors
snowpark-checkpoints-hypothesis - Use this package to create unit tests for your Snowpark code based on automatically generated synthetic data that follows the DataFrame schemas collected from the original PySpark code.
Using conda:
conda install snowpark-checkpoints-hypothesis
Using pip:
pip install snowpark-checkpoints-hypothesis
snowpark-checkpoints-validators - Use this package to validate your converted Snowpark DataFrames against the collected schemas or exported DataFrames generated by the collector functionality.
Using conda:
conda install snowpark-checkpoints-validators
Using pip:
pip install snowpark-checkpoints-validators
snowpark-checkpoints-configuration - Use this package to allow snowpark-checkpoints-collectors and snowpark-checkpoints-validators to automatically load the configuration of the checkpoints.
Using conda:
conda install snowpark-checkpoints-configuration
Using pip:
pip install snowpark-checkpoints-configuration
Using the framework¶
Collect information on your PySpark Code¶
The snowpark-checkpoints-collectors package offers a function for extracting information from PySpark DataFrames. You can then use that data to validate the converted Snowpark DataFrames and ensure behavioral equivalence.
Use the following function to insert a new checkpoint collection point:
Function signature:
def collect_dataframe_checkpoint(df: SparkDataFrame,
checkpoint_name: str,
sample: Optional[float],
mode: Optional[CheckpointMode],
output_path: Optional[str]) -> None:
Function parameters:
df: The PySpark DataFrame.
checkpoint_name: The name of the checkpoint. It must start with a letter (A-Z, a-z) or an underscore (_) and contain only letters, underscores, and decimal digits (0-9).
sample: (optional) The sample size, as a fraction in the range 0 to 1.0. The default value is 1.0 (the entire PySpark DataFrame).
mode: (optional) The execution mode. Options are SCHEMA and DATAFRAME. The default value is SCHEMA.
output_path: (optional) The output path to save the checkpoint. The default value is the current working directory.
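For example, a minimal collection sketch might look like the following. The import path for collect_dataframe_checkpoint and the sample data are assumptions for illustration; adjust them to your installed collectors package and your own DataFrame.

from pyspark.sql import SparkSession

# Assumed import path for the collectors package; verify it against your installation.
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint

spark = SparkSession.builder.getOrCreate()

# Hypothetical input data standing in for your real PySpark DataFrame.
df = spark.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c")],
    schema="id INT, label STRING",
)

# Collect schema information for this DataFrame under the checkpoint name
# "demo_initial_creation". Because output_path is not set, the resulting JSON
# file is written to the current working directory.
collect_dataframe_checkpoint(
    df,
    checkpoint_name="demo_initial_creation",
    sample=1.0,
)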
The collection process generates an output file called checkpoint_collection_result.json, with information about the result of each collection point. It is a JSON file that contains the following information:
A timestamp when the collection point started.
The relative path of the file where the collection point is.
The line of code of the file where the collection point is.
The name of the collection point checkpoint.
The result of the collection point (fail or pass).
Schema inference collected data mode (Schema)¶
This is the default mode, which leverages Pandera schema inference to obtain the metadata and checks that will be evaluated for the specified DataFrame. This mode also collects custom data from columns of the DataFrame based on the PySpark type.
The column data and checks are collected based on the PySpark type of the column (see the tables below). For every column, regardless of its type, the custom data collected includes the column name, the column type, nullability, the row count, the count of non-null rows, and the count of null rows.
| Column type | Custom data collected |
|---|---|
| Numeric (byte, short, integer, long, float, double) | The minimum value. The maximum value. The mean value. The decimal precision (in case of integer type, the value is zero). The standard deviation. |
| Date | The minimum value. The maximum value. The format of the date: %Y-%m-%d |
| DayTimeIntervalType and YearMonthIntervalType | The minimum value. The maximum value. |
| Timestamp | The minimum value. The maximum value. The format of the date: %Y-%m-%dH:%M:%S |
| Timestamp ntz | The minimum value. The maximum value. The format of the date: %Y-%m-%dT%H:%M:%S%z |
| String | The minimum length value. The maximum length value. |
| Char | PySpark handles any literal as a string type, therefore char is not a valid type. |
| Varchar | PySpark handles any literal as a string type, therefore varchar is not a valid type. |
| Decimal | The minimum value. The maximum value. The mean value. The decimal precision. |
| Array | The type of the value. Whether null is allowed as an element. The proportion of null values. The maximum array size. The minimum array size. The mean size of arrays. Whether all arrays have the same size. |
| Binary | The maximum size. The minimum size. The mean size. Whether all elements have the same size. |
| Map | The type of the key. The type of the value. Whether null is allowed as a value. The proportion of null values. The maximum map size. The minimum map size. The mean map size. Whether all maps have the same size. |
| Null | NullType represents None, because the type data cannot be determined; therefore it is not possible to get information from this type. |
| Struct | The metadata of the struct, collected for each structField. |
It also defines a set of predefined validation checks for each data type, as detailed in the following table:

| Type | Pandera Checks | Additional Checks |
|---|---|---|
| Boolean | Each value is True or False. | The count of True and False values. |
| Numeric (byte, short, integer, long, float, double) | Each value is in the range of the min value and max value. | The decimal precision. The mean value. The standard deviation. |
| Date | N/A | Minimum and maximum values |
| Timestamp | Each value is in the range of the min value and max value. | The format of the value. |
| Timestamp ntz | Each value is in the range of the min value and max value. | The format of the value. |
| String | Each value length is in the range of the min and max length. | None |
| Char | PySpark handles any literal as a string type, therefore char is not a valid type. | |
| Varchar | PySpark handles any literal as a string type, therefore varchar is not a valid type. | |
| Decimal | N/A | N/A |
| Array | N/A | None |
| Binary | N/A | None |
| Map | N/A | None |
| Null | N/A | N/A |
| Struct | N/A | None |
In this mode, you can optionally define a sample of the DataFrame to collect. By default, the collection works with the entire DataFrame. The size of the sample must be statistically representative of the population.
Pandera can only infer the schema of a Pandas DataFrame, so the PySpark DataFrame must be converted into a Pandas DataFrame, which can affect the columns’ type resolution. In particular, Pandera infers the following PySpark types as object types: string, array, map, null, struct, and binary.
The output of this mode is a JSON file for each collected DataFrame, where the name of the file is the same as the checkpoint. This file contains information related to the schema and has two sections:
The Pandera schema section contains the data inferred by Pandera, such as the name, type (Pandas), whether the column allows null values, and other information for each column, as well as checks on the columns based on the PySpark type. It is a Pandera DataFrameSchema object.
The custom data section is an array of the custom data collected for each column based on the PySpark type.
Note
The collection package might have memory issues when processing large PySpark DataFrames. To address this, you can set the sample parameter in the collection function to a value between 0.0 and 1.0, in order to work with a subset of the data instead of the entire PySpark DataFrame.
DataFrame collected data mode (DataFrame)¶
This mode collects the data of the PySpark DataFrame. In this case, the mechanism saves all data of the given DataFrame in parquet format. Using the default user Snowflake connection, it tries to upload the parquet files into a Snowflake temporary stage and create a table based on the information in the stage. The name of the file and the table are the same as the checkpoint.
The output of this mode is a parquet file with the saved DataFrame data and a table containing that data in the default Snowflake connection.
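As a sketch, switching a collection call to this mode only requires passing the mode argument. The CheckpointMode import path shown here is an assumption; check where the enum lives in your installed collectors package.

# Assumed import paths; verify them against your installed collectors package.
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint
from snowflake.snowpark_checkpoints_collector import CheckpointMode

# df is the PySpark DataFrame from the previous example.
collect_dataframe_checkpoint(
    df,
    checkpoint_name="demo_initial_creation",
    mode=CheckpointMode.DATAFRAME,  # save the full data as parquet and load it into Snowflake
)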
Validate Snowpark Converted Code¶
The Snowpark Checkpoints package offers a set of validations that can be applied to the Snowpark code to ensure behavioral equivalence against the PySpark code.
Functions provided by the Framework¶
check_with_spark: A decorator that samples any Snowpark DataFrame arguments passed to a function and converts them to PySpark DataFrames. The check then executes a provided Spark function that mirrors the functionality of the new Snowpark function and compares the outputs between the two implementations. Assuming the Spark function and the Snowpark function are semantically identical, this allows those functions to be verified on real, sampled data.
- Parameters:
  - job_context (SnowparkJobContext): The job context containing configuration and details for the validation.
  - spark_function (fn): The equivalent PySpark function to compare against the Snowpark implementation.
  - checkpoint_name (str): A name for the checkpoint. Defaults to None.
  - sample_number (Optional[int], optional): The number of rows for validation. Defaults to 100.
  - sampling_strategy (Optional[SamplingStrategy], optional): The strategy used for sampling data. Defaults to SamplingStrategy.RANDOM_SAMPLE.
  - output_path (Optional[str], optional): The path to store the validation results. Defaults to None.
Following is an example:
def original_spark_code_I_dont_understand(df):
    from pyspark.sql.functions import col, when

    ret = df.withColumn(
        "life_stage",
        when(col("byte") < 4, "child")
        .when(col("byte").between(4, 10), "teenager")
        .otherwise("adult"),
    )
    return ret


@check_with_spark(
    job_context=job_context, spark_function=original_spark_code_I_dont_understand
)
def new_snowpark_code_I_do_understand(df):
    from snowflake.snowpark.functions import col, lit, when

    ref = df.with_column(
        "life_stage",
        when(col("byte") < 4, lit("child"))
        .when(col("byte").between(4, 10), lit("teenager"))
        .otherwise(lit("adult")),
    )
    return ref


df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint: This function validates a Snowpark DataFrame against a specific checkpoint schema file or imported DataFrame, according to the mode argument. It ensures that the information collected for that DataFrame and the DataFrame that is passed to the function are equivalent.
- Parameters:
  - df (SnowparkDataFrame): The DataFrame to validate.
  - checkpoint_name (str): The name of the checkpoint to validate against.
  - job_context (SnowparkJobContext, optional): The job context for the validation. Required for PARQUET mode.
  - mode (CheckpointMode): The mode of validation (e.g., SCHEMA, PARQUET). Defaults to SCHEMA.
  - custom_checks (Optional[dict[Any, Any]], optional): Custom checks to apply during validation.
  - skip_checks (Optional[dict[Any, Any]], optional): Checks to skip during validation.
  - sample_frac (Optional[float], optional): Fraction of the DataFrame to sample for validation. Defaults to 0.1.
  - sample_number (Optional[int], optional): Number of rows to sample for validation.
  - sampling_strategy (Optional[SamplingStrategy], optional): Strategy to use for sampling.
  - output_path (Optional[str], optional): The output path for the validation results.
Following is an example:
# Check a schema/stats here!
validate_dataframe_checkpoint(
    df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.DATAFRAME,  # or CheckpointMode.SCHEMA
)
Depending on the mode selected, the validation will use either the collected schema file or a Parquet-loaded DataFrame in Snowflake to verify the equivalence against the PySpark version.
check_output_schema: This decorator validates the schema of a Snowpark function’s output and ensures that the output DataFrame conforms to a specified Pandera schema. It is particularly useful for enforcing data integrity and consistency in Snowpark pipelines. This decorator takes several parameters, including the Pandera schema to validate against, the checkpoint name, sampling parameters, and an optional job context. It wraps the Snowpark function and performs schema validation on the output DataFrame before returning the result.
Following is an example:
from pandas import DataFrame as PandasDataFrame
from pandera import DataFrameSchema, Column, Check
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
from numpy import int8

# Define the Pandera schema
out_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        "COLUMN3": Column(float, Check.less_than(10)),
    }
)


# Define the Snowpark function and apply the decorator
@check_output_schema(out_schema, "output_schema_checkpoint")
def preprocessor(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )


# Create a Snowpark session and DataFrame
session = Session.builder.getOrCreate()
df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)
sp_dataframe = session.create_dataframe(df)

# Apply the preprocessor function
preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema: This decorator validates the schema of a Snowpark function’s input arguments. This decorator ensures that the input DataFrame conforms to a specified Pandera schema before the function is executed. It is particularly useful for enforcing data integrity and consistency in Snowpark pipelines. This decorator takes several parameters, including the Pandera schema to validate against, the checkpoint name, sampling parameters, and an optional job context. It wraps the Snowpark function and performs schema validation on the input DataFrame before executing the function.
Following is an example:
from pandas import DataFrame as PandasDataFrame
from pandera import DataFrameSchema, Column, Check
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
from numpy import int8

# Define the Pandera schema
input_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
)


# Define the Snowpark function and apply the decorator
@check_input_schema(input_schema, "input_schema_checkpoint")
def process_dataframe(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )


# Create a Snowpark session and DataFrame
session = Session.builder.getOrCreate()
df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)
sp_dataframe = session.create_dataframe(df)

# Apply the process_dataframe function
processed_dataframe = process_dataframe(sp_dataframe)
Statistic checks¶
Statistics validations are applied to specific column types by default when the validation is run in Schema mode; these checks can be skipped with skip_checks.
| Column Type | Default Check |
|---|---|
| Numeric | between: Whether the value is between the min and the max, including the min and max. decimal_precision: If the value is a decimal, this checks the decimal precision. mean: Validates whether the mean of the column falls within a specific range. |
| Boolean | isin: Validates whether the value is True or False. True_proportion: Validates whether the proportion of True values falls within a specific range. False_proportion: Validates whether the proportion of False values falls within a specific range. |
| Date | between: Whether the value is between the min and the max, including the min and max. |
| Nullable: All supported types | Null_proportion: Validates the null proportion accordingly. |
Skip checks¶
There is granular control over checks, which allows you to skip column validation or specific checks for a column. With the skip_checks parameter, you can specify the particular column and the validation types you want to skip. The name used to skip a check is the name associated with that check.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
import pandas as pd
from numpy import int8
from pandera import Check, Column, DataFrameSchema
from snowflake.snowpark import Session

# The import paths below are assumptions; verify them against your installed
# snowpark-checkpoints-validators version.
from snowflake.snowpark_checkpoints.checkpoint import check_dataframe_schema
from snowflake.snowpark_checkpoints.utils.constants import SKIP_ALL

df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Custom checks¶
You can add additional checks to the schema generated from the JSON file by using the custom_checks property. The checks are added to the Pandera schema:
import pandas as pd
from pandera import Check
from snowflake.snowpark import Session

# The import path below is an assumption; verify it against your installed
# snowpark-checkpoints-validators version.
from snowflake.snowpark_checkpoints.checkpoint import validate_dataframe_checkpoint

df = pd.DataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# These checks will be added to the schema generated from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
Sampling strategies¶
The sampling process is designed to efficiently validate large DataFrames by taking a representative sample of the data. This approach helps perform schema validation without processing the entire dataset, which can be computationally expensive and time-consuming.
- Parameters:
  - sample_frac: This parameter specifies the fraction of the DataFrame to sample. For example, if sample_frac is set to 0.1, then 10 percent of the DataFrame rows will be sampled. This is useful when you want to validate a subset of the data to save on computational resources.
  - sample_number: This parameter specifies the exact number of rows to sample from the DataFrame. For example, if sample_number is set to 100, then 100 rows will be sampled from the DataFrame. This is useful when you want to validate a fixed number of rows regardless of the DataFrame size.
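For instance, a sketch of validating only a sample of a large DataFrame might look like the following. The DataFrame, job context, and checkpoint name are placeholders from the earlier examples, and the import paths are assumptions to verify against your installed validators package.

# Assumed import paths; verify them against your installed validators package.
from snowflake.snowpark_checkpoints.checkpoint import validate_dataframe_checkpoint
from snowflake.snowpark_checkpoints.utils.constants import SamplingStrategy

validate_dataframe_checkpoint(
    df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    sample_frac=0.1,  # validate a 10 percent sample instead of the full DataFrame
    sampling_strategy=SamplingStrategy.RANDOM_SAMPLE,
)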
Validation result¶
After any type of validation is executed, the result, whether it passes or fails, is saved into checkpoint_validation_results.json. This file is mostly used for the functionality of the VSCode extension. It contains information about the status of the validation, the timestamp, the checkpoint name, the line number where the function was executed, and the file.
It also logs the result into the default Snowflake account, in a table called SNOWPARK_CHECKPOINTS_REPORT, which contains the following information about the validation result:
DATE: Execution timestamp of the validation.
JOB: Name of the SnowparkJobContext.
STATUS: Status of the validation.
CHECKPOINT: Name of the checkpoint validated.
MESSAGE: Error message.
DATA: Data from the validation execution.
EXECUTION_MODE: Validation mode executed.
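As an illustration, you could inspect recent validation results with a Snowpark query such as the following sketch. It assumes your default connection can read SNOWPARK_CHECKPOINTS_REPORT and that the STATUS values match what your runs record.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

session = Session.builder.getOrCreate()

# Column names follow the list above; the "fail" status value is an assumption.
failed = (
    session.table("SNOWPARK_CHECKPOINTS_REPORT")
    .filter(col("STATUS") == "fail")
    .select("DATE", "JOB", "CHECKPOINT", "MESSAGE", "EXECUTION_MODE")
    .sort(col("DATE").desc())
)
failed.show()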
Checkpoint environment variable¶
The default behavior of the framework for finding the checkpoints.json file is to look for an environment variable called SNOWFLAKE_CHECKPOINT_CONTRACT_FILE_PATH_ENV_VAR. This variable contains the relative path of checkpoints.json. It is assigned by the VSCode extension when you run a checkpoint with the code lenses in the code. If the environment variable is not assigned, the framework looks for the file in the current working directory.
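For example, a minimal sketch of pointing the framework at the contract file before running your checkpoints (the path is a placeholder):

import os

# Set the variable described above; the relative path below is a placeholder.
os.environ["SNOWFLAKE_CHECKPOINT_CONTRACT_FILE_PATH_ENV_VAR"] = "path/to/checkpoints.json"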
Hypothesis Unit Testing¶
Hypothesis is a powerful testing library for Python that is designed to enhance traditional unit testing by generating a wide range of input data automatically. It uses property-based testing, where instead of specifying individual test cases, you can describe the expected behavior of your code with properties or conditions and Hypothesis generates examples to test those properties thoroughly. This approach helps uncover edge cases and unexpected behaviors, making it especially effective for complex functions. For more information, see Hypothesis (https://hypothesis.readthedocs.io/en/latest/).
The snowpark-checkpoints-hypothesis package extends the Hypothesis library to generate synthetic Snowpark DataFrames for testing purposes. By leveraging Hypothesis’ ability to generate diverse and randomized test data, you can create Snowpark DataFrames with varying schemas and values to simulate real-world scenarios and uncover edge cases, ensuring robust code and verifying the correctness of complex transformations.
The Hypothesis strategy for Snowpark relies on Pandera for generating synthetic data. The dataframe_strategy function uses the specified schema to generate a Pandas DataFrame that conforms to it and then converts it into a Snowpark DataFrame.
Function signature:
def dataframe_strategy(
schema: Union[str, DataFrameSchema],
session: Session,
size: Optional[int] = None
) -> SearchStrategy[DataFrame]
Function parameters:
schema: The schema that defines the columns, data types, and checks that the generated Snowpark DataFrame should match. The schema can be:
  - A path to a JSON schema file generated by the collect_dataframe_checkpoint function of the snowpark-checkpoints-collectors package.
  - An instance of pandera.api.pandas.container.DataFrameSchema (https://pandera.readthedocs.io/en/stable/reference/generated/pandera.api.pandas.container.DataFrameSchema.html).
session: An instance of snowflake.snowpark.Session that will be used for creating the Snowpark DataFrames.
size: The number of rows to generate for each Snowpark DataFrame. If this parameter is not provided, the strategy will generate DataFrames of different sizes.
Function output:
Returns a Hypothesis SearchStrategy (https://github.com/HypothesisWorks/hypothesis/blob/904bdd967ca9ff23475aa6abe860a30925149da7/hypothesis-python/src/hypothesis/strategies/_internal/strategies.py#L221) that generates Snowpark DataFrames.
Supported and unsupported data types¶
The dataframe_strategy function supports the generation of Snowpark DataFrames with different data types. Depending on the type of the schema argument passed to the function, the data types supported by the strategy will vary. Note that if the strategy finds an unsupported data type, it will raise an exception.
The following table shows the supported and unsupported PySpark data types for the dataframe_strategy function when passing a JSON file as the schema argument.
| PySpark data type | Supported |
|---|---|
| Array (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.ArrayType.html) | Yes |
| Boolean (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.BooleanType.html) | Yes |
| Char (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.CharType.html) | No |
| Date (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.DateType.html) | Yes |
| DayTimeIntervalType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.DayTimeIntervalType.html) | No |
| Decimal (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.DecimalType.html) | No |
| Map (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.MapType.html) | No |
| Null (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html) | No |
| Byte (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.ByteType.html), Short (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.ShortType.html), Integer (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.IntegerType.html), Long (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.LongType.html), Float (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.FloatType.html), Double (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.DoubleType.html) | Yes |
| String (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StringType.html) | Yes |
| Struct (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StructType.html) | No |
| Timestamp (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.TimestampType.html) | Yes |
| TimestampNTZ (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.TimestampNTZType.html) | Yes |
| Varchar (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.VarcharType.html) | No |
| YearMonthIntervalType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.YearMonthIntervalType.html) | No |
The following table shows the Pandera data types supported by the dataframe_strategy function when passing a DataFrameSchema object as the schema argument, and the Snowpark data types they are mapped to.
| Pandera data type | Snowpark data type |
|---|---|
| int8 | ByteType |
| int16 | ShortType |
| int32 | IntegerType |
| int64 | LongType |
| float32 | FloatType |
| float64 | DoubleType |
| string | StringType |
| bool | BooleanType |
| datetime64[ns, tz] | TimestampType (TZ) |
| datetime64[ns] | TimestampType (NTZ) |
| date | DateType |
Examples¶
The typical workflow for using the Hypothesis library to generate Snowpark DataFrames is as follows:
Create a standard Python test function with the different assertions or conditions your code should satisfy for all inputs.
Add the Hypothesis @given decorator to your test function and pass the dataframe_strategy function as an argument. For more information about the @given decorator, see hypothesis.given (https://hypothesis.readthedocs.io/en/latest/details.html#hypothesis.given).
Run the test function. When the test is executed, Hypothesis will automatically provide the generated inputs as arguments to the test.
Example 1: Generate Snowpark DataFrames from a JSON file
Below is an example of how to generate Snowpark DataFrames from a JSON schema file generated by the collect_dataframe_checkpoint function of the snowpark-checkpoints-collectors package.
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema="path/to/file.json",
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function_from_json_file(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Example 2: Generate a Snowpark DataFrame from a Pandera DataFrameSchema object
Below is an example of how to generate Snowpark DataFrames from an instance of a Pandera DataFrameSchema. For more information, see Pandera DataFrameSchema (https://pandera.readthedocs.io/en/latest/dataframe_schemas.html).
import pandera as pa
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema=pa.DataFrameSchema(
{
"boolean_column": pa.Column(bool),
"integer_column": pa.Column("int64", pa.Check.in_range(0, 9)),
"float_column": pa.Column(pa.Float32, pa.Check.in_range(10.5, 20.5)),
}
),
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function_from_dataframeschema_object(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Example 3: Customize the Hypothesis behavior
You can also customize the behavior of your test with the Hypothesis @settings decorator. This decorator allows you to customize various configuration parameters to tailor test behavior to your needs. By using the @settings decorator, you can control aspects like the maximum number of test cases, the deadline for each test execution, verbosity levels, and many others. For more information, see Hypothesis settings (https://hypothesis.readthedocs.io/en/latest/settings.html).
from datetime import timedelta
from hypothesis import given, settings
from snowflake.snowpark import DataFrame, Session
from snowflake.hypothesis_snowpark import dataframe_strategy
@given(
df=dataframe_strategy(
schema="path/to/file.json",
session=Session.builder.getOrCreate(),
)
)
@settings(
deadline=timedelta(milliseconds=800),
max_examples=25,
)
def test_my_function(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Setting Up an IDE for Snowpark Checkpoints¶
The Snowflake Extension for Visual Studio Code offers support for the Snowpark Checkpoints library to enhance the experience of using the framework. It gives you fine-grained control over the collect and validate statements inserted into your code, and lets you review the status of the behavioral-equivalence assertions of your converted code.
Enabling Snowpark Checkpoints¶
To enable Snowpark Checkpoints, go to Snowflake’s extension settings and check Snowpark Checkpoints: Enabled.

View¶
Setting the Snowpark Checkpoints property to Enabled, as explained previously, opens a new tab in the extension called SNOWPARK CHECKPOINTS. The tab displays all checkpoints in the workspace and lets you perform multiple actions, such as enabling or disabling checkpoints (all at once or individually) and clearing all of them from files. Double-clicking a checkpoint navigates to the file and line of code where it is defined.
Toggle all checkpoints¶
Located at the top right corner of the Snowpark Checkpoints tab, this option toggles the enabled property in all checkpoints.

Enabled checkpoints:

Disabling a checkpoint leads to it being skipped at runtime.

Cleaning up all checkpoints¶
Located at the top right corner of the Snowpark Checkpoints tab, this option removes checkpoints from all Python files, including Jupyter notebooks, in your workspace, but it does not delete them from the contract file or the panel. That means they can be restored using the command Snowflake: Restore All Checkpoints.

Inserting checkpoints in a file¶
Right-clicking inside a file displays a context menu containing the Snowpark Checkpoints option, which lets you add Collection and Validation checkpoints.
Snowpark checkpoints option in context menu:

Collector/Validator added:

Running a single checkpoint¶
A single checkpoint can be run by clicking the code lens option shown above each checkpoint. Running it brings up an output console showing the progress and, once it finishes, pulls up the results view. Even if the checkpoint is disabled in the contract file, it is enabled just for this execution.

If an entry point is not declared in the contract file, the error message "Entry point not found for the checkpoint." is displayed.

Running all enabled Snowpark Checkpoints in a file¶
In the top right corner of each file, a Run all checkpoints from the current file button is available.

Clicking on it will bring up an output channel displaying the progress of the execution.

Timeline view¶
Displays a timeline of the checkpoint execution results.

Commands¶
The following commands are available for Snowpark Checkpoints. To use them, enter Snowflake: [command name] into the command palette.
| Command | Description |
|---|---|
| Snowflake: Toggle Checkpoints | Toggles the enabled property of all checkpoints. |
| Snowflake: Snowpark Checkpoints Project Initialization | Triggers project initialization, creating a contract file if it doesn’t exist. If it exists, a pop-up asks whether you want to load the checkpoints into the contract file. |
| Snowflake: Clear All Checkpoints | Deletes all checkpoints from all files in the workspace. |
| Snowflake: Restore All Checkpoints | Restores checkpoints previously deleted from files that are still present in the contract file. |
| Snowflake: Add Validation/Collection Checkpoint | Adds a validator or collector with its mandatory parameters at the cursor position. |
| Snowflake: Focus on Snowpark Checkpoints View | Shifts focus to the SNOWPARK CHECKPOINTS panel. |
| Snowflake: Open Checkpoints Timeline | Displays a timeline of checkpoint executions. |
| Snowflake: Run all Checkpoints from the current file | Runs all enabled checkpoints in the current file. |
| Snowflake: Run all Checkpoints in the workspace | Runs all enabled checkpoints in the workspace. |
| Snowflake: Show All Snowpark Checkpoints Result | Displays a tab with all checkpoint results. |
Warnings¶
Duplicate: In a collection project, if two checkpoints are assigned the same name, a warning is displayed: “Another checkpoint with an identical name has been detected and will be overwritten.” Validation projects can have multiple checkpoints sharing the same name; no warning is shown.
Wrong type: Adding a checkpoint with a different type than the project type will underline it with the following error message: “Please make sure you are using the correct Snowpark-Checkpoints statement. This particular checkpoint statement is different from the others used in this project, statements that don’t match the project type will be ignored when executed.”
Invalid checkpoint name: If a checkpoint name parameter is added in an invalid way, a warning message is displayed: “Invalid checkpoint name. Checkpoint names must start with a letter and can only contain letters, numbers, hyphens, and underscores”.