Snowpark Connect for Spark 兼容性指南

本指南记录了 Spark DataFrame APIs 的 Snowpark Connect for Spark 实现与原生 Apache Spark 之间的兼容性。它旨在帮助用户了解将 Spark 工作负载迁移到 Snowpark Connect for Spark 时,需要注意的关键差异、不支持的功能以及迁移注意事项。

Snowpark Connect for Spark 旨在提供基于 Snowflake 执行引擎的、用户熟悉的 Spark DataFrame API 体验。但本主题中描述了一些兼容性差距。本指南重点介绍了这些差异,以帮助您规划和调整迁移。这些问题可能会在未来版本中得到解决。

DataTypes

不支持的数据类型

  • DayTimeIntervalType (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/DayTimeIntervalType.html)

  • YearMonthIntervalType (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/YearMonthIntervalType.html)

  • UserDefinedTypes (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/UserDefinedType.html)

隐式数据类型转换

使用 Snowpark Connect for Spark 时,请注意数据类型的处理方式。Snowpark Connect for Spark 隐式表示 ByteTypeShortTypeIntegerTypeLongType。这意味着,虽然您可以使用 ByteTypeShortTypeIntegerType 定义列或数据,但数据将由 Snowpark Connect for Spark 以 LongType 的形式表示和返回。 同样,根据具体的操作和上下文,FloatTypeDoubleType 也可能发生隐式转换。Snowflake 执行引擎将在内部处理数据类型压缩,实际上可能会将数据存储为 ByteShort,但这些被视为实现细节,不会向最终用户公开。

从语义上讲,这种表示方式不会影响到 Spark 查询的正确性。

来自原生 PySpark 的数据类型

来自 Snowpark Connect for Spark 的数据类型

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

以下示例显示了 Spark 和 Snowpark Connect for Spark 在查询结果中数据类型处理方面的不同之处。

查询

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType 的细微差别

Snowpark Connect for Spark 不支持 NullType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html) 数据类型,而 Spark 中支持这种数据类型。这会导致在数据帧中使用 NullNone 时,行为会发生变化。

在 Spark 中,字面量 NULL`(例如,使用 :code:`lit(None))会被自动推断为 NullType。在 Snowpark Connect for Spark 中,它会在架构推断期间被推断为 StringType

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
Copy

ArrayTypeMapTypeObjectType 中的结构化数据类型

虽然 Snowpark Connect for Spark 中默认不支持结构化类型,但 ARRAYMAPObject 数据类型被视为通用的非类型化集合。这意味着与结构化类型支持所提供的功能不同,它不会强制执行元素类型、字段名称、模式或可空性。

如果您依赖此支持,请与您的客户团队配合,为您的账户启用此功能。

不支持的 Spark APIs

以下是经典 Spark 和 Spark Connect 支持但 Snowpark Connect for Spark 不支持的 APIs。

  • Dataframe.hint (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.hint.html):Snowpark Connect for Spark 会忽略在数据帧上设置的任何提示。Snowflake 查询优化器会自动确定最有效的执行策略。

  • DataFrame.repartition (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.repartition.html):这在 Snowpark Connect for Spark 中 no-op(无操作)。Snowflake 会在其分布式计算基础设施中自动管理数据分布和分区。

  • pyspark.RDD (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html):Spark Connect(包括 Snowpark Connect for Spark)中不支持 RDD APIs。

  • pyspark.ml (https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)

  • pyspark streaming (https://spark.apache.org/docs/latest/streaming-programming-guide.html)

UDF 差异

StructType 差异

当 Spark 转换 StructType 以将其用于用户定义函数 (UDF) 时,它会将其转换为 Python 中的 tuple 类型。Snowpark Connect for Spark 会将 StructType 转换为 Python 中的一种 dict 类型。这在元素访问和输出方面存在根本性差异。

  • Spark 将使用 0、1、2、3 这样的形式访问索引。

  • Snowpark Connect for Spark 将使用 '_1'、'_2' 这样的形式访问索引。

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# |spconnect|: {'col1': 1, 'col2': 2}
Copy

UDFs 中的迭代器类型

不支持将迭代器作为返回类型或输入类型。

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Lambda 函数限制

虽然 Snowpark Connect for Spark 支持 lambda 表达式和高阶函数(例如 transform 函数),但它不支持从 lambda 主体内引用外部列或表达式。

此限制是由 Snowflake 中对 lambda 表达式的限制 造成的。

data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context

df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Copy

另一个限制是 lambda 表达式不支持用户定义的函数 (UDFs)。这包括自定义 UDFs 和某些底层实现依赖于 Snowflake UDFs 的内置函数。尝试在 lambda 表达式内部使用 UDF 会导致错误。

spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Copy

数据源

数据源

与 PySpark 相比的兼容性问题

Avro

不支持此文件类型。

CSV

不支持以下保存模式:AppendIgnore

不支持以下选项:encodingquotequoteAllescapeescapeQuotescommentpreferDateenforceSchemaignoreLeadingWhiteSpaceignoreTrailingWhiteSpacenanValuepositiveInfnegativeInftimestampNTZFormatenableDateTimeParsingFallbackmaxColumnsmaxCharsPerColumnmodecolumnNameOfCorruptRecordcharToEscapeQuoteEscapingsamplingRatioemptyValuelocalelineSepunescapedQuoteHandlingcompression

JSON

不支持以下保存模式:AppendIgnore

不支持以下选项:timeZoneprimitivesAsStringprefersDecimalallowCommentsallowUnquotedFieldNamesallowSingleQuotesallowNumericLeadingZerosallowBackslashEscapingAnyCharactermodecolumnNameOfCorruptRecordtimestampNTZFormatenableDateTimeParsingFallbackallowUnquotedControlCharsencodinglineSepsamplingRatiodropFieldIfAllNulllocaleallowNonNumericNumberscompressionignoreNullFields

Show 中的差异如果字段值为字符串,则会置于引号之间。结果中将显示额外的“n”字符。

Orc

不支持此文件类型。

Parquet

不支持以下保存模式:AppendIgnore

不支持以下选项:datetimeRebaseModeint96RebaseModemergeSchemacompression

不支持的配置:(ALL)

文本

不支持以下写入模式:AppendIgnore

不支持以下选项:compression

写入时不支持 lineSep 参数。

XML

不支持此文件类型。

Snowflake 表

写入表不需要提供商格式。

不支持分桶和分区。

不支持存储格式和版本控制。

目录

Snowflake Horizon 目录提供商支持

  • 仅支持 Snowflake 作为目录提供商。

不支持的目录 APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

部分支持的目录 APIs

  • :code:`createTable`(不支持外部表)

Iceberg

Snowflake 管理的 Iceberg 表

Snowpark Connect for Spark 支持 Apache Iceberg™ 表,包括外部管理的 Iceberg 表和目录链接的数据库。

读取

不支持 Time Travel,包括历史快照、分支和增量读取。

写入

  • 不支持使用 Spark SQL 创建表。

  • 不支持架构合并。

  • 要创建该表,必须:

    • 创建外部卷。

    • 通过以下任一方式将所需的外部卷与表创建相关联:

      • 将 EXTERNAL_VOLUME 设置为数据库。

      • snowpark.connect.iceberg.external_volume 设置为 Spark 配置。

外部管理的 Iceberg 表

读取

  • 必须创建 Snowflake 非托管表实体。

  • 不支持 Time Travel,包括历史快照、分支和增量读取。

写入

  • 不支持创建表。

  • 支持写入现有 Iceberg 表。

重复的列名

Snowflake 不支持重复的列名。

以下代码会在视图创建步骤中失败,并出现以下 SQL 编译错误:duplicate column name 'foo'

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

要解决此问题,请将 snowpark.connect.views.duplicate_column_names_handling_mode 配置选项设置为以下值之一:

  • rename:将在第一个重复的列名之后,为其所有重复的列名附加后缀,例如 _dedup_1_dedup_2 等。

  • drop:除一个重复列以外,所有其他重复列都将被删除。如果这些列具有不同的值,可能会导致不正确的结果。

语言: 中文