Snowpark Checkpoints 库

Snowpark Checkpoints 是一个测试库,用于验证从 Apache PySpark (https://spark.apache.org/) 迁移到 Snowpark Python 的代码。

先决条件

要使用 Snowpark Checkpoints,请设置 Python 开发环境。受支持的 Python 版本包括:

  • 3.9

  • 3.10

  • 3.11

备注

Python 3.9 依赖于 Snowpark 客户端版本 1.5.0。Python 3.10 依赖于 Snowpark 客户端版本 1.5.1。Python 3.11 依赖于 Snowpark 客户端版本 1.9.0。

您可以使用 ` Anaconda <https://www.anaconda.com/ (https://www.anaconda.com/)>`_、` Miniconda <https://docs.conda.io/en/latest/miniconda.html (https://docs.conda.io/en/latest/miniconda.html)>`_ 或 virtualenv (https://docs.python.org/3/tutorial/venv.html) 等工具为特定的 Python 版本创建 Python 虚拟环境。

安装 Snowpark Checkpoints

使用 conda (https://anaconda.org/anaconda/conda) 或 pip (https://pypi.org/project/pip/) 将 Snowpark Checkpoints 包安装到 Python 虚拟环境中。

  • 使用 conda:

    conda install snowpark-checkpoints
    
    Copy
  • 使用 pip:

    pip install snowpark-checkpoints
    
    Copy

如果您愿意,还可以单独安装软件包:

  • snowpark-checkpoints-collectors – 使用此包收集有关 PySpark DataFrames 的信息。

    • 使用 conda:

      conda install snowpark-checkpoints-collectors
      
      Copy
    • 使用 pip:

      pip install snowpark-checkpoints-collectors
      
      Copy
  • Snowpark-checkpoints-hypothesis – 使用此包根据自动生成的合成数据,按照从原始 PySpark 代码收集的 DataFrame 架构,为 Snowpark 代码创建单元测试。

    • 使用 conda:

      conda install snowpark-checkpoints-hypothesis
      
      Copy
    • 使用 pip:

      pip install snowpark-checkpoints-hypothesis
      
      Copy
  • snowpark-checkpoints-validators – 使用此包来验证转换后的 Snowpark DataFrames 是否符合通过收集器功能生成的收集到的架构或导出的 DataFrames。

    • 使用 conda:

      conda install snowpark-checkpoints-validators
      
      Copy
    • 使用 pip:

      pip install snowpark-checkpoints-validators
      
      Copy
  • ** Snowpark-checkpoints-configuration** – 使用此包允许 snowpark-checkpoints-collectorssnowpark-checkpoints-validators 自动加载检查点的配置。

    • 使用 conda:

      conda install snowpark-checkpoints-configuration
      
      Copy
    • 使用 pip:

      pip install snowpark-checkpoints-configuration
      
      Copy

使用框架

收集有关 PySpark 代码的信息

snowpark-checkpoints-collectors 包提供了从 PySpark DataFrames 中提取信息的函数。然后,我们可以使用该数据来验证转换后的 Snowpark DataFrames,以确保行为一致性。

使用以下函数插入新的检查点收集点:

函数签名:

def collect_dataframe_checkpoint(df: SparkDataFrame,
  checkpoint_name: str,
  sample: Optional[float],
  mode: Optional[CheckpointMode],
  output_path: Optional[str]) -> None:
Copy

函数参数:

  • df:PySpark DataFrame。

  • checkpoint_name:检查点名称。以字母(A-Z,a-z)或下划线 (_) 开头,仅包含字母、下划线和十进制数字 (0-9)。

  • sample:(可选)样本大小。默认值为 1.0(整个 PySpark DataFrame),范围为 0 到 1.0。

  • mode:(可选)执行模式。选项是 SCHEMADATAFRAME。默认值为 SCHEMA

  • output_path:(可选)保存检查点的输出路径。默认值为当前工作目录。

收集过程生成一个名为 checkpoint_collection_result.json 的输出文件,其中包含每个收集点的结果信息。这是一个 JSON 文件,包含以下信息:

  • 收集点开始的时间戳。

  • 收集点所在文件的相对路径。

  • 收集点所在文件的代码行。

  • 收集点检查点的名称。

  • 收集点的结果(失败或通过)。

架构推断收集数据模式(架构)

这是默认模式,利用 Pandera 架构推断来获取将要对指定 DataFrame 进行评估的元数据和检查项。此模式还会根据 PySpark 类型从 DataFrame 的列收集自定义数据。

根据列的 PySpark 类型收集列数据和检查项(请参阅下表)。对于任何列,无论其类型如何,收集的自定义数据将包括列的名称、列的类型、是否可为空、行数、非空行数和空行数。

根据列的 PySpark 类型收集自定义数据

列类型

收集的自定义数据

数字(byteshortintegerlongfloatdouble

最小值。最大值。平均值。小数精度(对于整数类型,值为零)。标准差。

日期

最小值。最大值。日期格式:%Y-%m-%d

DayTimeIntervalType 和 YearMonthIntervalType

最小值。最大值。

时间戳

最小值。最大值。日期格式:%Y-%m-%dH:%M:%S

Timestamp ntz

最小值。最大值。日期格式:%Y-%m-%dT%H:%M:%S%z

字符串

最小长度值。最大长度值。

Char

PySpark 将任何字面量作为字符串类型处理,因此 char 不是有效类型。

Varchar

PySpark 将任何字面量作为字符串类型处理,因此 Varchar 不是有效类型。

小数

最小值。最大值。平均值。小数精度。

数组

值的类型。如果允许,将 null 作为元素。空值比例。最大数组长度。最小数组长度。数组平均长度。如果所有数组大小相同。

二进制

最大大小。最小大小。平均大小。如果所有元素大小相同。

映射

键类型。值的类型。如果允许,将 null 作为值。空值比例。最大映射大小。最小映射大小。平均映射大小。如果所有映射大小相同。

Null

NullType 表示无,因为无法确定类型数据;因此无法从该类型获取信息。

结构体

结构的元数据,它适用于每个 structField:nametypenullablerows countrows not null countrows null count。它是一个数组。

它还为每种数据类型定义了一组预定义的验证检查项,详见下表:

根据列的类型收集检查项

类型

Pandera 检查

附加检查

布尔

每个值为 True 或 False。

True 和 False 值的计数。

数字(byteshortintegerlongfloatdouble

每个值都在最小值和最大值范围内。

小数精度。平均值。标准差。

日期

不适用

最小值和最大值

时间戳

每个值都在最小值和最大值范围内。

值的格式。

Timestamp ntz

每个值都在最小值和最大值范围内。

值的格式。

字符串

每个值长度都在最小和最大长度范围内。

Char

PySpark 将任何字面量作为字符串类型处理,因此 char 不是有效类型。

Varchar

PySpark 将任何字面量作为字符串类型处理,因此 Varchar 不是有效类型。

小数

不适用

不适用

数组

不适用

二进制

不适用

映射

不适用

Null

不适用

不适用

结构体

不适用

此模式允许用户定义要收集的 DataFrame 样本,但它是可选的。默认情况下,在整个 DataFrame 中进行收集。样本的大小必须在统计上代表总体。

Pandera 只能推断 Pandas DataFrame 的架构,这意味着必须将 PySpark DataFrame 转换为 Pandas DataFrame,这会影响列的类型解析。特别是,Pandera 推断以下 PySpark 类型为对象类型:stringarraymapnullstructbinary

此模式的输出是每个收集的 DataFrame 的 JSON 文件,其中文件名与检查点相同。此文件包含与架构相关的信息,包含两个部分:

  1. Pandera 架构部分包含 Pandera 推断出的数据,如名称、类型 (Pandas)、列是否允许空值、每列的其他信息,以及基于 PySpark 类型的列检查项。它是 Pandera 的 DataFrameSchema 对象。

  2. 自定义数据部分是每个列根据 PySpark 类型收集的自定义数据的数组。

备注

处理大型 PySpark DataFrames 时,集合包可能存在内存问题。要解决这个问题,您可以将集合函数中的示例参数设置为介于 0.0 到 1.0 之间的值,以便处理数据的子集,而不是整个 PySpark DataFrame。

DataFrame 收集数据模式 (DataFrame)

此模式收集 PySpark DataFrame 的数据。在这种情况下,该机制会以 Parquet 格式保存给定 DataFrame 的所有数据。使用默认用户 Snowflake 连接,它会尝试将 Parquet 文件上传到 Snowflake 临时暂存区,并根据暂存区中的信息创建表。文件和表的名称与检查点相同。

此模式的输出是保存的 DataFrame 的 Parquet 文件结果,以及默认 Snowflake 配置连接中包含 DataFrame 数据的表。

验证 Snowpark 转换的代码

Snowpark Checkpoints 包提供了一组验证,可应用于 Snowpark 代码,以确保与 PySpark 代码的行为等价。

框架提供的函数

  • check_with_spark:装饰器将 Snowpark DataFrame 实参转换为函数或样本,然后转换为 PySpark DataFrames。然后,该检查将执行提供的 Spark 函数,该函数镜像新的 Snowpark 函数的功能,并比较两个实现之间的输出。假设 Spark 函数和 Snowpark 函数在语义上相同,这允许在真实的采样数据上验证这些函数。

    参数:
    • job_context (SnowparkJobContext):包含验证的配置和详细信息的作业上下文。

    • spark_function (fn):与 Snowpark 实现进行比较的等效 PySpark 函数。

    • checkpoint_name (str):检查点的名称。默认值为无。

    • :code:`sample_number`(Optional[int],可选):验证的行数。默认值为 100。

    • sampling_strategy`(Optional[SamplingStrategy],可选):用于采样数据的策略。默认值为 :code:`SamplingStrategy.RANDOM_SAMPLE

    • :code:`output_path`(Optional[str],可选):存储验证结果的路径。默认值为无。

    示例如下:

     def original_spark_code_I_dont_understand(df):
     from pyspark.sql.functions import col, when
    
     ret = df.withColumn(
         "life_stage",
         when(col("byte") < 4, "child")
         .when(col("byte").between(4, 10), "teenager")
         .otherwise("adult"),
     )
     return ret
    
    
    @check_with_spark(
     job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
      from snowflake.snowpark.functions import col, lit, when
    
      ref = df.with_column(
          "life_stage",
          when(col("byte") < 4, lit("child"))
          .when(col("byte").between(4, 10), lit("teenager"))
          .otherwise(lit("adult")),
     )
     return ref
    
    
     df1 = new_snowpark_code_I_do_understand(df)
    
    Copy
  • validate_dataframe_checkpoint:该函数根据实参模式,验证 Snowpark 数据框是否符合特定的检查点架构文件或导入的数据框。它确保为该 DataFrame 收集的信息和传递给该函数的 DataFrame 是等效的。

    参数:
    • df (SnowparkDataFrame):要验证的 DataFrame。

    • checkpoint_name (str):要验证的检查点的名称。

    • :code:`job_context`(SnowparkJobContext,可选)(str):验证的作业上下文。对于 PARQUET 模式是必需的。

    • mode (CheckpointMode):验证模式(如 SCHEMA、PARQUET)。默认值为 SCHEMA。

    • :code:`custom_checks`(Optional[dict[Any, Any]],可选):在验证期间要应用的自定义检查项。

    • :code:`skip_checks`(Optional[dict[Any, Any]],可选):在验证期间要跳过的检查项。

    • :code:`sample_frac`(Optional[float],可选):用于验证的 DataFrame 的抽样比例。默认值为 0.1。

    • :code:`sample_number`(Optional[int],可选):用于验证的抽样行数。

    • :code:`sampling_strategy`(Optional[SamplingStrategy],可选):用于采样的策略。

    • :code:`output_path`(Optional[str],可选):验证结果的输出路径。

    示例如下:

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
       df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema)
    )
    
    Copy

    根据所选模式,验证过程将使用收集的架构文件或 Snowflake 中加载的 Parquet 数据框来验证与 PySpark 版本的等价性。

  • check-output_schema:此装饰器验证 Snowpark 函数输出的架构,并确保输出 DataFrame 符合指定的 Pandera 架构。对于在 Snowpark 管道中强制执行数据完整性和一致性特别有用。此装饰器采用多个参数,包括用于验证的 Pandera 架构、检查点名称、采样参数和可选的作业上下文。它包装 Snowpark 函数,并在返回结果之前对输出 DataFrame 执行架构验证。

    示例如下:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8
    
    # Define the Pandera schema
    out_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        "COLUMN3": Column(float, Check.less_than(10)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
     return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
    Copy
  • check_input_schema:此装饰器验证 Snowpark 函数的输入实参的架构。此装饰器确保输入 DataFrame 在执行函数之前符合指定的 Pandera 架构。对于在 Snowpark 管道中强制执行数据完整性和一致性特别有用。此装饰器采用多个参数,包括用于验证的 Pandera 架构、检查点名称、采样参数和可选的作业上下文。它包装 Snowpark 函数,并在执行函数之前对输入 DataFrame 执行架构验证。

    示例如下:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8
    
    # Define the Pandera schema
    input_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    
    Copy

统计检查

默认情况下,在 Schema 模式下运行验证时,统计验证会应用于特定的列类型;使用 skip_checks 可以跳过这些检查项。

列类型

默认检查

数字:byteshortintegerlongfloatdouble

between:如果值介于最小值或最大值之间,包括最小值和最大值。

decimal_precision:如果值为小数,将检查小数精度。

mean:验证列的均值是否在特定范围内。

布尔

isin:验证值为 True 还是 False。

True_proportion:验证 True 值的比例是否在特定范围内。

False_proportion:验证 False 值的比例是否在特定范围内。

日期:datetimestamptimestamp_ntz

between:如果值介于最小值或最大值之间,包括最小值和最大值。

Nullable:所有支持的类型

Null_proportion:相应地验证空值比例。

跳过检查

存在对检查的细粒度控制,这允许您跳过列验证或跳过某列的特定检查。使用参数 skip_checks,您可以指定特定列以及要跳过的验证类型。用于跳过的检查项的名称是与检查项关联的名称。

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • ​​equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

df = pd.DataFrame(
{
      "COLUMN1": [1, 4, 0, 10, 9],
      "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)

schema = DataFrameSchema(
{
      "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
      "COLUMN2": Column(
          float,
          [
              Check.greater_than(-20.5),
              Check.less_than(-1.0),
              Check(lambda x: x < -1.2),
          ],
      ),
}
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
  sp_df,
  schema,
  skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Copy

自定义检查

您可以使用 custom_checks 属性向 JSON 文件生成的架构添加其他检查项。这将把检查项添加到 pandera 架构中:

df = pd.DataFrame(
  {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
  }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
  sp_df,
  "checkpoint-name",
  custom_checks={
        "COLUMN1": [
            Check(lambda x: x.shape[0] == 5),
            Check(lambda x: x.shape[1] == 2),
    ],
    "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
  },
)
Copy

采样策略

所提供代码的采样过程旨在通过采集数据的代表性样本来有效验证大型 DataFrames。这种方法有助于执行架构验证,而无需处理整个数据集,因为处理整个数据集的计算成本高昂且耗时。

参数
  • sample_frac:此参数指定要采样的 DataFrame 的比例。例如,如果 sample_frac 设置为 0.1,那么将对 10% 的 DataFrame 行采样。当您要验证数据的子集以节省计算资源时,这非常有用。

  • sample_number:此参数指定要从 DataFrame 中采样的确切行数。例如,如果 sample_number 设置为 100,那么将从 DataFrame 中采样 100 行。当您希望验证固定数量的行而不考虑 DataFrame 大小时,这很有用。

验证结果

执行任何类型的验证后,无论验证通过与否,结果都将保存到 checkpoint_validation_results.json 中。此文件主要用于 VSCode 扩展的功能。它将包含有关验证状态、时间戳、检查点名称、函数执行所在的行号以及文件的信息。

它还会将结果记录到默认 Snowflake 账户中一个名为 SNOWPARK_CHECKPOINTS_REPORT 的表中,该表中将包含有关验证结果的信息。

  • DATE:执行验证的时间戳。

  • JOB:SnowparkJobContext 的名称。

  • STATUS:验证的状态。

  • CHECKPOINT:已验证检查点的名称。

  • MESSAGE:错误消息。

  • DATA:验证执行过程中的数据。

  • EXECUTION_MODE:已执行验证模式。

检查点环境变量

该框架查找 checkpoints.json 文件的默认行为是查找名为 SNOWFLAKE_CHECKPOINT_CONTRACT_FILE_PATH_ENV_VAR 的环境变量。此变量将包含 checkpoint.json 的相对路径。当您使用代码中的代码透镜运行检查点时,它由 VSCode 扩展进行分配。如果未分配环境变量,该框架将尝试在当前工作目录中查找文件。

Hypothesis 单元测试

Hypothesis 是一个强大的 Python 测试库,旨在通过自动生成广泛的输入数据来增强传统的单元测试。它使用基于属性的测试,您可以使用属性或条件来描述代码的预期行为,而不是指定单个测试用例,而 Hypothesis 会生成示例以对这些属性进行全面测试。这种方法有助于发现边缘情况和意外行为,使其对复杂函数特别有效。有关更多信息,请参阅 Hypothesis (https://hypothesis.readthedocs.io/en/latest/)。

snowpark-checkpoints-hypothesis 包扩展了 Hypothesis 库,以生成用于测试目的的合成 Snowpark DataFrames。通过利用 Hypothesis 生成多样化和随机测试数据的功能,您可以创建具有不同架构和值的 Snowpark DataFrames,以模拟真实场景和发现边缘情况,确保稳健的代码并验证复杂变换的正确性。

Snowpark 的 Hypothesis 策略依赖 Pandera 生成合成数据。dataframe_strategy 函数使用指定的架构生成符合该架构的 Pandas DataFrame,然后将其转换为 Snowpark DataFrame。

函数签名

def dataframe_strategy(
  schema: Union[str, DataFrameSchema],
  session: Session,
  size: Optional[int] = None
) -> SearchStrategy[DataFrame]
Copy

函数参数

函数输出

返回生成 Snowpark DataFrames 的 Hypothesis SearchStrategy (https://github.com/HypothesisWorks/hypothesis/blob/904bdd967ca9ff23475aa6abe860a30925149da7/hypothesis-python/src/hypothesis/strategies/_internal/strategies.py#L221)。

支持和不支持的数据类型

dataframe_strategy 函数支持生成具有不同数据类型的 Snowpark DataFrames。根据传递给函数的架构实参的类型,策略支持的数据类型会有所不同。请注意,如果策略发现不支持的数据类型,将引发异常。

下表显示了当作为 schema 实参传递 JSON 文件时,dataframe_strategy 函数支持和不支持的 PySpark 数据类型。

PySpark 数据类型

支持

数组 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.ArrayType.html)

布尔值 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.BooleanType.html)

字符 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.CharType.html)

日期 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.DateType.html)

DayTimeIntervalType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.DayTimeIntervalType.html)

小数 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.DecimalType.html)

映射 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.MapType.html)

空值 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html)

字节型 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.ByteType.html)、短整型 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.ShortType.html)、整型 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.IntegerType.html)、` 长整型 <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.LongType.html (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.LongType.html)>`_、单精度浮点型 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.FloatType.html)、双精度浮点型 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.DoubleType.html)

字符串 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StringType.html)

结构 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StructType.html)

时间戳 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.TimestampType.html)

TimestampNTZ (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.TimestampNTZType.html)

可变长度字符串 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.VarcharType.html)

YearMonthIntervalType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.YearMonthIntervalType.html)

下表显示了 dataframe_strategy 函数在作为 schema 实参传递 DataFrameSchema 对象时支持的 Pandera 数据类型及其映射到的 Snowpark 数据类型。

Pandera 数据类型

Snowpark 数据类型

int8

ByteType

int16

ShortType

int32

IntegerType

int64

LongType

float32

FloatType

float64

DoubleType

字符串

StringType

bool

BooleanType

datetime64[ns, tz]

TimestampType(TZ)

datetime64[ns]

TimestampType(NTZ)

日期

DateType

示例

使用 Hypothesis 库生成 Snowpark DataFrames 的典型工作流如下:

  1. 创建标准的 Python 测试函数,其中包含代码应满足的所有输入的不同断言或条件。

  2. 将 Hypothesis @given 装饰器添加到测试函数中,并将 dataframe_strategy 函数作为实参传递。有关 @given 装饰器的更多信息,请参阅 hypothesis.given (https://hypothesis.readthedocs.io/en/latest/details.html#hypothesis.given)。

  3. 运行测试函数。执行测试时,Hypothesis 会自动将生成的输入作为实参提供给测试。

示例 1:从 JSON 文件生成 Snowpark DataFrames

下面示例介绍了如何从 snowpark-checkpoints-collectors 包的 collect_dataframe_checkpoint 函数生成的 JSON 架构文件生成 Snowpark DataFrames。

from hypothesis import given

from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session


@given(
    df=dataframe_strategy(
        schema="path/to/file.json",
        session=Session.builder.getOrCreate(),
        size=10,
    )
)
def test_my_function_from_json_file(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

示例 2:从 Pandera DataFrameSchema 对象生成 Snowpark DataFrame

下面示例介绍了如何从 Pandera DataFrameSchema 实例生成 Snowpark DataFrames。有关更多信息,请参阅 Pandera DataFrameSchema (https://pandera.readthedocs.io/en/latest/dataframe_schemas.html)。

import pandera as pa

from hypothesis import given

from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session


@given(
    df=dataframe_strategy(
        schema=pa.DataFrameSchema(
            {
                "boolean_column": pa.Column(bool),
                "integer_column": pa.Column("int64", pa.Check.in_range(0, 9)),
                "float_column": pa.Column(pa.Float32, pa.Check.in_range(10.5, 20.5)),
            }
        ),
        session=Session.builder.getOrCreate(),
        size=10,
    )
)
def test_my_function_from_dataframeschema_object(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

示例 3:自定义 Hypothesis 行为

您还可以使用 Hypothesis @settings 装饰器自定义测试的行为。此装饰器允许您自定义各种配置参数,以根据您的需要定制测试行为。通过使用 @settings 装饰器,您可以控制诸如测试用例的最大数量、每次测试执行的最后期限、详细程度和许多其他方面。有关更多信息,请参阅 Hypothesis 设置 (https://hypothesis.readthedocs.io/en/latest/settings.html)。

from datetime import timedelta

from hypothesis import given, settings
from snowflake.snowpark import DataFrame, Session

from snowflake.hypothesis_snowpark import dataframe_strategy


@given(
    df=dataframe_strategy(
        schema="path/to/file.json",
        session=Session.builder.getOrCreate(),
    )
)
@settings(
    deadline=timedelta(milliseconds=800),
    max_examples=25,
)
def test_my_function(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

为 Snowpark Checkpoints 设置 IDE

适用于 Visual Studio Code 的 Snowflake 扩展程序 为 Snowpark Checkpoints 库提供支持,以增强使用框架的体验。它使您能够对插入到代码中的 collectvalidate 语句进行细粒度控制,并查看转换后的代码的行为等价断言的状态。

启用 Snowpark Checkpoints

要启用 Snowpark Checkpoints,请转到 Snowflake 的扩展设置并检查 Snowpark Checkpoints: Enabled

已启用的检查点

视图

如前所述,通过将 Snowpark Checkpoints 属性设置为 Enabled,将在名为 SNOWPARK CHECKPOINTS 的扩展中打开一个新选项卡。该选项卡显示工作区中的所有检查点,并能够执行多个操作,例如启用/禁用全部或单个检查点、清理文件中的全部检查点,并通过双击每个检查点导航到定义检查点的文件和代码行。

切换所有检查点

此选项位于 Snowpark Checkpoints 选项卡的右上角,可切换所有检查点中已启用的属性。

切换检查点

已启用的检查点:

切换检查点

禁用检查点会导致它在运行时被跳过。

禁用检查点

清理所有检查点

位于 Snowpark Checkpoints 选项卡的右上角。这会从工作区中的所有 Python 文件(包括 Jupyter Notebook)中删除检查点,但不会从合同和面板中删除。这意味着可以使用命令 Snowflake: Restore All Checkpoints 使其恢复。

删除检查点

在文件中插入检查点

在文件内右键单击将显示包含 Snowpark Checkpoints 选项的上下文菜单,该菜单允许添加 CollectionValidation 检查点。

上下文菜单中的 Snowpark 检查点选项:

添加检查点

已添加收集器/验证器:

收集器和验证器检查点

运行单个检查点

通过点击每个检查点上方显示的代码透视表选项,可以运行单个检查点。运行单个检查点将打开一个显示进度的输出控制台,完成后将调出结果视图。即使在合同文件中禁用了该检查点,但在执行时它也会被启用。

运行单个检查点

如果未在合同文件中声明入口点,将显示错误消息:Entry point not found for the checkpoint.

未找到入口点

在文件中运行所有已启用的 Snowpark Checkpoints

在每个文件的右上角,将显示 Run all checkpoints from the current file 按钮。

运行所有检查点

点击它会出现一个显示执行进度的输出通道。

检查点进度

时间轴视图

显示检查点执行结果的时间轴。

时间轴视图

命令

Snowpark Checkpoints 可以使用以下命令。要使用这些命令,请在命令面板中输入 Snowflake: [command name]

Snowpark Checkpoints 命令

命令

描述

Snowflake:切换检查点

切换所有检查点的已启用属性。

Snowflake:Snowpark Checkpoints 项目初始化

触发项目初始化,如果不存在合同文件,则创建合同文件。如果存在,将显示一个弹出窗口,询问您是否要将检查点加载到合同文件中。

Snowflake:清除所有检查点

从工作区中的所有文件中删除所有检查点。

Snowflake:恢复所有检查点

恢复之前从已存在的合同文件中删除的检查点。

Snowflake:添加验证/收集检查点

在光标位置添加带有其强制参数的验证器或收集器。

Snowflake:关注 Snowpark Checkpoints 视图

将专注点转移到面板 SNOWPARK CHECKPOINTS

Snowflake:打开检查点时间轴

显示检查点执行的时间轴。

Snowflake:从当前文件运行所有检查点

运行当前文件中所有已启用的检查点。

Snowflake:运行工作区中的所有检查点

从工作区运行所有已启用的检查点。

Snowflake:显示所有 Snowpark Checkpoints 结果

显示包含所有检查点结果的选项卡。

警告

  • 重复:在集合项目中,如果分配了两个同名检查点,则会显示警告:“Another checkpoint with an identical name has been detected and will be overwritten.” 验证项目可以有多个检查点共享相同名称,但不会显示警告。

  • 错误类型:如果在项目中添加了与项目类型不同类型的检查点,系统将会显示以下错误消息 “Please make sure you are using the correct Snowpark-Checkpoints statement.This particular checkpoint statement is different from the others used in this project, statements that don't match the project type will be ignored when executed.”

  • 无效的检查点名称:添加检查点名称参数的方法无效。如果出现这种情况,将显示一条警告消息:“Invalid checkpoint name.Checkpoint names must start with a letter and can only contain letters, numbers, hyphens, and underscores”

日志记录

Snowpark Checkpoints 库使用 Python 内置的 日志 (https://docs.python.org/3/library/logging.html) 模块来提供有关其内部操作的日志消息。库会发出不同 日志级别 (https://docs.python.org/3/library/logging.html#logging-levels) 的日志消息,可用于了解库的行为和诊断问题。

日志结构

Snowpark Checkpoints 库遵循模块级日志记录方法,每个需要记录消息的 Python 模块定义自己的记录器,记录器的名称与模块的完全限定名称相匹配。

组成 Snowpark Checkpoints 库的每个包定义一个顶级记录器。此记录器以包本身命名,并充当该包中所有模块级记录器的父级。顶级记录器使用 NullHandler (https://docs.python.org/3/library/logging.handlers.html#nullhandler) 初始化,确保日志消息不会产生意外输出,除非您显式配置日志记录。应用于顶级记录器的任何日志记录配置都自动应用于该包中的所有模块记录器。

以下是 Snowpark Checkpoints 库的顶级记录器名称:

Snowpark Checkpoints 记录器名称

包名称

顶级记录器名称

snowpark-checkpoints-collectors

snowflake.snowpark_checkpoints_collector

snowpark-checkpoints-validators

snowflake.snowpark_checkpoints

snowpark-checkpoints-configuration

snowflake.snowpark_checkpoints_configuration

snowpark-checkpoints-hypothesis

snowflake.hypothesis_snowpark

这种模块级方法允许对日志记录输出进行细粒度控制,并确保日志从更高级别的记录器继承设置,同时发出有关其来源的精确信息。

日志记录配置

Snowpark Checkpoints 库不提供默认日志记录配置。您必须在应用程序中显式配置日志记录才能查看日志消息。

如果您的应用程序已经使用 Python 内置的日志记录模块进行了日志记录配置,那么您应该能够看到 Snowpark Checkpoints 库发出的日志消息,而无需任何额外的配置。如果没有日志记录配置,可以使用 basicConfig (https://docs.python.org/3/library/logging.html#logging.basicConfig) 函数或通过创建自定义配置来设置日志记录。

最好在应用程序的入口点配置一次日志记录,例如在主脚本或初始化应用程序的模块中。这样可以确保在使用任何库组件之前设置日志记录。如果在独立脚本中使用库,则应在该脚本的开头设置日志记录。以下是一些帮助您入门的示例:

基本日志记录配置

启用日志最简单快捷的方法是使用 basicConfig (https://docs.python.org/3/library/logging.html#logging.basicConfig) 函数。此功能允许您配置根记录器,该根记录器是记录模块层次结构中所有记录器的“父级”。

以下示例演示了如何设置根记录器以捕获指定日志级别及以上的日志消息并将其打印到控制台:

import logging

logging.basicConfig(
  level=logging.DEBUG, # Adjust the log level as needed
  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
Copy

高级日志记录配置

对于更高级的日志记录配置,您可以使用 logging.config (https://docs.python.org/3/library/logging.config.html) 模块设置日志记录。此方法允许您定义自定义记录器、处理程序和格式化程序,并使用字典对其进行配置。

以下示例演示了如何使用自定义配置设置根记录器,该配置将消息记录到控制台和文件中:

import logging.config
from datetime import datetime

LOGGING_CONFIG = {
  "version": 1,
  "disable_existing_loggers": False,
  "formatters": {
      "standard": {
          "format": "{asctime} - {name} - {levelname} - {message}",
          "style": "{",
          "datefmt": "%Y-%m-%d %H:%M:%S",
      },
  },
  "handlers": {
      "console": {
          "class": "logging.StreamHandler",
          "formatter": "standard",
          "level": "DEBUG",  # Adjust the log level as needed
      },
      "file": {
          "class": "logging.FileHandler",
          "formatter": "standard",
          "filename": f"app_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log",
          "level": "DEBUG",  # Adjust the log level as needed
          "encoding": "utf-8",
      },
  },
  "root": {
      "handlers": ["console", "file"],
      "level": "DEBUG",  # Adjust the log level as needed
  },
}

logging.config.dictConfig(LOGGING_CONFIG)
Copy

启用特定包的日志记录

要为 Snowpark Checkpoints 库的特定包配置日志记录而不影响其他记录器,您可以为该包使用顶级记录器名称,并根据需要应用任何自定义处理程序和格式化程序。将配置应用于顶级记录器确保所有模块级记录器都继承该配置。

以下示例演示了如何仅为以下包配置日志记录:

  • snowpark-checkpoints-collectors

  • snowpark-checkpoints-configuration

  • snowpark-checkpoints-validators

  • snowpark-checkpoints-hypothesis

import logging.config
from datetime import datetime

LOGGING_CONFIG = {
  "version": 1,
  "disable_existing_loggers": False,
  "formatters": {
      "standard": {
          "format": "{asctime} - {name} - {levelname} - {message}",
          "style": "{",
          "datefmt": "%Y-%m-%d %H:%M:%S",
      },
  },
  "handlers": {
      "console": {
          "class": "logging.StreamHandler",
          "formatter": "standard",
          "level": "DEBUG",  # Adjust the log level as needed
      },
      "file": {
          "class": "logging.FileHandler",
          "formatter": "standard",
          "filename": f"app_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log",
          "level": "DEBUG",  # Adjust the log level as needed
          "encoding": "utf-8",
      },
  },
  "loggers": {
      "snowflake.snowpark_checkpoints_collector": {
          "handlers": ["console", "file"],
          "level": "DEBUG",  # Adjust the log level as needed
          "propagate": False,
      },
      "snowflake.snowpark_checkpoints": {
          "handlers": ["console", "file"],
          "level": "DEBUG",  # Adjust the log level as needed
          "propagate": False,
      },
      "snowflake.snowpark_checkpoints_configuration": {
          "handlers": ["console", "file"],
          "level": "DEBUG",  # Adjust the log level as needed
          "propagate": False,
      },
      "snowflake.hypothesis_snowpark": {
          "handlers": ["console", "file"],
          "level": "DEBUG",  # Adjust the log level as needed
          "propagate": False,
      },
  },
}

logging.config.dictConfig(LOGGING_CONFIG)
Copy

有关 Python 日志记录模块的更多详细信息,请参阅 Python 日志记录文档 (https://docs.python.org/3/library/logging.html)。

语言: 中文