Snowpark Connect for Spark 兼容性指南¶
本指南记录了 Spark DataFrame APIs 的 Snowpark Connect for Spark 实现与原生 Apache Spark 之间的兼容性。它旨在帮助用户了解将 Spark 工作负载迁移到 Snowpark Connect for Spark 时,需要注意的关键差异、不支持的功能以及迁移注意事项。
Snowpark Connect for Spark 旨在提供基于 Snowflake 执行引擎的、用户熟悉的 Spark DataFrame API 体验。但本主题中描述了一些兼容性差距。本指南重点介绍了这些差异,以帮助您规划和调整迁移。这些问题可能会在未来版本中得到解决。
DataTypes¶
不支持的数据类型¶
DayTimeIntervalType (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/DayTimeIntervalType.html)
YearMonthIntervalType (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/YearMonthIntervalType.html)
UserDefinedTypes (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/UserDefinedType.html)
隐式数据类型转换¶
使用 Snowpark Connect for Spark 时,请注意数据类型的处理方式。Snowpark Connect for Spark 隐式表示 ByteType
、ShortType
、IntegerType
和 LongType
。这意味着,虽然您可以使用 ByteType
、ShortType
或 IntegerType
定义列或数据,但数据将由 Snowpark Connect for Spark 以 LongType
的形式表示和返回。 同样,根据具体的操作和上下文,FloatType
和 DoubleType
也可能发生隐式转换。Snowflake 执行引擎将在内部处理数据类型压缩,实际上可能会将数据存储为 Byte
或 Short
,但这些被视为实现细节,不会向最终用户公开。
从语义上讲,这种表示方式不会影响到 Spark 查询的正确性。
来自原生 PySpark 的数据类型 |
来自 Snowpark Connect for Spark 的数据类型 |
---|---|
|
|
|
|
|
|
|
|
以下示例显示了 Spark 和 Snowpark Connect for Spark 在查询结果中数据类型处理方面的不同之处。
查询¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
NullType
的细微差别¶
Snowpark Connect for Spark 不支持 NullType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html) 数据类型,而 Spark 中支持这种数据类型。这会导致在数据帧中使用 Null
或 None
时,行为会发生变化。
在 Spark 中,字面量 NULL`(例如,使用 :code:`lit(None)
)会被自动推断为 NullType
。在 Snowpark Connect for Spark 中,它会在架构推断期间被推断为 StringType
。
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
ArrayType
、MapType
和 ObjectType
中的结构化数据类型¶
虽然 Snowpark Connect for Spark 中默认不支持结构化类型,但 ARRAY
、MAP
和 Object
数据类型被视为通用的非类型化集合。这意味着与结构化类型支持所提供的功能不同,它不会强制执行元素类型、字段名称、模式或可空性。
如果您依赖此支持,请与您的客户团队配合,为您的账户启用此功能。
不支持的 Spark APIs¶
以下是经典 Spark 和 Spark Connect 支持但 Snowpark Connect for Spark 不支持的 APIs。
Dataframe.hint (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.hint.html):Snowpark Connect for Spark 会忽略在数据帧上设置的任何提示。Snowflake 查询优化器会自动确定最有效的执行策略。
DataFrame.repartition (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.repartition.html):这在 Snowpark Connect for Spark 中 no-op(无操作)。Snowflake 会在其分布式计算基础设施中自动管理数据分布和分区。
pyspark.RDD (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html):Spark Connect(包括 Snowpark Connect for Spark)中不支持 RDD APIs。
pyspark.ml (https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)
pyspark streaming (https://spark.apache.org/docs/latest/streaming-programming-guide.html)
UDF 差异¶
StructType
差异¶
当 Spark 转换 StructType
以将其用于用户定义函数 (UDF) 时,它会将其转换为 Python 中的 tuple
类型。Snowpark Connect for Spark 会将 StructType
转换为 Python 中的一种 dict
类型。这在元素访问和输出方面存在根本性差异。
Spark 将使用 0、1、2、3 这样的形式访问索引。
Snowpark Connect for Spark 将使用 '_1'、'_2' 这样的形式访问索引。
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# |spconnect|: {'col1': 1, 'col2': 2}
UDFs 中的迭代器类型¶
不支持将迭代器作为返回类型或输入类型。
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Lambda 函数限制¶
虽然 Snowpark Connect for Spark 支持 lambda 表达式和高阶函数(例如 transform
函数),但它不支持从 lambda 主体内引用外部列或表达式。
此限制是由 Snowflake 中对 lambda 表达式的限制 造成的。
data = [
(1, [1, 2, 3]),
(2, [4, 5]),
(3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
另一个限制是 lambda 表达式不支持用户定义的函数 (UDFs)。这包括自定义 UDFs 和某些底层实现依赖于 Snowflake UDFs 的内置函数。尝试在 lambda 表达式内部使用 UDF 会导致错误。
spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
数据源¶
数据源 |
与 PySpark 相比的兼容性问题 |
---|---|
Avro |
不支持此文件类型。 |
CSV |
不支持以下保存模式: 不支持以下选项: |
JSON |
不支持以下保存模式: 不支持以下选项:
|
Orc |
不支持此文件类型。 |
Parquet |
不支持以下保存模式: 不支持以下选项: 不支持的配置:(ALL) |
文本 |
不支持以下写入模式: 不支持以下选项: 写入时不支持 |
XML |
不支持此文件类型。 |
Snowflake 表 |
写入表不需要提供商格式。 不支持分桶和分区。 不支持存储格式和版本控制。 |
目录¶
Snowflake Horizon 目录提供商支持¶
仅支持 Snowflake 作为目录提供商。
不支持的目录 APIs¶
registerFunction
listFunctions
getFunction
functionExists
createExternalTable
部分支持的目录 APIs¶
:code:`createTable`(不支持外部表)
Iceberg¶
Snowflake 管理的 Iceberg 表¶
Snowpark Connect for Spark 支持 Apache Iceberg™ 表,包括外部管理的 Iceberg 表和目录链接的数据库。
读取¶
不支持 Time Travel,包括历史快照、分支和增量读取。
写入¶
不支持使用 Spark SQL 创建表。
不支持架构合并。
要创建该表,必须:
创建外部卷。
通过以下任一方式将所需的外部卷与表创建相关联:
将 EXTERNAL_VOLUME 设置为数据库。
将
snowpark.connect.iceberg.external_volume
设置为 Spark 配置。
外部管理的 Iceberg 表¶
读取¶
必须创建 Snowflake 非托管表实体。
不支持 Time Travel,包括历史快照、分支和增量读取。
写入¶
不支持创建表。
支持写入现有 Iceberg 表。
重复的列名¶
Snowflake 不支持重复的列名。
以下代码会在视图创建步骤中失败,并出现以下 SQL 编译错误:duplicate column name 'foo'
。
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
要解决此问题,请将 snowpark.connect.views.duplicate_column_names_handling_mode
配置选项设置为以下值之一:
rename
:将在第一个重复的列名之后,为其所有重复的列名附加后缀,例如_dedup_1
、_dedup_2
等。drop
:除一个重复列以外,所有其他重复列都将被删除。如果这些列具有不同的值,可能会导致不正确的结果。