Snowpark Connect for Spark 属性¶
Snowpark Connect for Spark 支持以类似于标准 Spark 的方式进行自定义配置。您只能通过会话的 set 方法使用键值对来修改配置属性。请注意,Snowpark Connect for Spark 仅识别一组有限的、会影响执行的属性。任何不支持的属性都会被静默忽略,而不会引发异常。
支持的 Spark 属性¶
Snowpark Connect for Spark 支持 Spark 属性的一个子集。
属性名称 |
默认值 |
含义 |
始于版本 |
|---|---|---|---|
|
(无) |
设置为 Snowflake |
1.0.0 |
|
|
当为 |
1.0.0 |
|
(无) |
在读取或写入 S3 位置时,用于 S3 身份验证的 AWS 访问密钥 ID。 |
1.0.0 |
|
(无) |
使用基于角色的身份验证时,具有 S3 访问权限的 AWS IAM 角色 ARN。 |
1.0.0 |
|
(无) |
读取或写入 S3 位置时,用于 S3 身份验证的 AWS 密钥。 |
1.0.0 |
|
(无) |
使用 |
1.0.0 |
|
(无) |
使用 STS 时,用于临时 S3 凭据的 AWS 会话令牌。 |
1.0.0 |
|
|
启用 ANSI SQL 模式,以进行更严格的类型检查和错误处理。当为 |
1.0.0 |
|
|
控制标识符是否区分大小写。当为 |
1.0.0 |
|
|
启用或禁用隐式笛卡尔积连接。如果为 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
全局临时视图的架构名称;如果不存在则自动创建。 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
数据集分组键命名的遗留行为。 |
1.6.0 |
|
|
控制在创建 map 时发现重复键时的行为。值::code:`EXCEPTION`(引发错误)或 :code:`LAST_WIN`(保留最后一个值)。 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
控制 Parquet 输出的时间戳类型。支持 |
1.7.0 |
|
|
当为 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
在 REPL 及早求值模式下显示的最多行数。 |
1.0.0 |
|
|
在 REPL 及早求值显示中,列值截断前的最大宽度。 |
1.0.0 |
|
|
缓存本地关系的字节阈值。大于此值的关系将被缓存以提高性能。 |
1.0.0 |
|
|
用于时间戳操作的会话时区。通过 |
1.0.0 |
|
|
未明确指定格式时,读/写操作的默认数据源格式。 |
1.0.0 |
|
|
时间戳操作的默认时间戳类型。值::code:`TIMESTAMP_LTZ`(使用当地时区)或 :code:`TIMESTAMP_NTZ`(无时区)。 |
1.0.0 |
|
|
当为 |
1.0.0 |
支持的 Snowpark Connect for Spark 属性¶
特定于 Snowpark Connect for Spark 的自定义配置属性。
属性名称 |
默认值 |
含义 |
始于版本 |
|---|---|---|---|
|
(无) |
用于 Blob 存储身份验证的 Azure SAS 令牌。在读取或写入 Azure Blob 存储位置时使用。 |
1.0.0 |
|
(无) |
用于 ADLS Gen2 (Data Lake Storage) 身份验证的 Azure SAS 令牌。在读取或写入 Azure Data Lake Storage Gen2 位置时使用。 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
生成 Parquet 摘要元数据文件的备选配置。此属性或 |
1.4.0 |
|
|
当为 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
查询缓存条目的生存时间(以秒为单位)。减少重复的架构查找。 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
当为 |
1.7.0 |
|
(无) |
用于 Iceberg 表操作的 Snowflake 外部卷名称。 |
1.0.0 |
|
|
控制小数到整型的转换。值: |
1.7.0 |
|
|
控制使用的 Scala 版本(支持 |
1.7.0 |
|
(无) |
分区写入时的外部表位置路径。 |
1.4.0 |
|
|
当为 |
1.0.1 |
|
(无) |
UDF 执行时需导入的文件或模块的逗号分隔列表。更改时会触发 UDF 重新创建。 |
1.0.0 |
|
(无) |
Python UDF 执行时需导入的文件/模块的逗号分隔列表。更改时会触发 UDF 重新创建。 |
1.7.0 |
|
(无) |
Java UDF 执行时需导入的文件或模块的逗号分隔列表。更改时会触发 UDF 重新创建。 |
1.7.0 |
|
(无) |
注册 UDFs 时需包含的 Python 包的逗号分隔列表。 |
1.0.0 |
|
|
当为 |
1.0.0 |
|
|
只读。返回当前的 Snowpark Connect for Spark 版本。 |
1.0.0 |
|
|
如何处理视图中的重复列名。值::code:`rename`(添加后缀):code:`fail`(引发错误)或 :code:`drop`(移除重复项)。 |
1.0.0 |
|
|
当为 |
1.4.0 |
|
|
当设置为 |
1.12.3 |
fs.azure.sas.<container>.<account>.blob.core.windows.net¶
指定用于 Blob 存储身份验证的 Azure SAS 令牌。在读取或写入 Azure Blob 存储位置时使用。
默认值:(无)
始于版本:1.0.0
fs.azure.sas.fixed.token.<account>.dfs.core.windows.net¶
指定用于 ADLS Gen2 (Data Lake Storage) 身份验证的 Azure SAS 令牌。在读取或写入 Azure Data Lake Storage Gen2 位置时使用。
默认值:(无)
始于版本:1.0.0
mapreduce.fileoutputcommitter.marksuccessfuljobs¶
指定为 true 以在写入操作成功后生成 _SUCCESS 文件,从而确保与 Hadoop 或 Spark 工作流的兼容性。
默认:false
始于版本:1.0.0
parquet.enable.summary-metadata¶
指定用于生成 Parquet 摘要元数据文件的备选配置。可以通过此属性或 spark.sql.parquet.enable.summary-metadata 启用该功能。
默认:false
始于版本:1.4.0
snowflake.repartition.for.writes¶
指定为 true 以强制 DataFrame.repartition(n) 在写入期间将输出拆分为 n 个文件。这与 Spark 行为一致,但会增加额外开销。
默认:false
始于版本:1.0.0
snowpark.connect.cte.optimization_enabled ¶
指定为 true 以在 Snowpark 会话中启用公用表表达式 (CTE) 优化,从而提高查询性能。
默认:false
始于版本:1.0.0
snowpark.connect.describe_cache_ttl_seconds ¶
指定查询缓存条目的生存时间(以秒为单位)。减少重复的架构查找。
默认:300
始于版本:1.0.0
snowpark.connect.enable_snowflake_extension_behavior ¶
指定为 true 以启用可能与 Spark 行为不同的 Snowflake 特定扩展(例如对 MAP 类型进行哈希处理或 MD5 的返回类型)。
默认:false
始于版本:1.0.0
注释¶
当设置为 true 时,会改变某些操作的行为:
bit_get/getbit– 显式使用 Snowflake 的 getbit 函数
hash– 显式使用 Snowflake 的哈希函数
md5– 显式使用 Snowflake 的 md5 函数重命名表列 – 允许更改表列
snowpark.connect.handleIntegralOverflow¶
指定为 true 以使整型溢出行为与 Spark 的处理方式保持一致。
默认:false
始于版本:1.7.0
snowpark.connect.iceberg.external_volume ¶
指定用于 Iceberg 表操作的 Snowflake 外部卷名称。
默认值:(无)
始于版本:1.0.0
snowpark.connect.integralTypesEmulation¶
指定如何将小数转换为整型。值:client_default、enabled、disabled
默认:client_default
始于版本:1.7.0
注释¶
默认情况下,Snowpark Connect for Spark 将所有整型视为 Long 类型。这是由 Snowflake 中数字的表示方式 决定的。整型仿真允许在从数据源读取时,在 Snowpark 和 Spark 类型之间进行精确映射。
默认选项 client_default 仅在从 Scala 客户端执行脚本时激活仿真。整型根据以下精度进行映射:
精度 |
Spark 类型 |
|---|---|
19 |
|
10 |
|
5 |
|
3 |
|
其他 |
|
当发现其他精度时,最终类型将映射为 DecimalType。
snowpark.connect.scala.version¶
指定要使用的 Scala 版本(支持 2.12 或 2.13)。
默认:2.12
始于版本:1.7.0
snowpark.connect.sql.partition.external_table_location ¶
指定分区写入时的外部表位置路径。
默认值:(无)
始于版本:1.4.0
注释¶
若要仅从提供的目录中读取分区文件的精确子集,需要进行额外配置。此功能仅适用于存储在 外部暂存区 上的文件。为了裁剪读取的文件,Snowpark Connect for Spark 会使用 外部表。
当设置配置项 snowpark.connect.sql.partition.external_table_location 时,此功能将启用。该配置项应包含用于创建外部表的现有数据库和架构名称。
读取存储在外部暂存区上的 Parquet 文件将创建一个外部表;对于内部暂存区上的文件,则不会创建。提供架构将减少执行时间,从而消除从源推断架构的成本。
为了获得最佳性能,请根据 Snowflake External Tables 筛选限制 进行筛选。
示例¶
spark.conf.set("snowpark.connect.sql.partition.external_table_location", "<database-name>.<schema-name>")
spark.read.parquet("@external-stage/example").filter(col("x") > lit(1)).show()
schema = StructType([StructField("x",IntegerType()),StructField("y",DoubleType())])
spark.read.schema(schema).parquet("@external-stage/example").filter(col("x") > lit(1)).show()
snowpark.connect.temporary.views.create_in_snowflake ¶
指定为 true 以直接在 Snowflake 中创建临时视图,而不是在本地进行管理。
默认:false
始于版本:1.0.1
snowpark.connect.udf.imports [DEPRECATED 1.7.0]¶
指定用于 UDF 执行的以逗号分隔的文件和模块导入列表。当此值更改时,会触发 UDF 的重新创建。
默认值:(无)
始于版本:1.0.0
snowpark.connect.udf.python.imports¶
指定用于 Python UDF 执行的以逗号分隔的文件和模块导入列表。当此值更改时,会触发 UDF 的重新创建。
默认值:(无)
始于版本:1.7.0
snowpark.connect.udf.java.imports¶
指定用于 Java UDF 执行的以逗号分隔的文件和模块导入列表。更改时会触发 UDF 重新创建。
默认值:(无)
始于版本:1.7.0
注释¶
此配置的工作方式与 snowpark.connect.udf.python.imports 非常相似。通过它,您可以为使用 registerJavaFunction (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html) 创建的 Java UDFs 指定外部库和文件。这些配置是互斥的,以防止不必要的依赖混淆。
要包含外部库和文件,您需将文件的暂存区路径作为配置设置 snowpark.connect.udf.java.imports 的值提供。配置值应是一个以逗号分隔的文件暂存区路径数组。
示例¶
以下示例代码在 UDF 执行上下文中包含了两个文件。该 UDF 从这些文件中导入函数,并在其逻辑中使用。
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")
spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")
spark.sql("SELECT javaFunction('arg')").show()
您也可通过 snowpark.connect.udf.java.imports 设置包含其他类型的文件,例如代码需读取的数据文件。请注意,在此情况下,您的代码应仅从包含的文件中读取数据;对该类文件的任何写入操作将在函数执行结束后丢失。
snowpark.connect.udf.packages¶
指定注册 UDFs 时需要包含的 Python 包的逗号分隔列表。
默认值:(无)
始于版本:1.0.0
注释¶
您可以使用此项来定义可在 Python UDFs 中使用的其他包。该值为依赖项的逗号分隔列表。
您可以通过在 Snowflake 中执行以下 SQL 来查询支持的包列表:
SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
示例¶
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")
@udtf(returnType="val: int")
class Powers:
def eval(self, x: int):
import numpy as np
for v in np.power(np.array([x, x, x]), [0, 1, 2]):
yield (int(v),)
spark.udtf.register(name="powers", f=Powers)
spark.sql("SELECT * FROM powers(10)").show()
有关更多信息,请参阅 Python。
snowpark.connect.udtf.compatibility_mode ¶
指定为 true 以启用 Spark 兼容的 UDTF 行为,从而提高与 Spark UDTF 语义的兼容性。
默认:false
始于版本:1.0.0
注释¶
此属性决定了 UDTFs 应使用 Spark 兼容行为还是默认的 Snowpark 行为。当设置为 true 时,它会应用一个兼容性包装器,该包装器模仿了 Spark 的输出类型强制转换和错误处理模式。
启用后,UDTFs 将使用兼容性包装器,该封装器应用 Spark 风格的自动类型强制转换(例如,将字符串“true”转换为布尔值,将布尔值转换为整数)和错误处理。该包封装器还会将表参数转换为支持位置和命名访问的类 Row 对象,并妥善处理 SQL 空值,以匹配 Spark 的行为模式。
snowpark.connect.version¶
返回当前的 Snowpark Connect for Spark 版本。只读。
默认:<current_version>
始于版本:1.0.0
snowpark.connect.views.duplicate_column_names_handling_mode ¶
指定如何处理视图中的重复列名。允许的值包括 :code:`rename`(添加后缀):code:`fail`(引发错误)或 :code:`drop`(移除重复项)。
默认:rename
始于版本:1.0.0
注释¶
Snowflake 不支持重复的列名。
示例¶
以下代码在视图创建步骤会失败,并提示如下 SQL 编译错误:“duplicate column name 'foo'”。
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
为了解决这个问题,请将 snowpark.connect.views.duplicate_column_names_handling_mode 配置选项设置为以下值之一:
rename:将在第一个重复的列名之后,为其所有重复的列名附加后缀,例如_dedup_1、_dedup_2等。drop:选择使用 时默认使用的角色和仓库。除一个重复列以外,所有其他重复列都将被删除。如果这些列具有不同的值,可能会导致不正确的结果。
snowpark.connect.udf.java.imports¶
指定用于 Java UDF 执行的以逗号分隔的文件和模块导入列表。更改时会触发 UDF 重新创建。
默认值:(无)
始于版本:1.7.0
注释¶
此配置的工作方式与 snowpark.connect.udf.python.imports 非常相似。您可以使用它为通过 registerJavaFunction (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html) 创建的 Java UDFs 指定外部库和文件。这些配置是互斥的,以防止不必要的依赖混淆。
要包含外部库和文件,您需将文件的暂存区路径作为配置设置 snowpark.connect.udf.java.imports 的值提供。该值是一个以逗号分隔的文件暂存区路径数组。
示例¶
以下示例代码在 UDF 执行上下文中包含了两个文件。该 UDF 从这些文件中导入函数,并在其逻辑中使用。
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")
spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")
spark.sql("SELECT javaFunction('arg')").show()
您也可通过 snowpark.connect.udf.java.imports 设置包含其他类型的文件,例如代码需读取的数据文件。在此情况下,您的代码应仅从包含的文件中读取数据;对该类文件的任何写入操作将在函数执行结束后丢失。
snowpark.connect.udf.packages¶
指定注册 UDFs 时需要包含的 Python 包的逗号分隔列表。
默认值:(无)
始于版本:1.0.0
注释¶
该配置允许定义可在 Python UDFs 中使用的其他包。该值为依赖项的逗号分隔列表。
您可以通过在 Snowflake 中执行以下 SQL 来查询支持的包列表:
SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
示例¶
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")
@udtf(returnType="val: int")
class Powers:
def eval(self, x: int):
import numpy as np
for v in np.power(np.array([x, x, x]), [0, 1, 2]):
yield (int(v),)
spark.udtf.register(name="powers", f=Powers)
spark.sql("SELECT * FROM powers(10)").show()
参考:包参考
snowpark.connect.udtf.compatibility_mode ¶
指定为 true 以启用 Spark 兼容的 UDTF 行为,从而提高与 Spark UDTF 语义的兼容性。
默认:false
始于版本:1.0.0
注释¶
This configuration determines whether UDTFs should use Spark-compatible behavior or the default Snowpark behavior. When enabled (true), it applies a compatibility wrapper that mimics Spark's output type coercion (for example, string "true" to boolean, boolean to integer) and error handling patterns.
The wrapper also converts table arguments to row-like objects for both positional and named access, and properly handles SQL null values to match Spark's behavior patterns.
snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables¶
当设置为 true 时,允许在 Spark SQL 中对 Snowflake 表执行分区覆盖操作 (INSERT OVERWRITE <table> PARTITION(<partition spec>))。
默认:false
Since: 1.12.3
注释¶
Snowflake 表不支持用户定义的分区,默认情况下,分区覆盖会导致错误。启用此选项后,可以使用 INSERT OVERWRITE <table> PARTITION(<partition spec>) 语法来执行覆盖。
<partition spec> 可接受目标表中存在的所有列。
示例¶
以下示例中的代码会覆盖 students 表中所有 student_id 为 222222 的行。
spark.conf.set("snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables", True)
# create the students and persons tables as standard Snowflake tables
students_data = [
("Ashua Hill", "456 Erica Ct, Cupertino", 111111),
("Brian Reed", "723 Kern Ave, Palo Alto", 222222)
]
students_df = spark.createDataFrame(students_data, ["name", "address", "student_id"])
students_df.write.mode("overwrite").saveAsTable("students")
persons_data = [
("Dora Williams", "134 Forest Ave, Menlo Park", 123456789),
("Eddie Davis", "245 Market St, Milpitas", 345678901)
]
persons_df = spark.createDataFrame(persons_data, ["name", "address", "ssn"])
persons_df.write.mode("overwrite").saveAsTable("persons")
# overwrites all rows in the students table that have a student_id of 222222
spark.sql("""
INSERT OVERWRITE students PARTITION (student_id = 222222)
SELECT name, address FROM persons WHERE name = 'Dora Williams'
""").collect()
注释¶
配置用于启用 Snowflake 公共表表达式 (CTEs)。该配置可优化包含大量重复代码块的 Snowflake 查询。此修改将提高查询编译和执行性能。