本地测试框架¶
本主题说明在使用 Snowpark Python 库时如何在本地测试代码。
本主题内容:
Snowpark Python 本地测试框架是一个模拟器,允许您在本地 Snowpark Python DataFrames 上创建和操作,无需连接到 Snowflake 账户。您可以使用此本地测试框架在开发机器或 CI(持续集成)管道中测试 DataFrame 操作,然后再将代码变更部署到账户。API 是相同的,因此您可以在本地或针对 Snowflake 账户运行测试,而无需更改代码。
先决条件¶
要使用本地测试框架,请执行以下操作:
您必须使用版本 1.18.0 或更高版本的 Snowpark Python 库以及可选依赖项
localtest
。受支持的 Python 版本包括:
3.9
3.10
3.11
安装 Snowpark Python 库¶
要安装带有可选依赖项的库,请运行以下命令:
pip install "snowflake-snowpark-python[localtest]"
创建会话并启用本地测试¶
创建 Snowpark
Session
并将本地测试配置设置为True
:from snowflake.snowpark import Session session = Session.builder.config('local_testing', True).create()
使用会话创建并在 DataFrames 上操作:
df = session.create_dataframe([[1,2],[3,4]],['a','b']) df.with_column('c', df['a']+df['b']).show()
加载数据¶
您可以基于 Python 基元、文件和 Pandas DataFrames 创建 Snowpark DataFrames。这对于指定测试用例的输入和预期输出非常有用。使用这种方法,数据受到来源管理,使得让测试数据与测试用例保持同步变得更加容易。
加载 CSV 数据¶
要将 CSV 文件加载到 Snowpark DataFrame 中,首先调用
Session.file.put()
将文件加载到内存暂存区,然后使用Session.read()
读取内容。
示例
假设有一个文件,data.csv
,包含以下内容:
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
您可以使用以下代码将 data.csv
加载到 Snowpark DataFrame 中。您需要先将文件放到暂存区中;如果不这样做,您会收到错误消息“找不到文件”。
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
预期输出:
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
加载 pandas 数据¶
要从 pandas DataFrame 创建 Snowpark Python DataFrame,请调用
create_dataframe
方法并将数据作为 pandas DataFrame 传递。
示例
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
预期输出:
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
要将 Snowpark Python DataFrame 转换为 pandas DataFrame,请在 DataFrame 中调用
to_pandas
方法。
示例
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
预期输出:
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
为会话创建 PyTest 夹具¶
PyTest 夹具 (https://docs.pytest.org/en/6.2.x/fixture.html) 是在测试(或测试的模块)之前执行的函数,通常用于提供数据或与测试的连接。在此过程中,您会创建返回 Snowpark Session
对象的夹具。
如果您还没有目录,请创建
test
目录。在
test
目录中,创建包含以下内容且名为conftest.py
的文件,其中connection_parameters
是包含 Snowflake 账户凭证的词典。# test/conftest.py import pytest from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.config('local_testing', True).create() else: return Session.builder.configs(CONNECTION_PARAMETERS).create()
有关词典格式的更多信息,请参阅 创建会话。
对 pytest_addoption
的调用会向 pytest
命令添加一个名为 snowflake-session
的命令行。Session
夹具会检查此命令行选项,并根据其值创建一个本地或实时的 Session
。这使您可以轻松地在本地和实时模式之间切换以进行测试,如以下命令行示例所示:
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
SQL 操作¶
Session.sql(...)
在本地测试框架中不受支持。在可能的情况下使用 Snowpark 的 DataFrame APIs,在您必须使用 Session.sql(...)
的情况下,您可以使用 Python 的 unittest.mock.patch
模拟表格返回值,修补来自给定 Session.sql()
调用的预期响应。
在以下示例中,mock_sql()
映射 SQL 查询文本至所需的 DataFrame 响应。该条件语句会检查当前会话是否正在使用本地测试,如果是,则将补丁应用于 Session.sql()
方法。
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
启用本地测试后,由 DataFrame.save_as_table()
创建的所有表会在内存中另存为临时表,可以使用 Session.table()
进行检索。您可以使用受支持的 DataFrame 像往常一样在表上进行操作。
修补内置函数¶
snowflake.snowpark.functions
下的一些内置函数在本地测试框架中不受支持。如果您使用不受支持的函数,您可以使用 snowflake.snowpark.mock
中的 @patch
装饰器创建补丁。
对于要定义和实施的修补后函数,签名(参数列表)必须与内置函数的参数一致。本地测试框架使用以下规则将参数传递给修补后的函数:
对于内置函数签名中
ColumnOrName
类型的参数,将ColumnEmulator
作为修补函数的参数进行传递。ColumnEmulator
类似于包含列数据的pandas.Series
对象。对于内置函数签名中
LiteralType
类型的参数,将字面量值作为修补函数的参数进行传递。否则,原始值将作为已修补函数的参数进行传递。
至于已修补函数的返回类型,预计会返回 ColumnEmulator
的实例,其类型与内置函数的 Column
返回类型相对应。
例如,内置函数 to_timestamp()
可以像这样修补:
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
跳过测试用例¶
如果 PyTest 测试套件包含一个测试用例,该测试用例没有得到本地测试的良好支持,您可以使用 PyTest 的 mark.skipif
装饰器跳过这些用例。以下示例假定您已如前所述配置了会话和参数。该条件检查 local_testing_mode
是否设置为 local
;如果已设置,则跳过测试用例并显示解释性消息。
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
注册 UDFs 和存储过程¶
您可以在本地测试框架中创建和调用用户定义函数 (UDFs) 和存储过程。要创建对象,您可以使用以下语法选项:
语法 |
UDF |
存储过程 |
---|---|---|
装饰器 |
|
|
注册方法 |
|
|
从文件注册的方法 |
|
|
示例
以下代码示例使用装饰器创建 UDF 和存储过程,然后按名称调用两者:
from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType
# Create local session
session = Session.builder.config('local_testing', True).create()
# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)
# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
return a + b
# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
return session.table(table_name)\
.with_column('c', call_udf('example_udf', col('a'), col('b')))\
.count()
# Call the stored procedure by name
output = session.call('example_proc', table)
print(output)
限制¶
以下列表包含本地测试框架中的已知限制和行为差距。Snowflake 目前没有解决这些问题的计划。
不支持原始 SQL 字符串和需要解析 SQL 字符串的操作,例如
session.sql
和DataFrame.filter("col1 > 12")
。不支持异步操作。
数据库对象(如表、存储过程和 UDFs)不会持久化到会话级别之外,所有操作都在内存中执行。例如,在一个模拟会话中注册的永久存储过程对其他模拟会话不可见。
不支持与 字符串排序规则 相关的功能,如
Column.collate
。Variant
、Array
和Object
数据类型仅受标准 JSON 编码和解码支持。在 Snowflake 中,像 [1,2,,3,] 这样的表达式被认为是有效的 JSON,但在使用 Python 内置 JSON 功能的本地测试中并非如此。您可以指定模块级变量snowflake.snowpark.mock.CUSTOM_JSON_ENCODER
和snowflake.snowpark.mock.CUSTOM_JSON_DECODER
以替换默认设置。仅实施了 Snowflake 函数(包括窗口函数)的子集。要了解如何注入您自己的函数定义。请参阅 修补内置函数。
目前不支持修补排名相关函数。
SQL 格式模型 不受支持。例如,
to_decimal
的模拟实现不处理可选参数format
。Snowpark Python 库没有内置的 Python API 来创建或删除暂存区,因此本地测试框架假设每个传入的暂存区都已经创建。
UDFs 和存储过程的当前实现不执行任何包验证。代码中引用的所有包都需要在程序执行之前安装。
不支持查询标签。
不支持查询历史记录。
不支持沿袭。
注册 UDF 或存储过程时,将忽略
parallel
、execute_as
、statement_params
、source_code_display
、external_access_integrations
、secrets
和comment
等可选参数。对于
Table.sample
,SYSTEM 或 BLOCK 采样与 ROW 采样相同。Snowflake 不对在存储过程中运行本地测试框架提供官方支持。存储过程中的本地测试模式会话可能会遇到或触发意外错误。
不支持的功能¶
以下是当前未在本地测试框架中实现的功能列表。Snowflake 正在积极努力解决这些问题。
一般而言,对这些功能的所有引用都应引发 NotImplementedError
:
UDTFs(用户定义的表函数)
UDAFs(用户定义的聚合函数)
矢量化的 UDFs 和 UDTFs
内置表函数
表存储过程
Geometry
、Geography
和Vector
数据类型间隔表达式
读取除 JSON 和 CSV 之外的文件格式
对于受支持的文件格式,并非所有读取选项都受支持。例如,CSV 格式不支持
infer_schema
。
对于此处未列为不受支持或已知限制的任何功能,请查看 本地测试的功能请求 (https://github.com/snowflakedb/snowpark-python/issues?q=is%3Aopen+label%3A%22local+testing%22+label%3A%22feature%22+) 的最新列表,或在 snowpark-python
GitHub 存储库中 创建功能请求 (https://github.com/snowflakedb/snowpark-python/issues/new/choose)。
已知问题¶
以下是本地测试框架中存在的已知问题或行为差距的列表。Snowflake 正在积极计划解决这些问题。
不支持在
DataFrame.groupby
内部使用窗口函数或其他聚合操作。# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
选择具有相同名称的列将仅返回一列。作为一种解决方法,使用
Column.alias
重命名列,使其具有不同的名称。df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
对于
Table.merge
和Table.update
,会话参数ERROR_ON_NONDETERMINISTIC_UPDATE
和ERROR_ON_NONDETERMINISTIC_MERGE
必须设置为False
。这意味着多重联接会更新其中一个匹配的行。不支持 GET 和 PUT 文件操作中的完全限定暂存区名称。数据库和架构名称被视为暂存区名称的一部分。
mock_to_char
实现仅支持在不同时间部分之间具有分隔符的格式的时间戳。DataFrame.pivot
有一个名为values
的参数,允许将透视限制为特定值。此时只能使用统计定义的值。使用子查询提供的值将引发错误。不支持从包含带有时区信息的时间戳的 pandas
DataFrame
创建DataFrame
。
对于此列表中未提及的任何问题,请查看 最新的未决问题列表 (https://github.com/snowflakedb/snowpark-python/issues?q=is%3Aopen+is%3Aissue+label%3A%22local+testing%22),或在 snowpark-python
GitHub 存储库中 创建错误报告 (https://github.com/snowflakedb/snowpark-python/issues/new/choose)。