本地测试框架¶
本主题说明在使用 Snowpark 库时如何在本地测试代码。
本主题内容:
Snowpark Python 本地测试框架允许您在本地 Snowpark Python DataFrames 上创建和操作,无需连接到 Snowflake 账户。您可以使用此本地测试框架在开发机器或 CI(持续集成)管道中,本地测试您的 DataFrame 操作,然后再将代码变更部署到账户。API 相同,因此您可以在本地或针对 Snowflake 账户运行测试,无需更改代码。
先决条件¶
要使用本地测试框架,请执行以下操作:
您必须使用版本 1.11.1 或更高版本的 Snowpark Python 库以及可选依赖项
pandas
。通过运行pip install "snowflake-snowpark-python[pandas]"
进行安装受支持的 Python 版本包括:
3.8
3.9
3.10
3.11
创建会话并启用本地测试¶
首先,创建一个 Snowpark Session
并将本地测试配置设置为 True
。
from snowflake.snowpark import Session
session = Session.builder.config('local_testing', True).create()
创建会话后,您可以使用它在 DataFrames 上进行创建和操作。
df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
加载数据¶
您可以基于 Python 基元、文件和 Pandas DataFrames 创建 Snowpark DataFrames。这对于指定测试用例的输入和预期输出非常有用。通过这样做,数据受到来源管理,使得让测试数据与测试用例保持同步变得更加容易。
加载 CSV 数据¶
您可以通过首次调用 Session.file.put()
将文件加载到内存中阶段,然后使用 Session.read()
读取内容,将 CSV 文件加载到 Snowpark DataFrame 中。假设有一个文件,data.csv
,该文件包含以下内容:
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
您可以使用以下代码将 data.csv
加载到 Snowpark DataFrame 中。您需要先将文件放到暂存区中。否则,您将收到找不到文件的错误。
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
dataframe.show()
的输出将是:
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
加载 Pandas 数据¶
您可以通过调用 create_dataframe
方法以 Pandas DataFrame 的形式传递数据,基于 Pandas DataFrame 创建 Snowpark Python DataFrame。
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
dataframe.show()
输出以下内容:
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Snowpark Python DataFrame 也可以通过调用DataFrame 的 to_pandas
方法转换为 Pandas DataFrame。
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
对 print(pandas_dataframe.to_string())
的调用输出以下内容:
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
为会议创建 PyTest 夹具¶
PyTest 夹具 (https://docs.pytest.org/en/6.2.x/fixture.html) 是在测试(或测试的模块)之前执行的函数,通常用于提供数据或与测试的连接。在本例中,创建一个返回 Snowpark Session
对象的夹具。首先,创建 test
目录(如果还没有)。然后,在 test
目录中,创建包含以下内容的文件 conftest.py
,其中 connection_parameters
是包含 Snowflake 账户凭证的词典。有关词典格式的更多信息,请参阅 创建会话。
# test/conftest.py
import pytest
from snowflake.snowpark.session import Session
def pytest_addoption(parser):
parser.addoption("--snowflake-session", action="store", default="live")
@pytest.fixture(scope='module')
def session(request) -> Session:
if request.config.getoption('--snowflake-session') == 'local':
return Session.builder.config('local_testing', True).create()
else:
return Session.builder.configs(CONNECTION_PARAMETERS).create()
对 pytest_addoption
的调用会向 pytest
命令添加一个名为 snowflake-session
的命令行。Session
夹具会检查此命令行选项,并根据其值创建一个本地或实时的 Session
。这使您可以轻松地在本地模式和实时模式之间切换以进行测试。
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
SQL 操作¶
Session.sql(...)
在本地测试框架中不受支持。在可能的情况下使用 Snowpark 的DataFrame APIs,在您必须使用 Session.sql(...)
的情况下,您可以使用 Python 的 unittest.mock.patch
模拟表格返回值,修补来自给定 Session.sql()
调用的预期响应。
在下面的示例中,mock_sql()
映射 SQL 查询文本至所需的 DataFrame 响应。以下条件语句会检查当前会话是否正在使用本地测试,如果是,则将补丁应用于 Session.sql()
方法。
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
启用本地测试后,由 DataFrame.save_as_table()
创建的所有表会在内存中另存为临时表,可以使用 Session.table()
进行检索。您可以使用受支持的 DataFrame 像往常一样在表上进行操作。
修补内置函数¶
并非所有 snowflake.snowpark.functions
下的内置函数都在在本地测试框架中受支持。如果您使用不受支持的函数,则需要使用 snowflake.snowpark.mock
中的 @patch
装饰器创建补丁。
要定义和实施修补后的函数,签名(参数列表)必须与内置函数的参数一致。本地测试框架使用以下规则将参数传递给修补后的函数:
对于内置函数签名中
ColumnOrName
类型的参数,将ColumnEmulator
作为修补函数的参数进行传递。ColumnEmulator
类似于包含列数据的pandas.Series
对象。对于内置函数签名中
LiteralType
类型的参数,将字面量值作为修补函数的参数进行传递。否则,原始值将作为已修补函数的参数进行传递。
至于已修补函数的返回类型,预计会返回 ColumnEmulator
的实例,其类型与内置函数的 Column
返回类型相对应。
例如,内置函数 to_timestamp()
可以像这样修补:
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
跳过测试用例¶
如果 PyTest 测试套件包含一个测试用例,该测试用例没有得到本地测试的良好支持,您可以使用 PyTest 的 mark.skipif
装饰器跳过这些用例。以下示例假定您已如前所述配置了会话和参数。该条件会检查 local_testing_mode
是否设置为了 local
,如果是,则跳过测试用例,并显示一条消息,解释跳过测试用例的原因。
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
限制¶
不支持以下功能:
需要分析 SQL 字符串的原始 SQL 字符串和操作。例如,
session.sql
和DataFrame.filter("col1 > 12")
不受支持。UDFs、UDTFs 和存储过程。
表函数。
AsyncJobs。
会话操作,例如更改仓库、架构和其他会话属性。
Geometry
和Geography
数据类型:聚合窗口函数。
# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
其他限制包括:
Variant
、Array
和Object
数据类型仅受标准 JSON 编码和解码支持。在 Snowflake 中,像 [1,2,,3,] 这样的表达式被认为是有效的 JSON,但在使用 Python 内置 JSON 功能的本地测试中并非如此。您可以指定模块级变量snowflake.snowpark.mock.CUSTOM_JSON_ENCODER
和snowflake.snowpark.mock.CUSTOM_JSON_DECODER
以覆盖默认设置。仅实施了 Snowflake 函数(包括窗口函数)的子集。请参阅 修补内置函数,了解如何注入您自己的函数定义。
目前不支持修补排名相关函数。
选择具有相同名称的列将仅返回一列。作为一种解决方法,请使用
Column.alias
为列重命名以提供不同的名称。df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
使用
Column.cast
的显式类型转换具有不支持以下格式字符串的输入限制:to_decimal
、to_number
、to_numeric
、to_double
、to_date
、to_time
、to_timestamp
以及输出限制:to_char
、to_varchar
、to_binary
。JSON 存储在
VariantType
中的字符串无法转换为Datetime
类型。对于
Table.merge
和Table.update
,仅支持将会话参数ERROR_ON_NONDETERMINISTIC_UPDATE
和ERROR_ON_NONDETERMINISTIC_MERGE
设置为False
的情况。这意味着多重联接会更新其中一个匹配的行。
受支持 APIs 的列表¶
Snowpark 会话¶
Session.createDataFrame
Session.create_dataframe
Session.flatten
Session.range
Session.table
输入/输出¶
DataFrameReader.csv
DataFrameReader.table
DataFrameWriter.saveAsTable
DataFrameWriter.save_as_table
DataFrame¶
DataFrame.agg
DataFrame.cache_result
DataFrame.col
DataFrame.collect
DataFrame.collect_nowait
DataFrame.copy_into_table
DataFrame.count
DataFrame.createOrReplaceTempView
DataFrame.createOrReplaceView
DataFrame.create_or_replace_temp_view
DataFrame.create_or_replace_view
DataFrame.crossJoin
DataFrame.cross_join
DataFrame.distinct
DataFrame.drop
DataFrame.dropDuplicates
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.except_
DataFrame.explain
DataFrame.fillna
DataFrame.filter
DataFrame.first
DataFrame.groupBy
DataFrame.group_by
DataFrame.intersect
DataFrame.join
DataFrame.limit
DataFrame.minus
DataFrame.natural_join
DataFrame.orderBy
DataFrame.order_by
DataFrame.rename
DataFrame.replace
DataFrame.rollup
DataFrame.sample
DataFrame.select
DataFrame.show
DataFrame.sort
DataFrame.subtract
DataFrame.take
DataFrame.toDF
DataFrame.toLocalIterator
DataFrame.toPandas
DataFrame.to_df
DataFrame.to_local_iterator
DataFrame.to_pandas
DataFrame.to_pandas_batches
DataFrame.union
DataFrame.unionAll
DataFrame.unionAllByName
DataFrame.unionByName
DataFrame.union_all
DataFrame.union_all_by_name
DataFrame.union_by_name
DataFrame.unpivot
DataFrame.where
DataFrame.withColumn
DataFrame.withColumnRenamed
DataFrame.with_column
DataFrame.with_column_renamed
DataFrame.with_columns
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace
列¶
Column.alias
Column.as_
Column.asc
Column.asc_nulls_first
Column.asc_nulls_last
Column.astype
Column.between
Column.bitand
Column.bitor
Column.bitwiseAnd
Column.bitwiseOR
Column.bitwiseXOR
Column.bitxor
Column.cast
Column.collate
Column.desc
Column.desc_nulls_first
Column.desc_nulls_last
Column.endswith
Column.eqNullSafe
Column.equal_nan
Column.equal_null
Column.getItem
Column.getName
Column.get_name
Column.in_
Column.isNotNull
Column.isNull
Column.is_not_null
Column.is_null
Column.isin
Column.like
Column.name
Column.over
Column.regexp
Column.rlike
Column.startswith
Column.substr
Column.substring
Column.try_cast
Column.within_group
CaseExpr.when
CaseExpr.otherwise
数据类型¶
ArrayType
BinaryType
BooleanType
ByteType
ColumnIdentifier
DataType
DateType
DecimalType
DoubleType
FloatType
IntegerType
LongType
MapType
NullType
ShortType
StringType
StructField
StructType
Timestamp
TimestampType
TimeType
Variant
VariantType
行¶
Row.asDict
Row.as_dict
Row.count
Row.index
函数¶
abs
avg
coalesce
contains
count
count_distinct
covar_pop
endswith
first_value
iff
lag
last_value
lead
list_agg
max
median
min
parse_json
row_number
startswith
substring
sum
to_array
to_binary
to_boolean
to_char
to_date
to_decimal
to_double
to_object
to_time
to_timestamp
to_variant
窗口¶
Window.orderBy
Window.order_by
Window.partitionBy
Window.partition_by
Window.rangeBetween
Window.range_between
Window.rowsBetween
Window.rows_between
WindowSpec.orderBy
WindowSpec.order_by
WindowSpec.partitionBy
WindowSpec.partition_by
WindowSpec.rangeBetween
WindowSpec.range_between
WindowSpec.rowsBetween
WindowSpec.rows_between
分组¶
RelationalGroupedDataFrame.agg
RelationalGroupedDataFrame.apply_in_pandas
RelationalGroupedDataFrame.applyInPandas
RelationalGroupedDataFrame.avg
RelationalGroupedDataFrame.builtin
RelationalGroupedDataFrame.count
RelationalGroupedDataFrame.function
RelationalGroupedDataFrame.max
RelationalGroupedDataFrame.mean
RelationalGroupedDataFrame.median
RelationalGroupedDataFrame.min
RelationalGroupedDataFrame.sum
表¶
Table.delete
Table.drop_table
Table.merge
Table.sample
Table.update
WhenMatchedClause.delete
WhenMatchedClause.update
WhenNotMatchedClause.insert