本地测试框架

本主题说明在使用 Snowpark Python 库时如何在本地测试代码。

本主题内容:

Snowpark Python 本地测试框架是一个模拟器,允许您在本地 Snowpark Python DataFrames 上创建和操作,无需连接到 Snowflake 账户。您可以使用此本地测试框架在开发机器或 CI(持续集成)管道中测试 DataFrame 操作,然后再将代码变更部署到账户。API 是相同的,因此您可以在本地或针对 Snowflake 账户运行测试,而无需更改代码。

先决条件

要使用本地测试框架,请执行以下操作:

  • 您必须使用版本 1.18.0 或更高版本的 Snowpark Python 库以及可选依赖项 localtest

  • 受支持的 Python 版本包括:

    • 3.9

    • 3.10

    • 3.11

安装 Snowpark Python 库

  • 要安装带有可选依赖项的库,请运行以下命令:

    pip install "snowflake-snowpark-python[localtest]"
    
    Copy

创建会话并启用本地测试

  1. 创建 Snowpark Session 并将本地测试配置设置为 True

    from snowflake.snowpark import Session
    
    session = Session.builder.config('local_testing', True).create()
    
    Copy
  2. 使用会话创建并在 DataFrames 上操作:

    df = session.create_dataframe([[1,2],[3,4]],['a','b'])
    df.with_column('c', df['a']+df['b']).show()
    
    Copy

加载数据

您可以基于 Python 基元、文件和 Pandas DataFrames 创建 Snowpark DataFrames。这对于指定测试用例的输入和预期输出非常有用。使用这种方法,数据受到来源管理,使得让测试数据与测试用例保持同步变得更加容易。

加载 CSV 数据

  • 要将 CSV 文件加载到 Snowpark DataFrame 中,首先调用 Session.file.put() 将文件加载到内存暂存区,然后使用 Session.read() 读取内容。

示例

假设有一个文件,data.csv,包含以下内容:

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

您可以使用以下代码将 data.csv 加载到 Snowpark DataFrame 中。您需要先将文件放到暂存区中;如果不这样做,您会收到错误消息“找不到文件”。

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

预期输出:

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

加载 pandas 数据

  • 要从 pandas DataFrame 创建 Snowpark Python DataFrame,请调用 create_dataframe 方法并将数据作为 pandas DataFrame 传递。

示例

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

预期输出:

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------
  • 要将 Snowpark Python DataFrame 转换为 pandas DataFrame,请在 DataFrame 中调用 to_pandas 方法。

示例

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

预期输出:

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

为会话创建 PyTest 夹具

PyTest 夹具 (https://docs.pytest.org/en/6.2.x/fixture.html) 是在测试(或测试的模块)之前执行的函数,通常用于提供数据或与测试的连接。在此过程中,您会创建返回 Snowpark Session 对象的夹具。

  1. 如果您还没有目录,请创建 test 目录。

  2. test 目录中,创建包含以下内容且名为 conftest.py 的文件,其中 connection_parameters 是包含 Snowflake 账户凭证的词典。

    # test/conftest.py
    import pytest
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.config('local_testing', True).create()
        else:
            return Session.builder.configs(CONNECTION_PARAMETERS).create()
    
    Copy

有关词典格式的更多信息,请参阅 创建会话

pytest_addoption 的调用会向 pytest 命令添加一个名为 snowflake-session 的命令行。Session 夹具会检查此命令行选项,并根据其值创建一个本地或实时的 Session。这使您可以轻松地在本地和实时模式之间切换以进行测试,如以下命令行示例所示:

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

SQL 操作

Session.sql(...) 在本地测试框架中不受支持。在可能的情况下使用 Snowpark 的 DataFrame APIs,在您必须使用 Session.sql(...) 的情况下,您可以使用 Python 的 unittest.mock.patch 模拟表格返回值,修补来自给定 Session.sql() 调用的预期响应。

在以下示例中,mock_sql() 映射 SQL 查询文本至所需的 DataFrame 响应。该条件语句会检查当前会话是否正在使用本地测试,如果是,则将补丁应用于 Session.sql() 方法。

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

启用本地测试后,由 DataFrame.save_as_table() 创建的所有表会在内存中另存为临时表,可以使用 Session.table() 进行检索。您可以使用受支持的 DataFrame 像往常一样在表上进行操作。

修补内置函数

snowflake.snowpark.functions 下的一些内置函数在本地测试框架中不受支持。如果您使用不受支持的函数,您可以使用 snowflake.snowpark.mock 中的 @patch 装饰器创建补丁。

对于要定义和实施的修补后函数,签名(参数列表)必须与内置函数的参数一致。本地测试框架使用以下规则将参数传递给修补后的函数:

  • 对于内置函数签名中 ColumnOrName 类型的参数,将 ColumnEmulator 作为修补函数的参数进行传递。ColumnEmulator 类似于包含列数据的 pandas.Series 对象。

  • 对于内置函数签名中 LiteralType 类型的参数,将字面量值作为修补函数的参数进行传递。

  • 否则,原始值将作为已修补函数的参数进行传递。

至于已修补函数的返回类型,预计会返回 ColumnEmulator 的实例,其类型与内置函数的 Column 返回类型相对应。

例如,内置函数 to_timestamp() 可以像这样修补:

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

跳过测试用例

如果 PyTest 测试套件包含一个测试用例,该测试用例没有得到本地测试的良好支持,您可以使用 PyTest 的 mark.skipif 装饰器跳过这些用例。以下示例假定您已如前所述配置了会话和参数。该条件检查 local_testing_mode 是否设置为 local;如果已设置,则跳过测试用例并显示解释性消息。

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

注册 UDFs 和存储过程

您可以在本地测试框架中创建和调用用户定义函数 (UDFs) 和存储过程。要创建对象,您可以使用以下语法选项:

语法

UDF

存储过程

装饰器

@udf

@sproc

注册方法

udf.register()

sproc.register()

从文件注册的方法

udf.register_from_file()

sproc.register_from_file()

示例

以下代码示例使用装饰器创建 UDF 和存储过程,然后按名称调用两者:

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType

# Create local session
session = Session.builder.config('local_testing', True).create()

# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)

# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
    return a + b

# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
    return session.table(table_name)\
        .with_column('c', call_udf('example_udf', col('a'), col('b')))\
        .count()

# Call the stored procedure by name
output = session.call('example_proc', table)

print(output)
Copy

限制

以下列表包含本地测试框架中的已知限制和行为差距。Snowflake 目前没有解决这些问题的计划。

  • 不支持原始 SQL 字符串和需要解析 SQL 字符串的操作,例如 session.sqlDataFrame.filter("col1 > 12")

  • 不支持异步操作。

  • 数据库对象(如表、存储过程和 UDFs)不会持久化到会话级别之外,所有操作都在内存中执行。例如,在一个模拟会话中注册的永久存储过程对其他模拟会话不可见。

  • 不支持与 字符串排序规则 相关的功能,如 Column.collate

  • VariantArrayObject 数据类型仅受标准 JSON 编码和解码支持。在 Snowflake 中,像 [1,2,,3,] 这样的表达式被认为是有效的 JSON,但在使用 Python 内置 JSON 功能的本地测试中并非如此。您可以指定模块级变量 snowflake.snowpark.mock.CUSTOM_JSON_ENCODERsnowflake.snowpark.mock.CUSTOM_JSON_DECODER 以替换默认设置。

  • 仅实施了 Snowflake 函数(包括窗口函数)的子集。要了解如何注入您自己的函数定义。请参阅 修补内置函数

    • 目前不支持修补排名相关函数。

  • SQL 格式模型 不受支持。例如,to_decimal 的模拟实现不处理可选参数 format

  • Snowpark Python 库没有内置的 Python API 来创建或删除暂存区,因此本地测试框架假设每个传入的暂存区都已经创建。

  • UDFs 和存储过程的当前实现不执行任何包验证。代码中引用的所有包都需要在程序执行之前安装。

  • 不支持查询标签。

  • 不支持查询历史记录。

  • 不支持沿袭。

  • 注册 UDF 或存储过程时,将忽略 parallelexecute_asstatement_paramssource_code_displayexternal_access_integrationssecretscomment 等可选参数。

  • 对于 Table.sample,SYSTEM 或 BLOCK 采样与 ROW 采样相同。

  • Snowflake 不对在存储过程中运行本地测试框架提供官方支持。存储过程中的本地测试模式会话可能会遇到或触发意外错误。

不支持的功能

以下是当前未在本地测试框架中实现的功能列表。Snowflake 正在积极努力解决这些问题。

一般而言,对这些功能的所有引用都应引发 NotImplementedError

  • UDTFs(用户定义的表函数)

  • UDAFs(用户定义的聚合函数)

  • 矢量化的 UDFs 和 UDTFs

  • 内置表函数

  • 表存储过程

  • GeometryGeographyVector 数据类型

  • 间隔表达式

  • 读取除 JSON 和 CSV 之外的文件格式

    • 对于受支持的文件格式,并非所有读取选项都受支持。例如,CSV 格式不支持 infer_schema

对于此处未列为不受支持或已知限制的任何功能,请查看 本地测试的功能请求 (https://github.com/snowflakedb/snowpark-python/issues?q=is%3Aopen+label%3A%22local+testing%22+label%3A%22feature%22+) 的最新列表,或在 snowpark-python GitHub 存储库中 创建功能请求 (https://github.com/snowflakedb/snowpark-python/issues/new/choose)。

已知问题

以下是本地测试框架中存在的已知问题或行为差距的列表。Snowflake 正在积极计划解决这些问题。

  • 不支持在 DataFrame.groupby 内部使用窗口函数或其他聚合操作。

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy
  • 选择具有相同名称的列将仅返回一列。作为一种解决方法,使用 Column.alias 重命名列,使其具有不同的名称。

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • 对于 Table.mergeTable.update,会话参数 ERROR_ON_NONDETERMINISTIC_UPDATEERROR_ON_NONDETERMINISTIC_MERGE 必须设置为 False。这意味着多重联接会更新其中一个匹配的行。

  • 不支持 GET 和 PUT 文件操作中的完全限定暂存区名称。数据库和架构名称被视为暂存区名称的一部分。

  • mock_to_char 实现仅支持在不同时间部分之间具有分隔符的格式的时间戳。

  • DataFrame.pivot 有一个名为 values 的参数,允许将透视限制为特定值。此时只能使用统计定义的值。使用子查询提供的值将引发错误。

  • 不支持从包含带有时区信息的时间戳的 pandas DataFrame 创建 DataFrame

对于此列表中未提及的任何问题,请查看 最新的未决问题列表 (https://github.com/snowflakedb/snowpark-python/issues?q=is%3Aopen+is%3Aissue+label%3A%22local+testing%22),或在 snowpark-python GitHub 存储库中 创建错误报告 (https://github.com/snowflakedb/snowpark-python/issues/new/choose)。

语言: 中文