为 Snowpark Python 编写测试¶
本主题介绍如何在连接到 Snowflake 时测试您的 Snowpark 代码。您可以使用标准测试工具(例如 PyTest)来测试您的 Snowpark Python UDFs、DataFrame 转换和存储过程。
本主题内容:
全面的测试有助于防止出现意外的破坏性更改。单元测试能够验证一段代码是否按预期运行。集成测试有助于确保组件在端到端用例中正确地协同工作。
本文档中的示例使用 PyTest,这是热门的 Python 测试框架之一。有关其他指导和最佳实践,请参阅 PyTest 文档 (https://docs.pytest.org/en/7.4.x/)。
或者,您可以使用 Snowpark Python 本地测试框架在本地创建和运行 Snowpark Python DataFrames,而无需连接到 Snowflake 账户。有关更多信息,请参阅 本地测试框架。
设置您的测试¶
通过在您的项目中安装 conda install pytest
,运行 pip install pytest
或 PyTest。您也可以将其添加到您的 requirements.txt
或 Conda 环境文件中。
在源代码目录旁边创建一个 test
目录,并将您的单元和集成测试添加到其中。要查看示例,请参阅 ` Snowpark Python 项目模板 <https://github.com/Snowflake-Labs/snowpark-python-template/ (https://github.com/Snowflake-Labs/snowpark-python-template/)>`__。
为 Snowpark 会话创建 PyTest 夹具¶
PyTest 夹具是在测试(或测试模块)之前执行的函数,用于为测试提供数据或连接。在这种情况下,创建一个返回 Snowpark Session
对象的 PyTest 夹具。
如果您还没有目录,请创建
test
目录。在
conftest.py
下创建包含以下内容的test
,其中包含的connection_parameters
是附带 Snowflake 账户凭证的词典。有关词典格式的更多信息,请参阅 创建会话。将
Session
夹具创建为模块范围而不是文件范围的固定,防止创建多个会话并由于会话对象冲突而导致问题。
from snowflake.snowpark.session import Session
@pytest.fixture(scope='module')
def session(request) -> Session:
connection_parameters = {}
return Session.builder.configs(...).create()
UDFs 单元测试¶
您可以通过将 UDF 处理程序作为通用 Python 方法进行测试来测试您的 Python UDF 逻辑。
在您的
test
目录下为 UDF 单元测试创建文件。例如,将文件命名为test_functions.py
。导入 Python 方法进行测试。
对于每个测试场景,创建一个名为 :code:` test_ <scenario_to_test> ` 的 Python 方法。
例如,这是一个 Python UDF 处理程序:
def fahrenheit_to_celsius(temp_f: float) -> float:
"""
Converts fahrenheit to celsius
"""
return (float(temp_f) - 32) * (5/9)
您可以将此方法导入到测试文件 (test/test_functions.py
) 中,并将其作为通用 Python 方法进行测试。
import farenheit_to_celsius
def test_farenheit_to_celsius():
expected = 0.0
actual = farenheit_to_celsius(32)
assert expected == actual
DataFrame 转换单元测试¶
为 DataFrame 转换添加单元测试有助于防止意外错误和回归。为了使您的 DataFrame 逻辑易于测试,请将转换封装到一个 Python 方法中,该方法会将 DataFrames 视为要转换的输入并返回转换后的 DataFrames。
在下面的示例中,mf_df_transformer
包含转换逻辑。它可以导入到 Python 项目的其他模块中并轻松进行测试。
from snowflake.snowpark.dataframe import DataFrame, col
def my_df_tranformer(df: DataFrame) -> DataFrame:
return df \
.with_column('c', df['a']+df['b']) \
.filter(col('c') > 3)
要测试此转换,请执行以下步骤:
在
test
目录 (test/test_transformers.py
) 下为 DataFrame 测试创建文件test_transformers.py
为要测试的转换器创建测试方法:
test_my_df_transformer(session)
。这里的session
参数是指前一部分中创建的会话夹具。使用会话夹具,在测试方法中创建输入和预期输出 DataFrames。
将输入 DataFrame 传递给转换器,并将预期的 DataFrame 与转换器返回的实际 DataFrame 进行比较。
# test/test_transformers.py
import my_df_transformer
def test_my_df_transformer(session):
input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
actual_df = my_df_transformer(input_df)
assert input_df.collect() == actual_df.collect()
存储过程的集成测试¶
要测试存储过程处理程序,请使用会话夹具调用存储过程处理程序。如果您的存储过程读取自表(例如 ETL 管道中),则可以在调用存储过程处理程序之前创建这些表,如下方示例所示。这种模式可确保您的输入数据在源代码管理中得到跟踪,并且在两次测行之间不会发生意外变化。
from project import my_sproc_handler # import stored proc handler
def test_my_sproc_handler(session: Session):
# Create input table
input_tbl = session.create_dataframe(
data=[...],
schema=[...],
)
input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')
# Create expected output dataframe
expected_df = session.create_dataframe(
data=[...],
schema=[...],
).collect()
# Call the stored procedure
my_sproc_handler()
# Get actual table
actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()
# Clean up tables
session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()
# Compare the actual and expected tables
assert expected_df == actual_tbl