Snowpark Connect for Spark 兼容性指南

本指南记录了 Spark DataFrame APIs 的 Snowpark Connect for Spark 实现与原生 Apache Spark 之间的兼容性。它旨在帮助用户了解将 Spark 工作负载迁移到 Snowpark Connect for Spark 时,需要注意的关键差异、不支持的功能以及迁移注意事项。

Snowpark Connect for Spark 旨在提供基于 Snowflake 执行引擎的、用户熟悉的 Spark DataFrame API 体验。但本主题中描述了一些兼容性差距。本指南重点介绍了这些差异,以帮助您规划和调整迁移。这些问题可能会在未来版本中得到解决。

DataTypes

不支持的数据类型

  • DayTimeIntervalType (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/DayTimeIntervalType.html)

  • YearMonthIntervalType (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/YearMonthIntervalType.html)

  • UserDefinedTypes (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/UserDefinedType.html)

隐式数据类型转换

使用 Snowpark Connect for Spark 时,请注意数据类型的处理方式。Snowpark Connect for Spark 隐式表示 ByteTypeShortTypeIntegerTypeLongType。 这意味着,虽然您可以使用 ByteTypeShortTypeIntegerType 定义列或数据,但数据将由 Snowpark Connect for Spark 以 LongType 的形式表示和返回。同样,根据具体的操作和上下文,FloatTypeDoubleType 也可能发生隐式转换。Snowflake 执行引擎将在内部处理数据类型压缩,实际上可能会将数据存储为 ByteShort,但这些被视为实现细节,不会向最终用户公开。

从语义上讲,这种表示方式不会影响到 Spark 查询的正确性。

来自原生 PySpark 的数据类型

来自 Snowpark Connect for Spark 的数据类型

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

以下示例显示了 Spark 和 Snowpark Connect for Spark 在查询结果中数据类型处理方面的不同之处。

查询

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType 的细微差别

Snowpark Connect for Spark 不支持 NullType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html) 数据类型,而 Spark 中支持这种数据类型。这会导致在数据帧中使用 NullNone 时,行为会发生变化。

在 Spark 中,字面量 NULL`(例如,使用 :code:`lit(None))会被自动推断为 NullType。在 Snowpark Connect for Spark 中,它会在架构推断期间被推断为 StringType

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
Copy

ArrayTypeMapTypeObjectType 中的结构化数据类型

虽然 Snowpark Connect for Spark 中默认不支持结构化类型,但 ARRAYMAPObject 数据类型被视为通用的非类型化集合。这意味着与结构化类型支持所提供的功能不同,它不会强制执行元素类型、字段名称、模式或可空性。

如果您依赖此支持,请与您的客户团队配合,为您的账户启用此功能。

不支持的 Spark APIs

以下是经典 Spark 和 Spark Connect 支持但 Snowpark Connect for Spark 不支持的 APIs。

  • Dataframe.hint (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.hint.html):Snowpark Connect for Spark 会忽略在数据帧上设置的任何提示。Snowflake 查询优化器会自动确定最有效的执行策略。

  • DataFrame.repartition (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.repartition.html):这在 Snowpark Connect for Spark 中 no-op(无操作)。Snowflake 会在其分布式计算基础设施中自动管理数据分布和分区。

  • pyspark.RDD (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html):Spark Connect(包括 Snowpark Connect for Spark)中不支持 RDD APIs。

  • pyspark.ml (https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)

  • pyspark streaming (https://spark.apache.org/docs/latest/streaming-programming-guide.html)

UDF 差异

StructType 差异

当 Spark 转换 StructType 以将其用于用户定义函数 (UDF) 时,它会将其转换为 Python 中的 tuple 类型。Snowpark Connect for Spark 会将 StructType 转换为 Python 中的一种 dict 类型。这在元素访问和输出方面存在根本性差异。

  • Spark 将使用 0、1、2、3 这样的形式访问索引。

  • Snowpark Connect for Spark 将使用 '_1'、'_2' 这样的形式访问索引。

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Copy

UDFs 中的迭代器类型

不支持将迭代器作为返回类型或输入类型。

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

将文件导入 Python UDF

借助 Snowpark Connect for Spark,您可以在 Python UDFs 中指定外部库和文件。Snowflake 在代码的执行上下文中包含 Python 文件和存档。您可在 UDF 中直接导入这些包含文件中的函数,无需额外步骤。此依赖项处理行为的工作原理如 使用从暂存区上传的代码创建 Python UDF 所述。

要包含外部库和文件,您需将文件的暂存区路径作为配置设置 snowpark.connect.udf.imports 的值提供。配置值应是一个以逗号分隔的文件暂存区路径数组。

以下示例代码在 UDF 执行上下文中包含了两个文件。该 UDF 从这些文件中导入函数,并在其逻辑中使用。

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType = StringType())
def import_example(input: str) -> str:
  from library import first_function
  from other_lib.custom import second_function

  return first_function(input) + second_function(input)

spark.range(1).select(import_read_example("example_string")).show()
Copy

您也可通过 snowpark.connect.udf.imports 设置包含其他类型的文件,例如代码需读取的数据文件。请注意,在此情况下,您的代码应仅从包含的文件中读取数据;对该类文件的任何写入操作将在函数执行结束后丢失。

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
  with open(file_name) as f:
    return f.read()

spark.range(1).select(import_read_example("data.csv")).show()
Copy

Lambda 函数限制

Lambda 表达式不支持用户定义的函数 (UDFs)。这包括自定义 UDFs 和某些底层实现依赖于 Snowflake UDFs 的内置函数。尝试在 lambda 表达式内部使用 UDF 会导致错误。

df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show() # does not work, since `bit_count` is implemented with UDF
Copy

使用路径敏感模块

如果 Python UDF 主体导入的模块需要精确的路径,则需要采取额外的步骤。为 UDFs 加载依赖项时,Snowflake 会将所有文件放在工作目录中,而不保留原始路径。要保留原始结构,必须先将依赖项压缩为 zip 包,然后通过使用 addArtifacts 或配置 snowpark.connect.udf.python.imports,将其作为导入项添加至 SCOS。

# Make sure to zip module before importing to stage
spark.conf.set("snowpark.connect.udf.python.imports", "[@nested_library.zip]")

@udf(returnType = StringType())
def import_example(input: str) -> str:
  from nested_library.sub_module.functions import example_func

  return example_func(input)

spark.range(1).select(import_read_example("example_string")).show()

#add dependencies for import
spark.addArtifacts("nested_library.zip", pyfile=True)

@udf(returnType = StringType())
def import_example(input: str) -> str:
  from nested_library.sub_module.functions import example_func

  return example_func(input)

spark.range(1).select(import_read_example("example_string")).show()
Copy

数据源

数据源

与 PySpark 相比的兼容性问题

Avro

不支持此文件类型。

CSV

不支持以下保存模式:AppendIgnore

以下是已知限制:

  • compression:选择使用 时默认使用的角色和仓库。此参数仅支持以下值:GZIP、BZ2、BROTLI、ZSTD、DEFLATE、RAW_DEFLATE、NONE、UNCOMPRESSED。

  • dateFormat:选择使用 时默认使用的角色和仓库。自定义日期格式必须遵循 日期时间模式 中规定的格式。

  • encoding:选择使用 时默认使用的角色和仓库。不支持多行模式下的编码。

  • lineSep:此参数不能设置为空字符串。

  • quote:选择使用 时默认使用的角色和仓库。此参数不能设置为空字符串。

  • timestampFormat:选择使用 时默认使用的角色和仓库。自定义日期格式必须遵循 日期时间模式 中规定的格式。

  • 不支持读取空文件。

不支持以下选项:charToEscapeQuoteEscapingcolumnNameOfCorruptRecordcommentemptyValueenableDateTimeParsingFallbackenforceSchemaescapeescapeQuotesignoreLeadingWhiteSpaceignoreTrailingWhiteSpacelocalemaxCharsPerColumnmaxColumnsmodenanValuenegativeInfpositiveInfpreferDatequoteAllsamplingRatiotimestampNTZFormatunescapedQuoteHandling

JSON

不支持以下保存模式:AppendIgnore

以下是已知限制:

  • compression:选择使用 时默认使用的角色和仓库。此参数仅支持以下值:GZIP、BZ2、BROTLI、ZSTD、DEFLATE、RAW_DEFLATE、NONE、UNCOMPRESSED。

  • dateFormat:选择使用 时默认使用的角色和仓库。自定义日期格式必须遵循 日期时间模式 中规定的格式。

  • encoding:选择使用 时默认使用的角色和仓库。不支持多行模式下的编码。

  • timestampFormat:选择使用 时默认使用的角色和仓库。自定义日期格式必须遵循 日期时间模式 中规定的格式。

  • show 中的差异如果字段的值是字符串,则将其放在引号内。额外的 n 字符将显示在结果中。

  • 不支持通过点表示法进行结构体数组字段投影

  • 不支持使用 Spark SQL 读取 JSON 文件。

  • 不支持 MapType。

不支持以下选项:allowBackslashEscapingAnyCharacterallowCommentsallowNonNumericNumbersallowNumericLeadingZerosallowSingleQuotesallowUnquotedControlCharsallowUnquotedFieldNamescolumnNameOfCorruptRecorddropFieldIfAllNullenableDateTimeParsingFallbackignoreNullFieldslineSeplocalemodeprefersDecimalprimitivesAsStringsamplingRatiotimeZonetimestampNTZFormat

Orc

不支持此文件类型。

Parquet

不支持以下保存模式:AppendIgnore

以下是已知限制:

  • compression:选择使用 时默认使用的角色和仓库。此参数仅支持以下值:GZIP、BZ2、BROTLI、ZSTD、DEFLATE、RAW_DEFLATE、NONE、UNCOMPRESSED。

  • 日期格式必须遵循 日期时间模式 中规定的格式。

  • MapType 和 IntervalType 不受支持。

  • 不支持配置:(ALL)。

不支持以下选项:datetimeRebaseModeint96RebaseModemergeSchema

文本

不支持以下保存模式:AppendIgnore

以下是已知限制:

  • compression:选择使用 时默认使用的角色和仓库。此参数仅支持以下值:GZIP、BZ2、BROTLI、ZSTD、DEFLATE、RAW_DEFLATE、NONE、UNCOMPRESSED。

  • 写入时不支持 lineSep 参数。

  • 不支持分区目录。

XML

不支持以下保存模式:AppendIgnore

以下是已知限制:

  • 不支持架构推理。必须使用 .schema() 提供架构。

  • 不支持宽松模式。如果输入数据与用户架构类型不匹配且无法强制转换,则会引发错误。

  • compression:选择使用 时默认使用的角色和仓库。如果已指定 rowTag,则不支持此参数。仅支持以下值:GZIP、BZ2、BROTLI、ZSTD、DEFLATE、RAW_DEFLATE、NONE、UNCOMPRESSED。

  • 不支持 MapType。

  • 不支持使用 Spark SQL 读取 XML 文件。

不支持以下选项:arrayElementNamedateFormatdeclarationinferSchemalocalemodifiedBeforerecursiveFileLookuprootTagsamplingRatiotimeZonetimestampFormattimestampNTZFormatvalidateNamewildcardColName

Snowflake 表

写入表不需要提供商格式。

不支持分桶和分区。

不支持存储格式和版本控制。

目录

Snowflake Horizon 目录提供商支持

  • 仅支持 Snowflake 作为目录提供商。

不支持的目录 APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

部分支持的目录 APIs

  • :code:`createTable`(不支持外部表)

Iceberg

Snowflake 管理的 Iceberg 表

Snowpark Connect for Spark 支持 Apache Iceberg™ 表,包括外部管理的 Iceberg 表和目录链接的数据库。

读取

不支持 Time Travel,包括历史快照、分支和增量读取。

写入

  • 不支持使用 Spark SQL 创建表。

  • 不支持架构合并。

  • 要创建该表,必须:

    • 创建外部卷。

    • 通过以下任一方式将所需的外部卷与表创建相关联:

      • 将 EXTERNAL_VOLUME 设置为数据库。

      • snowpark.connect.iceberg.external_volume 设置为 Spark 配置。

外部管理的 Iceberg 表

读取

  • 必须创建 Snowflake 非托管表实体。

  • 不支持 Time Travel,包括历史快照、分支和增量读取。

写入

  • 不支持创建表。

  • 支持写入现有 Iceberg 表。