Snowpark Connect for Spark 兼容性指南¶
本指南记录了 Spark DataFrame APIs 的 Snowpark Connect for Spark 实现与原生 Apache Spark 之间的兼容性。它旨在帮助用户了解将 Spark 工作负载迁移到 Snowpark Connect for Spark 时,需要注意的关键差异、不支持的功能以及迁移注意事项。
Snowpark Connect for Spark 旨在提供基于 Snowflake 执行引擎的、用户熟悉的 Spark DataFrame API 体验。但本主题中描述了一些兼容性差距。本指南重点介绍了这些差异,以帮助您规划和调整迁移。这些问题可能会在未来版本中得到解决。
DataTypes¶
不支持的数据类型¶
DayTimeIntervalType (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/DayTimeIntervalType.html)
YearMonthIntervalType (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/YearMonthIntervalType.html)
UserDefinedTypes (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/UserDefinedType.html)
隐式数据类型转换¶
使用 Snowpark Connect for Spark 时,请注意数据类型的处理方式。Snowpark Connect for Spark 隐式表示 ByteType、ShortType、IntegerType 和 LongType。 这意味着,虽然您可以使用 ByteType、ShortType 或 IntegerType 定义列或数据,但数据将由 Snowpark Connect for Spark 以 LongType 的形式表示和返回。同样,根据具体的操作和上下文,FloatType 和 DoubleType 也可能发生隐式转换。Snowflake 执行引擎将在内部处理数据类型压缩,实际上可能会将数据存储为 Byte 或 Short,但这些被视为实现细节,不会向最终用户公开。
从语义上讲,这种表示方式不会影响到 Spark 查询的正确性。
来自原生 PySpark 的数据类型 |
来自 Snowpark Connect for Spark 的数据类型 |
|---|---|
|
|
|
|
|
|
|
|
以下示例显示了 Spark 和 Snowpark Connect for Spark 在查询结果中数据类型处理方面的不同之处。
查询¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
NullType 的细微差别¶
Snowpark Connect for Spark 不支持 NullType (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html) 数据类型,而 Spark 中支持这种数据类型。这会导致在数据帧中使用 Null 或 None 时,行为会发生变化。
在 Spark 中,字面量 NULL`(例如,使用 :code:`lit(None))会被自动推断为 NullType。在 Snowpark Connect for Spark 中,它会在架构推断期间被推断为 StringType。
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
ArrayType、MapType 和 ObjectType 中的结构化数据类型¶
虽然 Snowpark Connect for Spark 中默认不支持结构化类型,但 ARRAY、MAP 和 Object 数据类型被视为通用的非类型化集合。这意味着与结构化类型支持所提供的功能不同,它不会强制执行元素类型、字段名称、模式或可空性。
如果您依赖此支持,请与您的客户团队配合,为您的账户启用此功能。
不支持的 Spark APIs¶
以下是经典 Spark 和 Spark Connect 支持但 Snowpark Connect for Spark 不支持的 APIs。
Dataframe.hint (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.hint.html):Snowpark Connect for Spark 会忽略在数据帧上设置的任何提示。Snowflake 查询优化器会自动确定最有效的执行策略。
DataFrame.repartition (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.repartition.html):这在 Snowpark Connect for Spark 中 no-op(无操作)。Snowflake 会在其分布式计算基础设施中自动管理数据分布和分区。
pyspark.RDD (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html):Spark Connect(包括 Snowpark Connect for Spark)中不支持 RDD APIs。
pyspark.ml (https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)
pyspark streaming (https://spark.apache.org/docs/latest/streaming-programming-guide.html)
UDF 差异¶
StructType 差异¶
当 Spark 转换 StructType 以将其用于用户定义函数 (UDF) 时,它会将其转换为 Python 中的 tuple 类型。Snowpark Connect for Spark 会将 StructType 转换为 Python 中的一种 dict 类型。这在元素访问和输出方面存在根本性差异。
Spark 将使用 0、1、2、3 这样的形式访问索引。
Snowpark Connect for Spark 将使用 '_1'、'_2' 这样的形式访问索引。
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
UDFs 中的迭代器类型¶
不支持将迭代器作为返回类型或输入类型。
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
将文件导入 Python UDF¶
借助 Snowpark Connect for Spark,您可以在 Python UDFs 中指定外部库和文件。Snowflake 在代码的执行上下文中包含 Python 文件和存档。您可在 UDF 中直接导入这些包含文件中的函数,无需额外步骤。此依赖项处理行为的工作原理如 使用从暂存区上传的代码创建 Python UDF 所述。
要包含外部库和文件,您需将文件的暂存区路径作为配置设置 snowpark.connect.udf.imports 的值提供。配置值应是一个以逗号分隔的文件暂存区路径数组。
以下示例代码在 UDF 执行上下文中包含了两个文件。该 UDF 从这些文件中导入函数,并在其逻辑中使用。
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")
@udf(returnType = StringType())
def import_example(input: str) -> str:
from library import first_function
from other_lib.custom import second_function
return first_function(input) + second_function(input)
spark.range(1).select(import_read_example("example_string")).show()
您也可通过 snowpark.connect.udf.imports 设置包含其他类型的文件,例如代码需读取的数据文件。请注意,在此情况下,您的代码应仅从包含的文件中读取数据;对该类文件的任何写入操作将在函数执行结束后丢失。
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")
@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
with open(file_name) as f:
return f.read()
spark.range(1).select(import_read_example("data.csv")).show()
Lambda 函数限制¶
User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.
df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show() # does not work, since `bit_count` is implemented with UDF
临时视图¶
默认情况下,Snowpark Connect for Spark 不会在 Snowflake 中创建临时视图。您可通过将配置参数 snowpark.connect.temporary.views.create_in_snowflake 设置为 true,指定 Snowpark Connect for Spark 创建临时视图。
若该参数设为 false,Snowpark Connect for Spark 将以 DataFrames 格式存储视图,而不创建 Snowflake 视图。这有助于避免 Spark Connect 请求生成的视图定义 SQL 超出 Snowflake 视图大小限制 (95KB) 时可能产生的问题。
使用 Spark Connect Catalog API 时,临时视图通常可见。然而,当通过 SQL 语句调用且配置 snowpark.connect.sql.passthrough 设为 true 时,这些视图将不可访问。要创建 Snowflake 临时视图,请将配置 snowpark.connect.temporary.views.create_in_snowflake 设为 true。
数据源¶
数据源 |
与 PySpark 相比的兼容性问题 |
|---|---|
Avro |
不支持此文件类型。 |
CSV |
不支持以下保存模式: 不支持以下选项: |
JSON |
不支持以下保存模式: 不支持以下选项:
|
Orc |
不支持此文件类型。 |
Parquet |
不支持以下保存模式: 不支持以下选项: 不支持的配置:(ALL) |
文本 |
不支持以下写入模式: 不支持以下选项: 写入时不支持 |
XML |
不支持此文件类型。 |
Snowflake 表 |
写入表不需要提供商格式。 不支持分桶和分区。 不支持存储格式和版本控制。 |
目录¶
Snowflake Horizon 目录提供商支持¶
仅支持 Snowflake 作为目录提供商。
不支持的目录 APIs¶
registerFunctionlistFunctionsgetFunctionfunctionExistscreateExternalTable
部分支持的目录 APIs¶
:code:`createTable`(不支持外部表)
Iceberg¶
Snowflake 管理的 Iceberg 表¶
Snowpark Connect for Spark 支持 Apache Iceberg™ 表,包括外部管理的 Iceberg 表和目录链接的数据库。
读取¶
不支持 Time Travel,包括历史快照、分支和增量读取。
写入¶
不支持使用 Spark SQL 创建表。
不支持架构合并。
要创建该表,必须:
创建外部卷。
通过以下任一方式将所需的外部卷与表创建相关联:
将 EXTERNAL_VOLUME 设置为数据库。
将
snowpark.connect.iceberg.external_volume设置为 Spark 配置。
外部管理的 Iceberg 表¶
读取¶
必须创建 Snowflake 非托管表实体。
不支持 Time Travel,包括历史快照、分支和增量读取。
写入¶
不支持创建表。
支持写入现有 Iceberg 表。
重复的列名¶
Snowflake 不支持重复的列名。
以下代码会在视图创建步骤中失败,并出现以下 SQL 编译错误:duplicate column name 'foo'。
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
要解决此问题,请将 snowpark.connect.views.duplicate_column_names_handling_mode 配置选项设置为以下值之一:
rename:将在第一个重复的列名之后,为其所有重复的列名附加后缀,例如_dedup_1、_dedup_2等。drop:除一个重复列以外,所有其他重复列都将被删除。如果这些列具有不同的值,可能会导致不正确的结果。