为 Snowpark Python 编写测试

本主题介绍如何在连接到 Snowflake 时测试您的 Snowpark 代码。您可以使用标准测试工具(例如 PyTest)来测试您的 Snowpark Python UDFs、DataFrame 转换和存储过程。

本主题内容:

全面的测试有助于防止出现意外的破坏性更改。单元测试能够验证一段代码是否按预期运行。集成测试有助于确保组件在端到端用例中正确地协同工作。

本文档中的示例使用 PyTest,这是热门的 Python 测试框架之一。有关其他指导和最佳实践,请参阅 PyTest 文档 (https://docs.pytest.org/en/7.4.x/)。

或者,您可以使用 Snowpark Python 本地测试框架在本地创建和运行 Snowpark Python DataFrames,而无需连接到 Snowflake 账户。有关更多信息,请参阅 本地测试框架

设置您的测试

通过在您的项目中安装 conda install pytest,运行 pip install pytest 或 PyTest。您也可以将其添加到您的 requirements.txt 或 Conda 环境文件中。

在源代码目录旁边创建一个 test 目录,并将您的单元和集成测试添加到其中。要查看示例,请参阅 ` Snowpark Python 项目模板 <https://github.com/Snowflake-Labs/snowpark-python-template/ (https://github.com/Snowflake-Labs/snowpark-python-template/)>`__。

为 Snowpark 会话创建 PyTest 夹具

PyTest 夹具是在测试(或测试模块)之前执行的函数,用于为测试提供数据或连接。在这种情况下,创建一个返回 Snowpark Session 对象的 PyTest 夹具。

  1. 如果您还没有目录,请创建 test 目录。

  2. conftest.py 下创建包含以下内容的 test,其中包含的 connection_parameters 是附带 Snowflake 账户凭证的词典。有关词典格式的更多信息,请参阅 创建会话

  3. Session 夹具创建为模块范围而不是文件范围的固定,防止创建多个会话并由于会话对象冲突而导致问题。

from snowflake.snowpark.session import Session

@pytest.fixture(scope='module')
def session(request) -> Session:
    connection_parameters = {}
    return Session.builder.configs(...).create()
Copy

UDFs 单元测试

您可以通过将 UDF 处理程序作为通用 Python 方法进行测试来测试您的 Python UDF 逻辑。

  1. 在您的 test 目录下为 UDF 单元测试创建文件。例如,将文件命名为 test_functions.py

  2. 导入 Python 方法进行测试。

  3. 对于每个测试场景,创建一个名为 :code:` test_ <scenario_to_test> ` 的 Python 方法。

例如,这是一个 Python UDF 处理程序:

def fahrenheit_to_celsius(temp_f: float) -> float:
    """
    Converts fahrenheit to celsius
    """
    return (float(temp_f) - 32) * (5/9)
Copy

您可以将此方法导入到测试文件 (test/test_functions.py) 中,并将其作为通用 Python 方法进行测试。

import farenheit_to_celsius

def test_farenheit_to_celsius():
    expected = 0.0
    actual = farenheit_to_celsius(32)
    assert expected == actual
Copy

DataFrame 转换单元测试

为 DataFrame 转换添加单元测试有助于防止意外错误和回归。为了使您的 DataFrame 逻辑易于测试,请将转换封装到一个 Python 方法中,该方法会将 DataFrames 视为要转换的输入并返回转换后的 DataFrames。

在下面的示例中,mf_df_transformer 包含转换逻辑。它可以导入到 Python 项目的其他模块中并轻松进行测试。

from snowflake.snowpark.dataframe import DataFrame, col

def my_df_tranformer(df: DataFrame) -> DataFrame:
    return df \
        .with_column('c', df['a']+df['b']) \
        .filter(col('c') > 3)
Copy

要测试此转换,请执行以下步骤:

  1. test 目录 (test/test_transformers.py) 下为 DataFrame 测试创建文件 test_transformers.py

  2. 为要测试的转换器创建测试方法:test_my_df_transformer(session)。这里的 session 参数是指前一部分中创建的会话夹具。

  3. 使用会话夹具,在测试方法中创建输入和预期输出 DataFrames。

  4. 将输入 DataFrame 传递给转换器,并将预期的 DataFrame 与转换器返回的实际 DataFrame 进行比较。

# test/test_transformers.py

import my_df_transformer

def test_my_df_transformer(session):
    input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
    expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
    actual_df = my_df_transformer(input_df)
    assert input_df.collect() == actual_df.collect()
Copy

存储过程的集成测试

要测试存储过程处理程序,请使用会话夹具调用存储过程处理程序。如果您的存储过程读取自表(例如 ETL 管道中),则可以在调用存储过程处理程序之前创建这些表,如下方示例所示。这种模式可确保您的输入数据在源代码管理中得到跟踪,并且在两次测行之间不会发生意外变化。

from project import my_sproc_handler  # import stored proc handler

def test_my_sproc_handler(session: Session):

    # Create input table
    input_tbl = session.create_dataframe(
        data=[...],
        schema=[...],
    )

    input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')

    # Create expected output dataframe
    expected_df = session.create_dataframe(
        data=[...],
        schema=[...],
    ).collect()

    # Call the stored procedure
    my_sproc_handler()

    # Get actual table
    actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()

    # Clean up tables
    session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
    session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()

    # Compare the actual and expected tables
    assert expected_df == actual_tbl
Copy
语言: 中文