Snowpark Connect for Spark 属性

Snowpark Connect for Spark 支持以类似于标准 Spark 的方式进行自定义配置。您只能通过会话的 set 方法使用键值对来修改配置属性。请注意,Snowpark Connect for Spark 仅识别一组有限的、会影响执行的属性。任何不支持的属性都会被静默忽略,而不会引发异常。

支持的 Spark 属性

Snowpark Connect for Spark 支持 Spark 属性的一个子集。

属性名称

默认值

含义

始于版本

spark.app.name

(无)

设置为 Snowflake query_tag (Spark-Connect-App-Name={name}) 的应用程序名称,用于查询跟踪。

1.0.0

spark.Catalog.databaseFilterInformationSchema

false

当为 true 时,在目录操作中从数据库列表中过滤掉 INFORMATION_SCHEMA

1.0.0

spark.hadoop.fs.s3a.access.key

(无)

在读取或写入 S3 位置时,用于 S3 身份验证的 AWS 访问密钥 ID。

1.0.0

spark.hadoop.fs.s3a.assumed.role.arn

(无)

使用基于角色的身份验证时,具有 S3 访问权限的 AWS IAM 角色 ARN。

1.0.0

spark.hadoop.fs.s3a.secret.key

(无)

读取或写入 S3 位置时,用于 S3 身份验证的 AWS 密钥。

1.0.0

spark.hadoop.fs.s3a.server-side-encryption.key

(无)

使用 AWS_SSE_KMS 加密类型时,用于服务器端加密的 AWS KMS 密钥 ID。

1.0.0

spark.hadoop.fs.s3a.session.token

(无)

使用 STS 时,用于临时 S3 凭据的 AWS 会话令牌。

1.0.0

spark.sql.ansi.enabled

false

启用 ANSI SQL 模式,以进行更严格的类型检查和错误处理。当为 true 时,算术溢出和无效转换将引发错误,而不是返回 NULL

1.0.0

spark.sql.caseSensitive

false

控制标识符是否区分大小写。当为 false 时,列名和表名不区分大小写(在 Snowflake 中自动转换为大写)。

1.0.0

spark.sql.crossJoin.enabled

true

启用或禁用隐式笛卡尔积连接。如果为 false 且缺少连接条件或连接条件微不足道,则会导致错误。

1.0.0

spark.sql.execution.pythonUDTF.arrow.enabled

false

当为 true 时,为 Python UDTF 的序列化/反序列化启用 Apache Arrow 优化。

1.0.0

spark.sql.globalTempDatabase

global_temp

全局临时视图的架构名称;如果不存在则自动创建。

1.0.0

spark.sql.legacy.allowHashOnMapType

false

当为 true 时,允许对 MAP 类型列进行哈希处理。为了与 MAP 行为保持一致,默认情况下无法对 MAP 类型进行哈希处理。

1.0.0

spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue

false

数据集分组键命名的遗留行为。

1.6.0

spark.sql.mapKeyDedupPolicy

EXCEPTION

控制在创建 map 时发现重复键时的行为。值::code:`EXCEPTION`(引发错误)或 :code:`LAST_WIN`(保留最后一个值)。

1.0.0

spark.sql.parser.quotedRegexColumnNames

false

当为 true 时,在 SQL 查询的引号列名中启用正则表达式模式匹配(例如 SELECT '(col1|col2)' FROM table)。

1.0.0

spark.sql.parquet.outputTimestampType

TIMESTAMP_MILLIS

控制 Parquet 输出的时间戳类型。支持 TIMESTAMP_MILLISTIMESTAMP_MICROS

1.7.0

spark.sql.pyspark.inferNestedDictAsStruct.enabled

false

当为 true 时,在创建 DataFrame 期间将嵌套的 Python 字典推断为 StructType 而非 MapType

1.0.0

spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled

false

当为 true 时,仅根据第一个元素推断数组元素类型,而不是对所有元素进行采样。

1.0.0

spark.sql.repl.eagerEval.enabled

false

当为 true 时,在 REPL 中启用及早求值,无需调用 show() 即可自动显示 DataFrame 结果。

1.0.0

spark.sql.repl.eagerEval.maxNumRows

20

在 REPL 及早求值模式下显示的最多行数。

1.0.0

spark.sql.repl.eagerEval.truncate

20

在 REPL 及早求值显示中,列值截断前的最大宽度。

1.0.0

spark.sql.session.localRelationCacheThreshold

2147483647

缓存本地关系的字节阈值。大于此值的关系将被缓存以提高性能。

1.0.0

spark.sql.session.timeZone

<system_local_timezone>

用于时间戳操作的会话时区。通过 ALTER SESSION SET TIMEZONE 与 Snowflake 会话同步。

1.0.0

spark.sql.sources.default

parquet

未明确指定格式时,读/写操作的默认数据源格式。

1.0.0

spark.sql.timestampType

TIMESTAMP_LTZ

时间戳操作的默认时间戳类型。值::code:`TIMESTAMP_LTZ`(使用当地时区)或 :code:`TIMESTAMP_NTZ`(无时区)。

1.0.0

spark.sql.tvf.allowMultipleTableArguments.enabled

true

当为 true 时,允许表值函数 (TVF) 接受多个表实参。

1.0.0

支持的 Snowpark Connect for Spark 属性

特定于 Snowpark Connect for Spark 的自定义配置属性。

属性名称

默认值

含义

始于版本

fs.azure.sas.<container>.<account>.blob.core.windows.net

(无)

用于 Blob 存储身份验证的 Azure SAS 令牌。在读取或写入 Azure Blob 存储位置时使用。

1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

(无)

用于 ADLS Gen2 (Data Lake Storage) 身份验证的 Azure SAS 令牌。在读取或写入 Azure Data Lake Storage Gen2 位置时使用。

1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

false

当为 true 时,在写操作成功后生成 _SUCCESS 文件,以确保与 Hadoop/Spark 工作流的兼容性。

1.0.0

parquet.enable.summary-metadata

false

生成 Parquet 摘要元数据文件的备选配置。此属性或 spark.sql.parquet.enable.summary-metadata 均可启用该功能。

1.4.0

snowflake.repartition.for.writes

false

当为 true 时,强制 DataFrame.repartition(n) 在写入期间将输出拆分为 n 个文件。这与 Spark 行为一致,但会增加额外开销。

1.0.0

snowpark.connect.cte.optimization_enabled

false

当为 true 时,在 Snowpark 会话中启用公用表表达式 (CTE) 优化,以提高查询性能。

1.0.0

snowpark.connect.describe_cache_ttl_seconds

300

查询缓存条目的生存时间(以秒为单位)。减少重复的架构查找。

1.0.0

snowpark.connect.enable_snowflake_extension_behavior

false

当为 true 时,启用可能与 Spark 行为不同的 Snowflake 特定扩展(例如对 MAP 类型进行哈希处理或 MD5 的返回类型)。

1.0.0

snowpark.connect.handleIntegralOverflow

false

当为 true 时,整型溢出行为将与 Spark 的处理方式保持一致。

1.7.0

snowpark.connect.iceberg.external_volume

(无)

用于 Iceberg 表操作的 Snowflake 外部卷名称。

1.0.0

snowpark.connect.integralTypesEmulation

client_default

控制小数到整型的转换。值:client_defaultenableddisabled

1.7.0

snowpark.connect.scala.version

2.12

控制使用的 Scala 版本(支持 2.122.13

1.7.0

snowpark.connect.sql.partition.external_table_location

(无)

分区写入时的外部表位置路径。

1.4.0

snowpark.connect.temporary.views.create_in_snowflake

false

当为 true 时,直接在 Snowflake 中创建临时视图,而不是在本地管理。

1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

(无)

UDF 执行时需导入的文件或模块的逗号分隔列表。更改时会触发 UDF 重新创建。

1.0.0

snowpark.connect.udf.python.imports

(无)

Python UDF 执行时需导入的文件/模块的逗号分隔列表。更改时会触发 UDF 重新创建。

1.7.0

snowpark.connect.udf.java.imports

(无)

Java UDF 执行时需导入的文件或模块的逗号分隔列表。更改时会触发 UDF 重新创建。

1.7.0

snowpark.connect.udf.packages

(无)

注册 UDFs 时需包含的 Python 包的逗号分隔列表。

1.0.0

snowpark.connect.udtf.compatibility_mode

false

当为 true 时,启用 Spark 兼容的 UDTF 行为,以提高与 Spark UDTF 语义的兼容性。

1.0.0

snowpark.connect.version

<current_version>

只读。返回当前的 Snowpark Connect for Spark 版本。

1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

rename

如何处理视图中的重复列名。值::code:`rename`(添加后缀):code:`fail`(引发错误)或 :code:`drop`(移除重复项)。

1.0.0

spark.sql.parquet.enable.summary-metadata

false

当为 true 时,在 Parquet 写入期间生成 Parquet 摘要元数据文件 (_metadata _common_metadata)。

1.4.0

snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables

false

当设置为 true 时,允许在 Spark SQL 中对 Snowflake 表执行分区覆盖操作 (INSERT OVERWRITE <table> PARTITION(<partition spec>))。

1.12.3

fs.azure.sas.<container>.<account>.blob.core.windows.net

指定用于 Blob 存储身份验证的 Azure SAS 令牌。在读取或写入 Azure Blob 存储位置时使用。

默认值:(无)

始于版本:1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

指定用于 ADLS Gen2 (Data Lake Storage) 身份验证的 Azure SAS 令牌。在读取或写入 Azure Data Lake Storage Gen2 位置时使用。

默认值:(无)

始于版本:1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

指定为 true 以在写入操作成功后生成 _SUCCESS 文件,从而确保与 Hadoop 或 Spark 工作流的兼容性。

默认:false

始于版本:1.0.0

parquet.enable.summary-metadata

指定用于生成 Parquet 摘要元数据文件的备选配置。可以通过此属性或 spark.sql.parquet.enable.summary-metadata 启用该功能。

默认:false

始于版本:1.4.0

snowflake.repartition.for.writes

指定为 true 以强制 DataFrame.repartition(n) 在写入期间将输出拆分为 n 个文件。这与 Spark 行为一致,但会增加额外开销。

默认:false

始于版本:1.0.0

snowpark.connect.cte.optimization_enabled

指定为 true 以在 Snowpark 会话中启用公用表表达式 (CTE) 优化,从而提高查询性能。

默认:false

始于版本:1.0.0

注释

配置用于启用 Snowflake 公共表表达式 (CTEs)。该配置可优化包含大量重复代码块的 Snowflake 查询。此修改将提高查询编译和执行性能。

snowpark.connect.describe_cache_ttl_seconds

指定查询缓存条目的生存时间(以秒为单位)。减少重复的架构查找。

默认:300

始于版本:1.0.0

snowpark.connect.enable_snowflake_extension_behavior

指定为 true 以启用可能与 Spark 行为不同的 Snowflake 特定扩展(例如对 MAP 类型进行哈希处理或 MD5 的返回类型)。

默认:false

始于版本:1.0.0

注释

当设置为 true 时,会改变某些操作的行为:

snowpark.connect.handleIntegralOverflow

指定为 true 以使整型溢出行为与 Spark 的处理方式保持一致。

默认:false

始于版本:1.7.0

snowpark.connect.iceberg.external_volume

指定用于 Iceberg 表操作的 Snowflake 外部卷名称。

默认值:(无)

始于版本:1.0.0

snowpark.connect.integralTypesEmulation

指定如何将小数转换为整型。值:client_defaultenableddisabled

默认:client_default

始于版本:1.7.0

注释

默认情况下,Snowpark Connect for Spark 将所有整型视为 Long 类型。这是由 Snowflake 中数字的表示方式 决定的。整型仿真允许在从数据源读取时,在 Snowpark 和 Spark 类型之间进行精确映射。

默认选项 client_default 仅在从 Scala 客户端执行脚本时激活仿真。整型根据以下精度进行映射:

精度

Spark 类型

19

LongType

10

IntegerType

5

ShortType

3

ByteType

其他

DecimalType(precision, 0)

当发现其他精度时,最终类型将映射为 DecimalType

snowpark.connect.scala.version

指定要使用的 Scala 版本(支持 2.122.13)。

默认:2.12

始于版本:1.7.0

snowpark.connect.sql.partition.external_table_location

指定分区写入时的外部表位置路径。

默认值:(无)

始于版本:1.4.0

注释

若要仅从提供的目录中读取分区文件的精确子集,需要进行额外配置。此功能仅适用于存储在 外部暂存区 上的文件。为了裁剪读取的文件,Snowpark Connect for Spark 会使用 外部表

当设置配置项 snowpark.connect.sql.partition.external_table_location 时,此功能将启用。该配置项应包含用于创建外部表的现有数据库和架构名称。

读取存储在外部暂存区上的 Parquet 文件将创建一个外部表;对于内部暂存区上的文件,则不会创建。提供架构将减少执行时间,从而消除从源推断架构的成本。

为了获得最佳性能,请根据 Snowflake External Tables 筛选限制 进行筛选。

示例
spark.conf.set("snowpark.connect.sql.partition.external_table_location", "<database-name>.<schema-name>")

spark.read.parquet("@external-stage/example").filter(col("x") > lit(1)).show()

schema = StructType([StructField("x",IntegerType()),StructField("y",DoubleType())])

spark.read.schema(schema).parquet("@external-stage/example").filter(col("x") > lit(1)).show()
Copy

snowpark.connect.temporary.views.create_in_snowflake

指定为 true 以直接在 Snowflake 中创建临时视图,而不是在本地进行管理。

默认:false

始于版本:1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

指定用于 UDF 执行的以逗号分隔的文件和模块导入列表。当此值更改时,会触发 UDF 的重新创建。

默认值:(无)

始于版本:1.0.0

snowpark.connect.udf.python.imports

指定用于 Python UDF 执行的以逗号分隔的文件和模块导入列表。当此值更改时,会触发 UDF 的重新创建。

默认值:(无)

始于版本:1.7.0

snowpark.connect.udf.java.imports

指定用于 Java UDF 执行的以逗号分隔的文件和模块导入列表。更改时会触发 UDF 重新创建。

默认值:(无)

始于版本:1.7.0

注释

此配置的工作方式与 snowpark.connect.udf.python.imports 非常相似。通过它,您可以为使用 registerJavaFunction (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html) 创建的 Java UDFs 指定外部库和文件。这些配置是互斥的,以防止不必要的依赖混淆。

要包含外部库和文件,您需将文件的暂存区路径作为配置设置 snowpark.connect.udf.java.imports 的值提供。配置值应是一个以逗号分隔的文件暂存区路径数组。

示例

以下示例代码在 UDF 执行上下文中包含了两个文件。该 UDF 从这些文件中导入函数,并在其逻辑中使用。

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

您也可通过 snowpark.connect.udf.java.imports 设置包含其他类型的文件,例如代码需读取的数据文件。请注意,在此情况下,您的代码应仅从包含的文件中读取数据;对该类文件的任何写入操作将在函数执行结束后丢失。

snowpark.connect.udf.packages

指定注册 UDFs 时需要包含的 Python 包的逗号分隔列表。

默认值:(无)

始于版本:1.0.0

注释

您可以使用此项来定义可在 Python UDFs 中使用的其他包。该值为依赖项的逗号分隔列表。

您可以通过在 Snowflake 中执行以下 SQL 来查询支持的包列表:

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
示例
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

有关更多信息,请参阅 Python

snowpark.connect.udtf.compatibility_mode

指定为 true 以启用 Spark 兼容的 UDTF 行为,从而提高与 Spark UDTF 语义的兼容性。

默认:false

始于版本:1.0.0

注释

此属性决定了 UDTFs 应使用 Spark 兼容行为还是默认的 Snowpark 行为。当设置为 true 时,它会应用一个兼容性包装器,该包装器模仿了 Spark 的输出类型强制转换和错误处理模式。

启用后,UDTFs 将使用兼容性包装器,该封装器应用 Spark 风格的自动类型强制转换(例如,将字符串“true”转换为布尔值,将布尔值转换为整数)和错误处理。该包封装器还会将表参数转换为支持位置和命名访问的类 Row 对象,并妥善处理 SQL 空值,以匹配 Spark 的行为模式。

snowpark.connect.version

返回当前的 Snowpark Connect for Spark 版本。只读。

默认:<current_version>

始于版本:1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

指定如何处理视图中的重复列名。允许的值包括 :code:`rename`(添加后缀):code:`fail`(引发错误)或 :code:`drop`(移除重复项)。

默认:rename

始于版本:1.0.0

注释

Snowflake 不支持重复的列名。

示例

以下代码在视图创建步骤会失败,并提示如下 SQL 编译错误:“duplicate column name 'foo'”。

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

为了解决这个问题,请将 snowpark.connect.views.duplicate_column_names_handling_mode 配置选项设置为以下值之一:

  • rename:将在第一个重复的列名之后,为其所有重复的列名附加后缀,例如 _dedup_1_dedup_2 等。

  • drop:选择使用 时默认使用的角色和仓库。除一个重复列以外,所有其他重复列都将被删除。如果这些列具有不同的值,可能会导致不正确的结果。

snowpark.connect.udf.java.imports

指定用于 Java UDF 执行的以逗号分隔的文件和模块导入列表。更改时会触发 UDF 重新创建。

默认值:(无)

始于版本:1.7.0

注释

此配置的工作方式与 snowpark.connect.udf.python.imports 非常相似。您可以使用它为通过 registerJavaFunction (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html) 创建的 Java UDFs 指定外部库和文件。这些配置是互斥的,以防止不必要的依赖混淆。

要包含外部库和文件,您需将文件的暂存区路径作为配置设置 snowpark.connect.udf.java.imports 的值提供。该值是一个以逗号分隔的文件暂存区路径数组。

示例

以下示例代码在 UDF 执行上下文中包含了两个文件。该 UDF 从这些文件中导入函数,并在其逻辑中使用。

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

您也可通过 snowpark.connect.udf.java.imports 设置包含其他类型的文件,例如代码需读取的数据文件。在此情况下,您的代码应仅从包含的文件中读取数据;对该类文件的任何写入操作将在函数执行结束后丢失。

snowpark.connect.udf.packages

指定注册 UDFs 时需要包含的 Python 包的逗号分隔列表。

默认值:(无)

始于版本:1.0.0

注释

该配置允许定义可在 Python UDFs 中使用的其他包。该值为依赖项的逗号分隔列表。

您可以通过在 Snowflake 中执行以下 SQL 来查询支持的包列表:

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
示例
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

参考:包参考

snowpark.connect.udtf.compatibility_mode

指定为 true 以启用 Spark 兼容的 UDTF 行为,从而提高与 Spark UDTF 语义的兼容性。

默认:false

始于版本:1.0.0

注释

This configuration determines whether UDTFs should use Spark-compatible behavior or the default Snowpark behavior. When enabled (true), it applies a compatibility wrapper that mimics Spark's output type coercion (for example, string "true" to boolean, boolean to integer) and error handling patterns.

The wrapper also converts table arguments to row-like objects for both positional and named access, and properly handles SQL null values to match Spark's behavior patterns.

snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables

当设置为 true 时,允许在 Spark SQL 中对 Snowflake 表执行分区覆盖操作 (INSERT OVERWRITE <table> PARTITION(<partition spec>))。

默认:false

Since: 1.12.3

注释

Snowflake 表不支持用户定义的分区,默认情况下,分区覆盖会导致错误。启用此选项后,可以使用 INSERT OVERWRITE <table> PARTITION(<partition spec>) 语法来执行覆盖。

<partition spec> 可接受目标表中存在的所有列。

示例

以下示例中的代码会覆盖 students 表中所有 student_id 为 222222 的行。

spark.conf.set("snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables", True)

# create the students and persons tables as standard Snowflake tables
students_data = [
  ("Ashua Hill", "456 Erica Ct, Cupertino", 111111),
  ("Brian Reed", "723 Kern Ave, Palo Alto", 222222)
]

students_df = spark.createDataFrame(students_data, ["name", "address", "student_id"])
students_df.write.mode("overwrite").saveAsTable("students")

persons_data = [
    ("Dora Williams", "134 Forest Ave, Menlo Park", 123456789),
    ("Eddie Davis", "245 Market St, Milpitas", 345678901)
]

persons_df = spark.createDataFrame(persons_data, ["name", "address", "ssn"])
persons_df.write.mode("overwrite").saveAsTable("persons")

# overwrites all rows in the students table that have a student_id of 222222
spark.sql("""
    INSERT OVERWRITE students PARTITION (student_id = 222222)
    SELECT name, address FROM persons WHERE name = 'Dora Williams'
""").collect()
Copy
语言: 中文