本地测试框架

本主题说明在使用 Snowpark 库时如何在本地测试代码。

本主题内容:

Snowpark Python 本地测试框架允许您在本地 Snowpark Python DataFrames 上创建和操作,无需连接到 Snowflake 账户。您可以使用此本地测试框架在开发机器或 CI(持续集成)管道中,本地测试您的 DataFrame 操作,然后再将代码变更部署到账户。API 相同,因此您可以在本地或针对 Snowflake 账户运行测试,无需更改代码。

先决条件

要使用本地测试框架,请执行以下操作:

  • 您必须使用版本 1.11.1 或更高版本的 Snowpark Python 库以及可选依赖项 pandas。通过运行 pip install "snowflake-snowpark-python[pandas]" 进行安装

  • 受支持的 Python 版本包括:

    • 3.8

    • 3.9

    • 3.10

    • 3.11

创建会话并启用本地测试

首先,创建一个 Snowpark Session 并将本地测试配置设置为 True

from snowflake.snowpark import Session

session = Session.builder.config('local_testing', True).create()
Copy

创建会话后,您可以使用它在 DataFrames 上进行创建和操作。

df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
Copy

加载数据

您可以基于 Python 基元、文件和 Pandas DataFrames 创建 Snowpark DataFrames。这对于指定测试用例的输入和预期输出非常有用。通过这样做,数据受到来源管理,使得让测试数据与测试用例保持同步变得更加容易。

加载 CSV 数据

您可以通过首次调用 Session.file.put() 将文件加载到内存中阶段,然后使用 Session.read() 读取内容,将 CSV 文件加载到 Snowpark DataFrame 中。假设有一个文件,data.csv,该文件包含以下内容:

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

您可以使用以下代码将 data.csv 加载到 Snowpark DataFrame 中。您需要先将文件放到暂存区中。否则,您将收到找不到文件的错误。

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

dataframe.show() 的输出将是:

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

加载 Pandas 数据

您可以通过调用 create_dataframe 方法以 Pandas DataFrame 的形式传递数据,基于 Pandas DataFrame 创建 Snowpark Python DataFrame。

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

dataframe.show() 输出以下内容:

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------

Snowpark Python DataFrame 也可以通过调用DataFrame 的 to_pandas 方法转换为 Pandas DataFrame。

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

print(pandas_dataframe.to_string()) 的调用输出以下内容:

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

为会议创建 PyTest 夹具

PyTest 夹具 (https://docs.pytest.org/en/6.2.x/fixture.html) 是在测试(或测试的模块)之前执行的函数,通常用于提供数据或与测试的连接。在本例中,创建一个返回 Snowpark Session 对象的夹具。首先,创建 test 目录(如果还没有)。然后,在 test 目录中,创建包含以下内容的文件 conftest.py,其中 connection_parameters 是包含 Snowflake 账户凭证的词典。有关词典格式的更多信息,请参阅 创建会话

# test/conftest.py
import pytest
from snowflake.snowpark.session import Session

def pytest_addoption(parser):
    parser.addoption("--snowflake-session", action="store", default="live")

@pytest.fixture(scope='module')
def session(request) -> Session:
    if request.config.getoption('--snowflake-session') == 'local':
        return Session.builder.config('local_testing', True).create()
    else:
        return Session.builder.configs(CONNECTION_PARAMETERS).create()
Copy

pytest_addoption 的调用会向 pytest 命令添加一个名为 snowflake-session 的命令行。Session 夹具会检查此命令行选项,并根据其值创建一个本地或实时的 Session。这使您可以轻松地在本地模式和实时模式之间切换以进行测试。

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

SQL 操作

Session.sql(...) 在本地测试框架中不受支持。在可能的情况下使用 Snowpark 的DataFrame APIs,在您必须使用 Session.sql(...) 的情况下,您可以使用 Python 的 unittest.mock.patch 模拟表格返回值,修补来自给定 Session.sql() 调用的预期响应。

在下面的示例中,mock_sql() 映射 SQL 查询文本至所需的 DataFrame 响应。以下条件语句会检查当前会话是否正在使用本地测试,如果是,则将补丁应用于 Session.sql() 方法。

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

启用本地测试后,由 DataFrame.save_as_table() 创建的所有表会在内存中另存为临时表,可以使用 Session.table() 进行检索。您可以使用受支持的 DataFrame 像往常一样在表上进行操作。

修补内置函数

并非所有 snowflake.snowpark.functions 下的内置函数都在在本地测试框架中受支持。如果您使用不受支持的函数,则需要使用 snowflake.snowpark.mock 中的 @patch 装饰器创建补丁。

要定义和实施修补后的函数,签名(参数列表)必须与内置函数的参数一致。本地测试框架使用以下规则将参数传递给修补后的函数:

  • 对于内置函数签名中 ColumnOrName 类型的参数,将 ColumnEmulator 作为修补函数的参数进行传递。ColumnEmulator 类似于包含列数据的 pandas.Series 对象。

  • 对于内置函数签名中 LiteralType 类型的参数,将字面量值作为修补函数的参数进行传递。

  • 否则,原始值将作为已修补函数的参数进行传递。

至于已修补函数的返回类型,预计会返回 ColumnEmulator 的实例,其类型与内置函数的 Column 返回类型相对应。

例如,内置函数 to_timestamp() 可以像这样修补:

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

跳过测试用例

如果 PyTest 测试套件包含一个测试用例,该测试用例没有得到本地测试的良好支持,您可以使用 PyTest 的 mark.skipif 装饰器跳过这些用例。以下示例假定您已如前所述配置了会话和参数。该条件会检查 local_testing_mode 是否设置为了 local,如果是,则跳过测试用例,并显示一条消息,解释跳过测试用例的原因。

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

限制

不支持以下功能:

  • 需要分析 SQL 字符串的原始 SQL 字符串和操作。例如,session.sqlDataFrame.filter("col1 > 12") 不受支持。

  • UDFs、UDTFs 和存储过程。

  • 表函数。

  • AsyncJobs。

  • 会话操作,例如更改仓库、架构和其他会话属性。

  • GeometryGeography 数据类型:

  • 聚合窗口函数。

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy

其他限制包括:

  • VariantArrayObject 数据类型仅受标准 JSON 编码和解码支持。在 Snowflake 中,像 [1,2,,3,] 这样的表达式被认为是有效的 JSON,但在使用 Python 内置 JSON 功能的本地测试中并非如此。您可以指定模块级变量 snowflake.snowpark.mock.CUSTOM_JSON_ENCODERsnowflake.snowpark.mock.CUSTOM_JSON_DECODER 以覆盖默认设置。

  • 仅实施了 Snowflake 函数(包括窗口函数)的子集。请参阅 修补内置函数,了解如何注入您自己的函数定义。

  • 目前不支持修补排名相关函数。

  • 选择具有相同名称的列将仅返回一列。作为一种解决方法,请使用 Column.alias 为列重命名以提供不同的名称。

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • 使用 Column.cast 的显式类型转换具有不支持以下格式字符串的输入限制:to_decimalto_numberto_numericto_doubleto_dateto_timeto_timestamp 以及输出限制:to_charto_varcharto_binary

  • JSON 存储在 VariantType 中的字符串无法转换为 Datetime 类型。

  • 对于 Table.mergeTable.update,仅支持将会话参数 ERROR_ON_NONDETERMINISTIC_UPDATEERROR_ON_NONDETERMINISTIC_MERGE 设置为 False 的情况。这意味着多重联接会更新其中一个匹配的行。

受支持 APIs 的列表

Snowpark 会话

Session.createDataFrame

Session.create_dataframe

Session.flatten

Session.range

Session.table

输入/输出

DataFrameReader.csv

DataFrameReader.table

DataFrameWriter.saveAsTable

DataFrameWriter.save_as_table

DataFrame

DataFrame.agg

DataFrame.cache_result

DataFrame.col

DataFrame.collect

DataFrame.collect_nowait

DataFrame.copy_into_table

DataFrame.count

DataFrame.createOrReplaceTempView

DataFrame.createOrReplaceView

DataFrame.create_or_replace_temp_view

DataFrame.create_or_replace_view

DataFrame.crossJoin

DataFrame.cross_join

DataFrame.distinct

DataFrame.drop

DataFrame.dropDuplicates

DataFrame.drop_duplicates

DataFrame.dropna

DataFrame.except_

DataFrame.explain

DataFrame.fillna

DataFrame.filter

DataFrame.first

DataFrame.groupBy

DataFrame.group_by

DataFrame.intersect

DataFrame.join

DataFrame.limit

DataFrame.minus

DataFrame.natural_join

DataFrame.orderBy

DataFrame.order_by

DataFrame.rename

DataFrame.replace

DataFrame.rollup

DataFrame.sample

DataFrame.select

DataFrame.show

DataFrame.sort

DataFrame.subtract

DataFrame.take

DataFrame.toDF

DataFrame.toLocalIterator

DataFrame.toPandas

DataFrame.to_df

DataFrame.to_local_iterator

DataFrame.to_pandas

DataFrame.to_pandas_batches

DataFrame.union

DataFrame.unionAll

DataFrame.unionAllByName

DataFrame.unionByName

DataFrame.union_all

DataFrame.union_all_by_name

DataFrame.union_by_name

DataFrame.unpivot

DataFrame.where

DataFrame.withColumn

DataFrame.withColumnRenamed

DataFrame.with_column

DataFrame.with_column_renamed

DataFrame.with_columns

DataFrameNaFunctions.drop

DataFrameNaFunctions.fill

DataFrameNaFunctions.replace

Column.alias

Column.as_

Column.asc

Column.asc_nulls_first

Column.asc_nulls_last

Column.astype

Column.between

Column.bitand

Column.bitor

Column.bitwiseAnd

Column.bitwiseOR

Column.bitwiseXOR

Column.bitxor

Column.cast

Column.collate

Column.desc

Column.desc_nulls_first

Column.desc_nulls_last

Column.endswith

Column.eqNullSafe

Column.equal_nan

Column.equal_null

Column.getItem

Column.getName

Column.get_name

Column.in_

Column.isNotNull

Column.isNull

Column.is_not_null

Column.is_null

Column.isin

Column.like

Column.name

Column.over

Column.regexp

Column.rlike

Column.startswith

Column.substr

Column.substring

Column.try_cast

Column.within_group

CaseExpr.when

CaseExpr.otherwise

数据类型

ArrayType

BinaryType

BooleanType

ByteType

ColumnIdentifier

DataType

DateType

DecimalType

DoubleType

FloatType

IntegerType

LongType

MapType

NullType

ShortType

StringType

StructField

StructType

Timestamp

TimestampType

TimeType

Variant

VariantType

Row.asDict

Row.as_dict

Row.count

Row.index

函数

abs

avg

coalesce

contains

count

count_distinct

covar_pop

endswith

first_value

iff

lag

last_value

lead

list_agg

max

median

min

parse_json

row_number

startswith

substring

sum

to_array

to_binary

to_boolean

to_char

to_date

to_decimal

to_double

to_object

to_time

to_timestamp

to_variant

窗口

Window.orderBy

Window.order_by

Window.partitionBy

Window.partition_by

Window.rangeBetween

Window.range_between

Window.rowsBetween

Window.rows_between

WindowSpec.orderBy

WindowSpec.order_by

WindowSpec.partitionBy

WindowSpec.partition_by

WindowSpec.rangeBetween

WindowSpec.range_between

WindowSpec.rowsBetween

WindowSpec.rows_between

分组

RelationalGroupedDataFrame.agg

RelationalGroupedDataFrame.apply_in_pandas

RelationalGroupedDataFrame.applyInPandas

RelationalGroupedDataFrame.avg

RelationalGroupedDataFrame.builtin

RelationalGroupedDataFrame.count

RelationalGroupedDataFrame.function

RelationalGroupedDataFrame.max

RelationalGroupedDataFrame.mean

RelationalGroupedDataFrame.median

RelationalGroupedDataFrame.min

RelationalGroupedDataFrame.sum

Table.delete

Table.drop_table

Table.merge

Table.sample

Table.update

WhenMatchedClause.delete

WhenMatchedClause.update

WhenNotMatchedClause.insert

语言: 中文