Snowpark Connect for Spark compatibility guide

This guide documents the compatibility between the Snowpark Connect for Spark implementation of the Spark DataFrame APIs and native Apache Spark. It is intended to help users understand the key differences, unsupported features, and migration considerations when moving Spark workloads to Snowpark Connect for Spark.

Snowpark Connect for Spark aims to provide a familiar Spark DataFrame API experience on top of the Snowflake execution engine. However, there are compatibility gaps, which this topic describes so that you can plan and adapt your migration. Some of these gaps might be addressed in a future release.

Data types

Unsupported data types

Implicit data type conversion

When using Snowpark Connect for Spark, keep in mind how data types are handled. Snowpark Connect for Spark implicitly represents ByteType, ShortType, and IntegerType as LongType. This means that while you might define columns or data with ByteType, ShortType, or IntegerType, the data will be represented and returned by Snowpark Connect for Spark as LongType. Similarly, implicit conversion might also occur for FloatType and DoubleType depending on the specific operations and context. The Snowflake execution engine will internally handle data type compression and may in fact store the data as Byte or Short, but these are considered implementation details and not exposed to the end user.

Semantically, this representation doesn't affect the correctness of your Spark queries.

Data type from native PySpark    Data type from Snowpark Connect for Spark
ByteType                         LongType
ShortType                        LongType
IntegerType                      LongType
LongType                         LongType
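If you migrate tests that assert on schemas, you might normalize the expected PySpark types before comparing them with what Snowpark Connect for Spark reports. The following plain-Python sketch applies the mappings from the table above. The helper names are hypothetical and aren't part of any Snowpark Connect for Spark API. The sketch deliberately leaves out FloatType and DoubleType, because their conversion depends on the operation and context.

```python
from typing import Dict

# Hypothetical mapping of PySpark type names to the type name that
# Snowpark Connect for Spark reports, transcribed from the table above.
# FloatType/DoubleType are omitted because their conversion is context-dependent.
SNOWPARK_CONNECT_TYPE_MAP = {
    "ByteType": "LongType",
    "ShortType": "LongType",
    "IntegerType": "LongType",
    "LongType": "LongType",
}


def expected_type_name(spark_type_name: str) -> str:
    """Return the type name Snowpark Connect for Spark is expected to report.

    Type names that aren't in the mapping are returned unchanged.
    """
    return SNOWPARK_CONNECT_TYPE_MAP.get(spark_type_name, spark_type_name)


def expected_schema(fields: Dict[str, str]) -> Dict[str, str]:
    """Apply expected_type_name to every column of a {column: type name} schema."""
    return {name: expected_type_name(type_name) for name, type_name in fields.items()}
```

For example, expected_schema({"id": "IntegerType", "name": "StringType"}) yields {"id": "LongType", "name": "StringType"}, which you can compare with the schema you read back.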

The following example shows a difference in how Spark and Snowpark Connect for Spark handle data types in query results.

Query

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, NULL),
    (float(2.0), double(2.0), 2.0, "2", false, NULL),
    (float(3.0), double(3.0), NULL, "3", false, NULL)
    AS tab(a, b, c, d, e, f)
    """

Spark

spark.sql(query).printSchema()
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)
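To spot differences like these across many queries, you could compare the printSchema() output of both engines programmatically. The following is a minimal sketch that parses only top-level fields from the text that printSchema() prints; the function names are hypothetical, and nested fields are ignored.

```python
import re
from typing import Dict, Tuple

# Matches one top-level field line of DataFrame.printSchema() output,
# for example " |-- a: float (nullable = false)". Nested fields, which are
# indented further, do not match.
FIELD_LINE = re.compile(r"^ \|-- (\w+): (.+) \(nullable = (true|false)\)$")


def parse_print_schema(text: str) -> Dict[str, Tuple[str, bool]]:
    """Parse top-level fields of printSchema() output into {name: (type, nullable)}."""
    fields = {}
    for line in text.splitlines():
        match = FIELD_LINE.match(line)
        if match:
            name, type_name, nullable = match.groups()
            fields[name] = (type_name, nullable == "true")
    return fields


def schema_differences(spark_text: str, snowpark_text: str) -> Dict[str, tuple]:
    """Return {column: (spark_field, snowpark_field)} for every column that differs.

    A field is None when the column is missing from one of the outputs.
    """
    spark = parse_print_schema(spark_text)
    snowpark = parse_print_schema(snowpark_text)
    return {
        name: (spark.get(name), snowpark.get(name))
        for name in spark.keys() | snowpark.keys()
        if spark.get(name) != snowpark.get(name)
    }
```

Applied to the two outputs above, schema_differences reports the columns a, c, e, and f.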

NullType nuance

Snowpark Connect for Spark doesn't support the NullType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html) data type, which is a supported data type in Spark. This causes behavior changes when you use Null or None in DataFrames.

In Spark, a literal NULL (for example, with lit(None)) is automatically inferred as a NullType. In Snowpark Connect for Spark, it is inferred as a StringType during schema inference.

from pyspark.sql.functions import lit

df = spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', NullType(), True)
# Snowpark Connect for Spark: StructField('null_col', StringType(), True)

Structured data types in ArrayType, MapType, and ObjectType

Because structured type support isn't enabled by default in Snowpark Connect for Spark, ARRAY, MAP, and OBJECT data types are treated as generic, untyped collections. This means that element types, field names, schema, and nullability aren't enforced, as they would be with structured type support.

If you rely on this support, work with your account team to enable the feature for your account.

Unsupported Spark APIs

The following are the APIs supported by classic Spark and Spark Connect but not supported in Snowpark Connect for Spark.

UDF differences

StructType differences

When Spark converts a StructType to be used in a user-defined function (UDF), it converts it to a tuple type in Python. Snowpark Connect for Spark converts a StructType into a dict type in Python. This leads to fundamental differences in element access and output.

  • Spark accesses indexes using 0, 1, 2, 3, and so on.
  • Snowpark Connect for Spark accesses indexes using '_1', '_2', and so on.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def f(e):
    return e[0]

df = spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))

# In Snowpark Connect for Spark, this results in an index access issue.
# Workaround: use _1, _2, and so on as indexes.

def f(e):
    return e['_1']

row = (
    spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

print(row[0])
# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
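If the same UDF code must run on both engines, one option is a small accessor that handles both representations. The following sketch is a hypothetical helper, not part of either API. It assumes that unnamed struct fields arrive under the keys '_1', '_2', and so on, as described above, and that for named fields the dict keeps the struct's field order.

```python
from typing import Any


def struct_field(value: Any, index: int) -> Any:
    """Return the struct field at a zero-based position, on either engine.

    - Spark passes a Row (a tuple subclass), so positional indexing works.
    - Snowpark Connect for Spark passes a dict; unnamed fields use the keys
      '_1', '_2', and so on.
    """
    if isinstance(value, dict):
        key = f"_{index + 1}"
        if key in value:
            return value[key]
        # Assumption: for named fields, dict keys appear in struct field order.
        return list(value.values())[index]
    return value[index]


def f(e):
    # Works whether e is a Row/tuple (Spark) or a dict (Snowpark Connect for Spark).
    return struct_field(e, 0)
```

You could then register f with udf(f, DoubleType()) exactly as in the example above.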

Iterator types in UDFs

Iterators aren't supported as return types or input types.

# This will not work in Snowpark Connect for Spark
def func(iterator):
    for batch in iterator:
        yield batch

df = spark.range(10)
actual = df.repartition(1).mapInArrow(func, "id long").collect()
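When the per-batch logic is stateless and treats each row independently, one possible workaround is to rewrite it as a scalar function and register that as a regular Python UDF. The sketch below is plain Python with hypothetical function names; logic that depends on batch boundaries or on state carried across rows can't be rewritten this way.

```python
from typing import Iterator, List


# Iterator-style logic, as you might write it for mapInArrow or an iterator
# pandas UDF. This style isn't supported in Snowpark Connect for Spark.
def double_batches(batches: Iterator[List[int]]) -> Iterator[List[int]]:
    for batch in batches:
        yield [value * 2 for value in batch]


# Equivalent scalar logic that handles one value at a time. A function like
# this can be registered as a regular (non-iterator) Python UDF.
def double_value(value: int) -> int:
    return value * 2
```

With PySpark, you could then register the scalar version with udf(double_value, LongType()) and apply it in a select.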

Importing files into a Python UDF

With Snowpark Connect for Spark, you can specify external libraries and files in Python UDFs. Snowflake includes Python files and archives in your code’s execution context. You can import functions from these included files in a UDF without additional steps. This dependency-handling behavior works as described in Creating a Python UDF with code uploaded from a stage.

To include external libraries and files, you provide stage paths to the files as the value of the configuration setting snowpark.connect.udf.imports. The configuration value should be an array of stage paths to the files, where the paths are separated by commas.
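Because the value is a bracketed, comma-separated string rather than a Python list, it can help to build it from a list of paths. The following helper is hypothetical; the format it produces mirrors the examples in this guide, and the check that each path starts with @ reflects Snowflake's stage path syntax.

```python
from typing import List


def udf_imports_value(stage_paths: List[str]) -> str:
    """Format stage paths as the value for snowpark.connect.udf.imports.

    For example, ["@stage/library.py", "@other_lib.zip"] becomes
    "[@stage/library.py, @other_lib.zip]".
    """
    for path in stage_paths:
        if not path.startswith("@"):
            raise ValueError(f"Expected a stage path starting with '@': {path!r}")
    return "[" + ", ".join(stage_paths) + "]"
```

You would then pass the result to spark.conf.set("snowpark.connect.udf.imports", ...).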

The following example code includes two files in the UDF's execution context. The UDF imports functions from these files and uses them in its logic.

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

# The files must already be uploaded to a stage
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType=StringType())
def import_example(input: str) -> str:
    from library import first_function
    from other_lib.custom import second_function

    return first_function(input) + second_function(input)

spark.range(1).select(import_example(lit("example_string"))).show()

You can use the snowpark.connect.udf.imports setting to include other kinds of files as well, such as those with data your code needs to read. Note that when you do this, your code should only read from the included files; any writes to such files will be lost after the function’s execution ends.

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

# The file must already be uploaded to a stage
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType=StringType())
def import_read_example(file_name: str) -> str:
    with open(file_name) as f:
        return f.read()

spark.range(1).select(import_read_example(lit("data.csv"))).show()
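If the UDF might run with a working directory other than the one that holds the included files, resolving the path explicitly can make reads more robust. In the following sketch, read_included_file is hypothetical. It assumes, based on how Snowflake Python UDFs expose imported files, that the import directory is available as sys._xoptions["snowflake_import_directory"]. This guide doesn't state whether Snowpark Connect for Spark UDFs set that value, so the helper falls back to the current working directory.

```python
import os
import sys
from typing import Optional


def read_included_file(file_name: str, base_dir: Optional[str] = None) -> str:
    """Read a file included through snowpark.connect.udf.imports.

    Assumption: Snowflake Python UDFs expose the import directory as
    sys._xoptions["snowflake_import_directory"]; when that key is absent
    (for example, in local tests), the current working directory is used.
    The file is only read, never written, because writes are lost after
    the function finishes.
    """
    if base_dir is None:
        base_dir = sys._xoptions.get("snowflake_import_directory", os.getcwd())
    with open(os.path.join(base_dir, file_name), encoding="utf-8") as f:
        return f.read()
```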

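Lambda function limitations

User-defined functions (UDFs) aren't supported in lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression results in an error.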

from pyspark.sql.functions import bit_count, map_filter

df = spark.createDataFrame([({"a": 123},)], ("data",))
# Doesn't work, because bit_count is implemented with a UDF
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show()

Using path-sensitive modules

If the Python UDF body imports a module that requires a precise path, you need to take additional steps. When loading dependencies for UDFs, Snowflake puts all of the files in the working directory without preserving the original path. To preserve the original structure, zip the dependencies and then add the archive as an import by using either addArtifacts or the snowpark.connect.udf.python.imports configuration setting.

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

# Option 1: configuration setting.
# Zip the module and upload the archive to a stage before importing it.
spark.conf.set("snowpark.connect.udf.python.imports", "[@nested_library.zip]")

@udf(returnType=StringType())
def import_example(input: str) -> str:
    from nested_library.sub_module.functions import example_func

    return example_func(input)

spark.range(1).select(import_example(lit("example_string"))).show()

# Option 2: addArtifacts.
# Add the zipped dependencies for import.
spark.addArtifacts("nested_library.zip", pyfile=True)

@udf(returnType=StringType())
def import_example(input: str) -> str:
    from nested_library.sub_module.functions import example_func

    return example_func(input)

spark.range(1).select(import_example(lit("example_string"))).show()
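To build an archive that preserves the package path, each entry must be stored relative to the package's parent directory. The following standard-library sketch does that; zip_package is a hypothetical name, and the resulting file still needs to be uploaded to a stage or passed to addArtifacts as shown above.

```python
import os
import zipfile
from typing import List


def zip_package(package_dir: str, zip_path: str) -> List[str]:
    """Zip a Python package so that its directory structure is preserved.

    Entries are stored relative to the package's parent directory, so
    nested_library/sub_module/functions.py keeps that path inside the archive
    and "from nested_library.sub_module.functions import ..." still resolves.
    Returns the archive entry names.
    """
    package_dir = os.path.abspath(package_dir)
    parent = os.path.dirname(package_dir)
    names = []
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, dirs, files in os.walk(package_dir):
            dirs[:] = [d for d in dirs if d != "__pycache__"]  # skip bytecode caches
            for file_name in sorted(files):
                full_path = os.path.join(root, file_name)
                arcname = os.path.relpath(full_path, parent).replace(os.sep, "/")
                archive.write(full_path, arcname)
                names.append(arcname)
    return names
```

For example, zip_package("nested_library", "nested_library.zip") produces an archive whose entries start with nested_library/.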

Data sources

The following lists the compatibility issues of each data source compared with PySpark.

Avro

File type is not supported.

CSV

The following save modes aren't supported: Append, Ignore.

The following are known limitations:

  • compression: This parameter supports only the following values: GZIP, BZ2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE, NONE, UNCOMPRESSED.
  • dateFormat: Custom date formats must follow the formats at Datetime Patterns.
  • encoding: Encoding in multiLine mode is not supported.
  • lineSep: This parameter cannot be set to an empty string.
  • quote: This parameter cannot be set to an empty string.
  • timestampFormat: Custom timestamp formats must follow the formats at Datetime Patterns.
  • Reading empty files isn't supported.

The following options are not supported: charToEscapeQuoteEscaping, columnNameOfCorruptRecord, comment, emptyValue, enableDateTimeParsingFallback, enforceSchema, escape, escapeQuotes, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, locale, maxCharsPerColumn, maxColumns, mode, nanValue, negativeInf, positiveInf, preferDate, quoteAll, samplingRatio, timestampNTZFormat, unescapedQuoteHandling.

JSON

The following save modes aren't supported: Append, Ignore.

The following are known limitations:

  • compression: This parameter supports only the following values: GZIP, BZ2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE, NONE, UNCOMPRESSED.
  • dateFormat: Custom date formats must follow the formats at Datetime Patterns.
  • encoding: Encoding in multiline mode is not supported.
  • timestampFormat: Custom timestamp formats must follow the formats at Datetime Patterns.
  • Difference in show: If the value of a field is a string, it is quoted, and an extra \n character is shown in the result.
  • Projecting fields of an array of structs by using dot notation isn't supported.
  • Reading JSON files with Spark SQL isn't supported.
  • MapType isn't supported.

The following options are not supported: allowBackslashEscapingAnyCharacter, allowComments, allowNonNumericNumbers, allowNumericLeadingZeros, allowSingleQuotes, allowUnquotedControlChars, allowUnquotedFieldNames, columnNameOfCorruptRecord, dropFieldIfAllNull, enableDateTimeParsingFallback, ignoreNullFields, lineSep, locale, mode, prefersDecimal, primitivesAsString, samplingRatio, timeZone, timestampNTZFormat.

ORC

File type is not supported.

Parquet

The following save modes aren't supported: Append, Ignore.

The following are known limitations:

  • compression: This parameter supports only the following values: GZIP, BZ2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE, NONE, UNCOMPRESSED.
  • Date formats must follow the formats at Datetime Patterns.
  • MapType and IntervalType aren't supported.
  • Configuration isn't supported: (ALL).

The following options are not supported: datetimeRebaseMode, int96RebaseMode, mergeSchema.

Text

The following save modes aren't supported: Append, Ignore.

The following are known limitations:

  • compression: This parameter supports only the following values: GZIP, BZ2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE, NONE, UNCOMPRESSED.
  • The lineSep parameter isn't supported for writes.
  • Partitioned directories aren't supported.

XML

The following save modes aren't supported: Append, Ignore.

The following are known limitations:

  • Permissive mode is partially supported. If input data doesn’t match the specified schema type and can’t be coerced, an error might be thrown.
  • compression: This parameter is not supported when rowTag is specified. Supports only the following values: GZIP, BZ2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE, NONE, UNCOMPRESSED.
  • MapType isn't supported.
  • Reading an XML file with Spark SQL is not supported.

The following options are not supported: arrayElementName, dateFormat, declaration, locale, modifiedBefore, recursiveFileLookup, rootTag, timeZone, timestampFormat, timestampNTZFormat, validateName, wildcardColName.
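These limitations are easiest to catch before a job runs. The following plain-Python sketch checks a data source name, an options dictionary, and a save mode against the lists in this section. All names in it (check_options, UNSUPPORTED_OPTIONS, and so on) are hypothetical; the lists are transcribed from this guide and might change in future releases. Option names are compared case-insensitively, as Spark does.

```python
from typing import Dict, FrozenSet, List, Optional


def _lower(*names: str) -> FrozenSet[str]:
    return frozenset(name.lower() for name in names)


# Transcribed from this guide; treat as a snapshot, not an authoritative list.
UNSUPPORTED_SOURCES = _lower("avro", "orc")
UNSUPPORTED_OPTIONS = {
    "csv": _lower(
        "charToEscapeQuoteEscaping", "columnNameOfCorruptRecord", "comment", "emptyValue",
        "enableDateTimeParsingFallback", "enforceSchema", "escape", "escapeQuotes",
        "ignoreLeadingWhiteSpace", "ignoreTrailingWhiteSpace", "locale", "maxCharsPerColumn",
        "maxColumns", "mode", "nanValue", "negativeInf", "positiveInf", "preferDate",
        "quoteAll", "samplingRatio", "timestampNTZFormat", "unescapedQuoteHandling"),
    "json": _lower(
        "allowBackslashEscapingAnyCharacter", "allowComments", "allowNonNumericNumbers",
        "allowNumericLeadingZeros", "allowSingleQuotes", "allowUnquotedControlChars",
        "allowUnquotedFieldNames", "columnNameOfCorruptRecord", "dropFieldIfAllNull",
        "enableDateTimeParsingFallback", "ignoreNullFields", "lineSep", "locale", "mode",
        "prefersDecimal", "primitivesAsString", "samplingRatio", "timeZone",
        "timestampNTZFormat"),
    "parquet": _lower("datetimeRebaseMode", "int96RebaseMode", "mergeSchema"),
    "text": frozenset(),
    "xml": _lower(
        "arrayElementName", "dateFormat", "declaration", "locale", "modifiedBefore",
        "recursiveFileLookup", "rootTag", "timeZone", "timestampFormat",
        "timestampNTZFormat", "validateName", "wildcardColName"),
}
SUPPORTED_COMPRESSION = _lower(
    "GZIP", "BZ2", "BROTLI", "ZSTD", "DEFLATE", "RAW_DEFLATE", "NONE", "UNCOMPRESSED")
UNSUPPORTED_SAVE_MODES = _lower("Append", "Ignore")


def check_options(source: str, options: Dict[str, str],
                  save_mode: Optional[str] = None) -> List[str]:
    """Return a list of problems; an empty list means no known limitation applies."""
    source = source.lower()
    if source in UNSUPPORTED_SOURCES:
        return [f"{source}: file type is not supported"]
    problems = []
    if save_mode is not None and save_mode.lower() in UNSUPPORTED_SAVE_MODES:
        problems.append(f"save mode {save_mode!r} is not supported")
    unsupported = UNSUPPORTED_OPTIONS.get(source, frozenset())
    keys = {name.lower() for name in options}
    for name, value in options.items():
        key = name.lower()
        if key in unsupported:
            problems.append(f"option {name!r} is not supported")
        elif key == "compression":
            if source == "xml" and "rowtag" in keys:
                problems.append("compression is not supported when rowTag is specified")
            elif str(value).lower() not in SUPPORTED_COMPRESSION:
                problems.append(f"compression {value!r} is not supported")
        elif source == "csv" and key in ("linesep", "quote") and value == "":
            problems.append(f"option {name!r} can't be an empty string")
    return problems
```

For example, check_options("parquet", {"compression": "snappy"}) flags the compression value, while check_options("csv", {"header": "true", "compression": "gzip"}) returns an empty list.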

Snowflake table

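Writing to a table doesn't require a provider format.

Bucketing and partitioning aren't supported.

Storage format and versioning aren't supported.

Catalog

Snowflake Horizon catalog provider support

  • Only Snowflake is supported as a catalog provider.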

Unsupported catalog APIs

  • registerFunction
  • listFunctions
  • getFunction
  • functionExists
  • createExternalTable

Partially supported catalog APIs

  • createTable (no external table support)

Iceberg

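Snowflake-managed Iceberg tables

Snowpark Connect for Spark supports Apache Iceberg™ tables, including externally managed Iceberg tables and catalog-linked databases.

Read

Time Travel isn't supported, including historical snapshots, branches, and incremental reads.

Write

  • Creating tables with Spark SQL isn't supported.
  • Schema merging isn't supported.
  • To create the table, you must:
    • Create an external volume.
    • Associate the required external volume with table creation in either of the following ways:
      • Set EXTERNAL_VOLUME on the database.
      • Set snowpark.connect.iceberg.external_volume in the Spark configuration.

Externally managed Iceberg tables

Read

  • You must create a Snowflake unmanaged table entity.
  • Time Travel isn't supported, including historical snapshots, branches, and incremental reads.

Write

  • Creating tables isn't supported.
  • Writing to existing Iceberg tables is supported.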