使用 Spark Connector

该连接器遵循标准的 Spark API,但增加了特定于 Snowflake 的选项,这将在本主题中介绍。

在本主题中,术语 COPY 指的是以下两者:

  • COPY INTO <table> (用于将数据从内部或外部暂存区传输到表中)。

  • :doc:`/sql-reference/sql/copy-into-location`(用于将数据从表传输到内部或外部暂存区)。

本主题内容:

使用 SnowCD 验证与 Snowflake 的网络连接

配置驱动程序后,可以使用 SnowCD 评估与 Snowflake 的网络连接并进行故障排除。

可以在初始配置过程中使用 SnowCD,也可以根据需要随时使用,以评估与 Snowflake 的网络连接并进行故障排除。

下推

Spark Connector 通过捕获和分析 SQL 运算的 Spark 逻辑计划来应用谓词和查询下推。当数据源是 Snowflake 时,运算被转换为 SQL 查询,然后在 Snowflake 中执行以提高性能。

但是,由于这种转换几乎需要将 Spark SQL 运算符一对一地转换为 Snowflake 表达式,因此并非所有 Spark SQL 运算符都可以下推。当下推失败时,连接器会退回到优化程度较低的执行计划。不支持的运算改为在 Spark 中执行。

备注

如果需要下推所有运算,请考虑编写代码以改用 Snowpark API

以下是支持的下推运算列表(以下所有函数都使用其 Spark 名称)。如果某个函数不在此列表中,则使用该函数的 Spark 计划可能会在 Spark 上执行,而不是下推到 Snowflake 中。

  • 聚合函数

    • Average

    • Corr

    • CovPopulation

    • CovSample

    • Count

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Sum

    • VariancePop

    • VarianceSamp

  • 布尔运算符

    • And

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Or

    • StartsWith

  • 日期、时间和时间戳函数

    • DateAdd

    • DateSub

    • Month

    • Quarter

    • TruncDate

    • TruncTimestamp

    • Year

  • 数学函数

    • 算术运算符“+”(加法)、“-”(减法)、“*”(乘法)、“/”(除法)和“-”(一元否定)。

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • CheckOverflow

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • PromotePrecision

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • 混杂运算符

    • 别名(AS 表达式)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • CaseWhen

    • Cast(child, t, _)

    • Coalesce

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • 关系运算符

    • 聚合函数和分组依据子句

    • Distinct

    • Filters

    • In

    • InSet

    • Joins

    • Limits

    • Projections

    • Sorts (ORDER BY)

    • Union 和 Union All

    • 窗口函数和开窗子句

  • 字符串函数

    • Ascii

    • Concat(children)

    • Length

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Substring

    • Upper

  • 窗口函数(注意:这些不适用于 Spark 2.2)

    • DenseRank

    • Rank

    • RowNumber

在 Scala 中使用连接器

指定数据源类名

要在 Spark 中将 Snowflake 用作数据源,请使用 .format 选项提供定义数据源的 Snowflake 连接器类名。

net.snowflake.spark.snowflake

为了确保对类名进行编译时检查,Snowflake 强烈建议为类名定义一个变量。例如:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
Copy

此外,为方便起见,Utils 类提供变量,可以按如下方式导入:

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Copy

备注

本主题中的所有示例都使用 SNOWFLAKE_SOURCE_NAME 作为类定义。

在会话中启用/禁用下推

连接器版本 2.1.0(及更高版本)支持查询下推,当 Snowflake 是 Spark 数据源时,通过将查询处理推送到 Snowflake,可以显著提高性能。

默认情况下,下推处于启用状态。

要在 Spark 会话中禁用给定 DataFrame 的下推,请执行以下操作:

  1. 实例化 SparkSession 对象后,调用 SnowflakeConnectorUtils.disablePushdownSession 静态方法,传入 SparkSession 对象。例如:

    SnowflakeConnectorUtils.disablePushdownSession(spark)
    
    Copy

    其中 sparkSparkSession 对象。

  2. 创建一个 DataFrame,将 autopushdown 选项设置为 off。例如:

    val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("query", query)
      .option("autopushdown", "off")
      .load()
    
    Copy

    请注意,您还可以在传递给 options 方法的 Map 中设置 autopushdown 选项(例如,在上面示例中的 sfOptions 中)。

要在禁用下推后再次启用下推,请调用 SnowflakeConnectorUtils.enablePushdownSession 静态方法(传入 SparkSession 对象),并创建启用了 autopushdown 的 DataFrame。

将数据从 Snowflake 移动到 Spark

备注

使用 DataFrames 时,Snowflake 连接器仅支持 SELECT 查询。

要将数据从 Snowflake 读入 Spark DataFrame,请执行以下操作:

  1. 使用 SqlContext 对象的 read() 方法构造 DataFrameReader

  2. 使用 format() 方法指定 SNOWFLAKE_SOURCE_NAME。有关定义,请参阅 指定数据源类名 (本主题内容)。

  3. 使用 option()options() 方法指定连接器选项。有关更多信息,请参阅 为连接器设置配置选项 (本主题内容)。

  4. 为要读取的表数据指定以下选项之一:

    • dbtable:要读取的表的名称。检索所有列和记录(即相当于 SELECT * FROM db_table)。

    • query:要运行的确切查询(SELECT 语句)。

使用说明

  • 目前,在使用 DataFrames 时,连接器不支持其他类型的查询(例如,SHOW 或 DESC,或 DML 语句)。

  • 单行的大小有上限。有关更多详细信息,请参阅 查询文本大小的限制

性能注意事项

在 Snowflake 和 Spark 之间传输数据时,使用以下方法来分析/提高性能:

  • 使用 net.snowflake.spark.snowflake.Utils.getLastSelect() 方法查看将数据从 Snowflake 移动到 Spark 时发出的实际查询。

  • 如果您使用 Spark filterwhere 或 DataFrame 功能,请检查发出的 SQL 查询中是否存在相应的筛选器。Snowflake 连接器尝试将 Spark 请求的所有筛选器转换为 SQL。

    但是,当今的 Spark 基础设施无法将某些形式的筛选器传递给 Snowflake 连接器。因此,在某些情况下,会要求 Snowflake 提供大量不必要的记录。

  • 如果只需要列的子集,请确保在 SQL 查询中反映该子集。

  • 通常,如果发出的 SQL 查询与您基于 DataFrame 操作的预期不匹配,请使用 query 选项提供您想要的确切 SQL 语法。

示例

阅读整个表:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()
Copy

阅读查询结果:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()
Copy

将数据从 Spark 移动到 Snowflake

将 DataFrame 的内容保存到 Snowflake 表的步骤类似于从 Snowflake 写入 Spark:

  1. 使用 DataFramewrite() 方法构造 DataFrameWriter

  2. 使用 format() 方法指定 SNOWFLAKE_SOURCE_NAME。有关定义,请参阅 指定数据源类名 (本主题内容)。

  3. 使用 option()options() 方法指定连接器选项。有关更多信息,请参阅 为连接器设置配置选项 (本主题内容)。

  4. 使用 dbtable 选项指定数据要写入的表。

  5. 使用 mode() 方法指定内容的保存模式。

    有关更多信息,请参阅 SaveMode (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/SaveMode.html) (Spark 文档)。

示例

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

将 JSON 从 Spark 导出到 Snowflake

Spark DataFrames 可以包含 JSON 对象,序列化为字符串。以下代码提供了一个将常规 DataFrame 转换为包含 JSON 数据的 DataFrame 的示例:

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)
Copy

请注意,生成的 jsonDataFrame 包含一个 StringType 类型的列。因此,当使用通用 SaveMode.Overwrite 模式将此 DataFrame 导出到 Snowflake 时,Snowflake 中会创建一个新表,其中包含一个 VARCHAR 类型的列。

要将 jsonDataFrame 加载到 VARIANT 列中,请执行以下操作:

  1. 创建一个 Snowflake 表(使用 Snowflake JDBC 驱动程序在 Java 中连接到 Snowflake)。有关示例中使用的连接参数的说明,请参阅 JDBC 驱动程序连接参数参考

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.cn/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "myorganization-myaccount");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
    Copy
  2. 不使用 SaveMode.Overwrite,而是使用 SaveMode.Append 来重用现有表。当表示 JSON 的字符串值被加载到 Snowflake 中时,因为目标列是 VARIANT 类型,所以被解析为 JSON。例如:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    
    Copy

执行 DDL/DML SQL 语句

除了查询之外,还可以使用 Utils 对象的 runQuery() 方法执行 DDL/DML SQL 语句,例如:

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
Copy

其中 sfOptions 是用于读/写 DataFrames 的参数映射。

runQuery 方法只返回 TRUE 或 FALSE。它适用于不返回结果集的语句,例如像 CREATE TABLE 这样的 DDL 语句和像 INSERTUPDATEDELETE 这样的 DML 语句。它对于返回结果集的语句(如 SELECTSHOW)没有用处。

使用时间戳和时区

Spark 仅提供一种时间戳,等同于 Scala/Java 时间戳类型。其行为几乎与 Snowflake 中的 TIMESTAMP_LTZ (本地时区)数据类型相同。因此,在 Spark 和 Snowflake 之间传输数据时,Snowflake 建议使用以下方法来正确保存相对于时区的时间:

  • 在 Snowflake 中仅使用 TIMESTAMP_LTZ 数据类型。

    备注

    默认的时间戳数据类型映射是 TIMESTAMP_NTZ (不含时区),因此 必须 显式设置 TIMESTAMP_TYPE_MAPPING 参数以使用 TIMESTAMP_LTZ。

  • 将 Spark 时区设置为 UTC,并在 Snowflake 中使用该时区(即不要为连接器设置 sfTimezone 选项,也不要在 Snowflake 中显式设置时区)。在这种情况下,TIMESTAMP_LTZ 和 TIMESTAMP_NTZ 实际上是等效的。

    要设置时区,请将以下行添加到 Spark 代码中:

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    
    Copy

如果没有实现这两种方法,可能会发生不必要的时间修改。例如,请考虑以下情景:

  • Spark 中的时区设置为 America/New_York

  • Snowflake 中的时区设置为 Europe/Warsaw,这可以通过以下任一方式实现:

    • 将连接器的 sfTimezone 设置为 Europe/Warsaw

    • 将连接器的 sfTimezone 设置为 snowflake,并将 Snowflake 中的 TIMEZONE 会话参数设置为 Europe/Warsaw

  • TIMESTAMP_NTZ 和 TIMESTAMP_LTZ 都在 Snowflake 中使用。

在这种情况下:

  1. 如果将 Snowflake 中 TIMESTAMP_NTZ 列中代表 12:00:00 的值发送到 Spark,则该值不含任何时区信息。Spark 将该值视为纽约的 12:00:00

  2. 如果 Spark 在 12:00:00 (纽约时区)将该值发送回 Snowflake 以加载到 TIMESTAMP_LTZ 列中,则该值会自动转换并加载为 18:00:00 (华沙时区)。

  3. 如果该值随后在 Snowflake 中转换为 TIMESTAMP_NTZ,用户将看到 18:00:00,这与原始值 12:00:00 不同。

总而言之,Snowflake 建议 至少 严格遵循以下规则之一:

  • 对 Spark 和 Snowflake 使用 相同 的时区,最好是 UTC

  • 使用 TIMESTAMP_LTZ 数据类型在 Spark 和 Snowflake 之间传输数据。

Scala 示例程序

重要

此示例程序假设您使用的是 2.2.0(或更高版本)的连接器,该版本使用 Snowflake 内部暂存区来存储临时数据,因此不需要 S3 位置来存储临时数据。如果使用的是早期版本,则您必须拥有 S3 位置并包含 sfOptionstempdirawsAccessKeyawsSecretKey 值。有关更多详细信息,请参阅 用于外部数据传输的 AWS 选项 (本主题内容)。

以下 Scala 程序提供了 Snowflake Connector for Spark 的完整用例。使用代码之前,请按照 为连接器设置配置选项 (本主题内容)中所述,用适当的值替换以下字符串:

  • <account_identifier>账户标识符

  • <user_name><password>:Snowflake 用户的登录凭据。

  • <database><schema><warehouse>:Snowflake 会话的默认值。

Scala 示例程序使用基本身份验证(即用户名和密码)。如果您想使用 OAuth 进行身份验证,请参阅 使用 External OAuth (本主题内容)。

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

将连接器与 Python 结合使用

将连接器与 Python 结合使用与 Scala 的用法非常相似。

我们建议使用 Spark 发行版中包含的 bin/pyspark 脚本。

配置 pyspark 脚本

pyspark 脚本的配置必须与 spark-shell 脚本类似,使用 --packages--jars 选项。例如:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Copy

不要忘记在 CLASSPATH 环境变量中包含 Snowflake Spark Connector 和 JDBC Connector .jar 文件。

有关配置 spark-shell 脚本的更多信息,请参阅 第 4 步:配置本地 Spark 集群或 Amazon EMR 托管的 Spark 环境

在会话中启用/禁用下推

连接器版本 2.1.0(及更高版本)支持查询下推,当 Snowflake 是 Spark 数据源时,通过将查询处理推送到 Snowflake,可以显著提高性能。

默认情况下,下推处于启用状态。

要在 Spark 会话中禁用给定 DataFrame 的下推,请执行以下操作:

  1. 实例化 SparkSession 对象后,调用 SnowflakeConnectorUtils.disablePushdownSession 静态方法,传入 SparkSession 对象。例如:

    sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
    
    Copy
  2. 创建一个 DataFrame,将 autopushdown 选项设置为 off。例如:

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("query",  query) \
      .option("autopushdown", "off") \
      .load()
    
    Copy

    请注意,您还可以在传递给 options 方法的 Dictionary 中设置 autopushdown 选项(例如,在上面示例中的 sfOptions 中)。

要在禁用下推后再次启用下推,请调用 SnowflakeConnectorUtils.enablePushdownSession 静态方法(传入 SparkSession 对象),并创建启用了 autopushdown 的 DataFrame。

Python 示例脚本

重要

此示例脚本假设您使用的是 2.2.0(或更高版本)的连接器,该版本使用 Snowflake 内部暂存区来存储临时数据,因此不需要 S3 位置来存储这些数据。如果使用的是早期版本,则您必须拥有 S3 位置并包含 sfOptionstempdirawsAccessKeyawsSecretKey 值。有关更多详细信息,请参阅 用于外部数据传输的 AWS 选项 (本主题内容)。

配置 pyspark 脚本后,您可以执行 SQL 查询和其他操作。以下是执行简单 SQL 查询的 Python 示例脚本。此脚本说明了连接器的基本用法。本文档中的大多数 Scala 示例只需很少的工作量/修改即可在 Python 中使用。

Python 示例脚本使用基本身份验证(即用户名和密码)。如果您想使用 OAuth 进行身份验证,请参阅 使用 External OAuth (本主题内容)。

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.cn",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

小技巧

注意 sfOptionsSNOWFLAKE_SOURCE_NAME 的用法。这简化了代码并降低了出错的概率。

有关 sfOptions 支持的选项的详细信息,请参阅 为连接器设置配置选项 (本主题内容)。

数据类型映射

Spark Connector 支持在许多常见数据类型之间进行转换。

从 Spark SQL 到 Snowflake

Spark |~|数据|~| 类型

Snowflake |~|数据|~| 类型

ArrayType

VARIANT

BinaryType

不支持

BooleanType

BOOLEAN

ByteType

INTEGER。Snowflake 不支持 BYTE 类型。

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

如果指定了长度,则为 VARCHAR(N);否则,为 VARCHAR

StructType

VARIANT

TimestampType

TIMESTAMP

从 Snowflake 到 Spark SQL

Snowflake |~|数据|~| 类型

Spark |~|数据|~| 类型

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

不支持

BLOB

不支持

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Spark Connector 版本 2.4.14 或更高版本)

VARIANT

StringType

调用 DataFrame.show 方法

如果您要调用 DataFrame.show 方法并传入一个小于 DataFrame 中行数的数字,请构造一个只包含要按排序顺序显示的行的 DataFrame。

要这样做,请执行以下操作:

  1. 首先调用 sort 方法返回包含已排序行的 DataFrame。

  2. 对该 DataFrame 调用 limit 方法以返回仅包含要显示的行的 DataFrame 。

  3. 对返回的 DataFrame 调用 show 方法。

例如,如果您希望显示 5 行并希望结果按列 my_col 排序:

val dfWithRowsToShow = originalDf.sort("my_col").limit(5)
dfWithRowsToShow.show(5)
Copy

或者,如果调用 show 在 DataFrame 中显示行的子集,代码的不同执行可能会导致显示不同的行。

为连接器设置配置选项

以下小节列出了您为配置连接器行为而设置的选项:

要设置这些选项,请调用 Spark DataframeReader (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/DataFrameReader.html) 类的 .option(<key>, <value>).options(<map>) 方法。

小技巧

为了便于使用选项,Snowflake 建议在单个 Map 对象中指定选项,并调用 .options(<map>) 来设置选项。

必需的连接选项

连接到 Snowflake 需要以下选项:

sfUrl

使用以下格式为账户指定 主机名

account_identifier.snowflakecomputing.cn

account_identifier 是您的 账户标识符

sfUser

Snowflake 用户的登录名。

还必须使用以下选项之一进行身份验证:

  • sfPassword

    Snowflake 用户的密码。

  • pem_private_key

    用于密钥对身份验证的私钥(PEM 格式)。有关说明,请参阅 密钥对身份验证和密钥对轮换

  • sfAuthenticator

    指定使用 外部 OAuth 对 Snowflake 进行身份验证。将该值设置为 oauth

    使用外部 OAuth 需要设置 sfToken 参数。

sfToken

(如果使用外部 OAuth,则为必填项)将该值设置为外部 OAuth 访问令牌。

此连接参数要求将 sfAuthenticator 参数值设置为 oauth

默认为无。

必需的上下文选项

设置会话的数据库和架构上下文需要以下选项:

sfDatabase

连接后用于会话的数据库。

sfSchema

连接后用于会话的架构。

其他上下文选项

本节中列出的选项不是必需的。

sfAccount

账户标识符(例如 myorganization-myaccount)。不再需要此选项,因为在 sfUrl 中指定了账户标识符。这里记录的目的只是为了向后兼容。

sfWarehouse

连接后用于会话的默认虚拟仓库。

sfRole

连接后用于会话的默认安全角色。

代理选项

本节中列出的选项不是必需的。

use_proxy

指定连接器是否应使用代理:

  • true 指定连接器应使用代理。

  • false 指定连接器不应使用代理。

默认值为 false

proxy_host

(如果 use_proxytrue,则为必填项)指定要使用的代理服务器的主机名。

proxy_port

(如果 use_proxytrue,则为必填项)指定要使用的代理服务器的端口号。

proxy_protocol

指定用于连接代理服务器的协议。您可以指定以下值中的一个:

  • http

  • https

默认值为 http

这仅支持 AWS 上的 Snowflake。

此选项是在 Spark Connector 版本 2.11.1 中添加的。

proxy_user

指定用于向代理服务器进行身份验证的用户名。如果代理服务器需要身份验证,请设置此项。

这仅支持 AWS 上的 Snowflake。

proxy_password

指定用于向代理服务器进行身份验证的 proxy_user 的密码。如果代理服务器需要身份验证,请设置此项。

这仅支持 AWS 上的 Snowflake。

non_proxy_hosts

指定连接器应绕过代理服务器直接连接的主机列表。

用 URL 转义竖线符号 (%7C) 分隔主机名。也可以使用星号 (*) 作为通配符

这仅支持 AWS 上的 Snowflake。

其他选项

本节中列出的选项不是必需的。

sfTimezone

使用 Spark 时由 Snowflake 使用的时区。请注意,该参数仅在 Snowflake 中设置时区;Spark 环境保持不变。支持的值为:

  • spark:使用 Spark 的时区(默认)。

  • snowflake:使用 Snowflake 的当前时区。

  • sf_default:使用正在连接的 Snowflake 用户的默认时区。

  • time_zone:如果有效,使用特定的时区(例如 America/New_York)。

    有关设置此选项的影响的更多信息,请参阅 使用时间戳和时区 (本主题内容)。

sfCompress

如果设置为 on (默认),则在 Snowflake 和 Spark 之间传递的数据将被压缩。

s3MaxFileSize

将数据从 Snowflake 移动到 Spark 时使用的文件大小。默认为 10MB。

preactions

在 Spark 和 Snowflake 之间传输数据之前执行的以分号分隔的 SQL 命令列表。

如果 SQL 命令包含 %s,则将 %s 替换为操作所引用的表名。

postactions

Spark 和 Snowflake 之间传输数据后执行的以分号分隔的 SQL 命令列表。

如果 SQL 命令包含 %s,则将其替换为操作所引用的表名。

truncate_columns

如果设置为 on (默认),COPY 命令会自动截断超过目标列长度的文本字符串。如果设置为 off,则当加载的字符串超过目标列长度时,该命令会产生错误。

truncate_table

此参数控制 Snowflake 在覆盖 Snowflake 目标表时是否保留该表的架构。

默认情况下,当覆盖 Snowflake 中的目标表时,该目标表的架构也会被覆盖;新架构基于源表的架构(Spark 数据框)。

但是,有时源代码的架构并不理想。例如,用户可能希望 Snowflake 目标表将来能够存储 FLOAT 值,即使初始源列的数据类型是 INTEGER。在这种情况下,不应覆盖 Snowflake 表的架构;只应截断 Snowflake 表,然后与其当前架构一起重复使用。

此参数可能的值为:

  • on

  • off

如果此参数为 on,则保留目标表的原始架构。如果此参数为 off,则忽略表的旧架构,并根据源代码的架构生成新架构。

此参数是可选的。

此参数的默认值为 off (即默认情况下,原始表架构被覆盖)。

有关将 Spark 数据类型映射到 Snowflake 数据类型(反之亦然)的详细信息,请参阅:数据类型映射 (本主题内容)。

continue_on_error

此变量控制用户输入无效数据时 COPY 命令是否中止(例如,变体数据类型列的无效 JSON 格式)。

可能的值:

  • on

  • off

on 表示即使发生错误也要继续。值 off 表示如果遇到错误,则中止。

此参数是可选的。

此参数的默认值为 off

不建议开启此选项。如果在使用 Spark Connector COPYing 到 Snowflake 时报告了任何错误,那么这很可能会导致数据丢失。

备注

如果行被拒绝或缺失,并且这些行在输入源中没有明显的错误,请将其报告给 Snowflake。

usestagingtable

此参数控制数据加载是否使用暂存表。

暂存表是由连接器创建的普通表(具有临时名称);如果数据加载操作成功,则删除原始目标表,并将暂存表重命名为原始目标表的名称。如果数据加载操作失败,则暂存表将被删除,目标表将保留其在操作前拥有的数据。因此,暂存表允许在操作失败时保留原始目标表数据。为了安全起见,Snowflake 强烈建议在大多数情况下使用暂存表。

为了使连接器创建暂存表,通过 Spark Connector 执行 COPY 的用户必须具有足够的权限才能创建表。如果用户没有创建表的权限,则直接加载(即不使用暂存表进行加载)很有用。

此参数可能的值为:

  • on

  • off

如果参数为 on,则使用暂存表。如果此参数为 off,则数据将直接加载到目标表中。

此参数是可选的。

此参数的默认值为 on (即使用暂存表)。

autopushdown

此参数控制是否启用自动查询下推。

如果启用了下推,则当在 Spark 上运行查询时,如果部分查询可以“向下推送”到 Snowflake 服务器,则会将其向下推送。这提高了某些查询的性能。

此参数是可选的。

如果连接器插入兼容版本的 Spark,默认值为 on。否则,默认值为 off

如果连接器插入的 Spark 版本与连接器预期的版本不同(例如,如果连接器的 3.2 版插入 Spark 的 3.3 版),则即使将此参数设置为 on,自动下推也会被禁用。

purge

如果将其设置为 on,则连接器将删除通过外部数据传输从 Spark 传输到 Snowflake 时创建的临时文件。如果将此参数设置为 off,则连接器不会自动删除这些文件。

清除仅适用于从 Spark 到 Snowflake 的传输,不适用于从 Snowflake 到 Spark 的传输。

可能的值:

  • on

  • off

默认值为 off

columnmap

此参数在将数据从 Spark 写入到 Snowflake 时很有用,而 Snowflake 表中的列名与 Spark 表中的列名不匹配。您可以创建一个映射,指明每个 Snowflake 目标列对应的 Spark 源。

该参数是单字符串字面量,形式为:

"Map(col_2 -> col_b, col_3 -> col_a)"

例如,请考虑以下情景:

  • Spark 中名为 df 的数据帧有三列:

    col_1col_2col_3

  • Snowflake 中名为 tb 的表有两列:

    col_acol_b

  • 您希望复制以下值:

    • df.col_2tb.col_b

    • df.col_3tb.col_a

columnmap 参数的值将是:

Map(col_2 -> col_b, col_3 -> col_a)

可以通过执行以下 Scala 代码生成这个值:

Map("col_2"->"col_b","col_3"->"col_a").toString()

此参数的默认值为空。换句话说,默认情况下,源表和目标表中的列名应该匹配。

此参数仅在从 Spark 写入 Snowflake 时使用;从 Snowflake 写入 Spark 时不适用。

keep_column_case

将表从 Spark 写入 Snowflake 时,除非列名使用双引号,否则 Spark Connector 默认将列名中的字母改为大写。

将表从 Snowflake 写入 Spark 时,Spark Connector 默认在包含除大写字母、下划线和数字之外的任何字符的任何列名前后添加双引号。

如果将 keep_column_case 设置为 on,那么 Spark Connector 将不会进行这些更改。

可能的值:

  • on

  • off

默认值为 off

column_mapping

连接器必须将 Spark 数据框中的列映射到 Snowflake 表。这可以根据列名(无论顺序如何)来完成,也可以根据列顺序来完成(即无论列名如何,数据框中的第一列都映射到表中的第一列)。

默认情况下,映射是根据顺序完成的。可以通过将此参数设置为 name 来替换此设置,这会告诉连接器根据列名映射列。(名称映射不区分大小写。)

此参数可能的值为:

  • order

  • name

默认值为 order

column_mismatch_behavior

此参数仅在 column_mapping 参数设置为 name 时适用。

如果 Spark 数据框和 Snowflake 表中的列名不匹配,那么:

  • 如果 column_mismatch_behaviorerror,则 Spark Connector 会报告错误。

  • 如果 column_mismatch_behaviorignore,则 Spark Connector 会忽略该错误。

    • 驱动程序会丢弃 Spark 数据框中在 Snowflake 表中没有相应列的任何列。

    • 驱动程序将 NULLs 插入 Snowflake 表中任何在 Spark 数据框中没有对应列的列。

潜在的错误包括:

  • Spark 数据框可能包含除大小写(大写/小写)之外相同的列。由于列名映射不区分大小写,因此无法确定从数据框到表的正确映射。

  • Snowflake 表可能包含除大小写(大写/小写)之外相同的列。由于列名映射不区分大小写,因此无法确定从数据框到表的正确映射。

  • Spark 数据框和 Snowflake 表可能没有共同的列名。理论上,Spark Connector 可以在每行的每列中插入 NULLs,但这通常没有意义,因此即使 column_mismatch_behavior 设置为 ignore,连接器也会抛出错误。

此参数可能的值为:

  • error

  • ignore

默认值为 error

time_output_format

此参数允许用户指定返回 TIME 数据的格式。

此参数的可能值是 时间格式 中指定的时间格式的可能值。

此参数仅影响输出,不影响输入。

timestamp_ntz_output_format.timestamp_ltz_output_format.timestamp_tz_output_format

这些选项指定时间戳值的输出格式。这些选项的默认值是:

配置选项

默认值

timestamp_ntz_output_format

"YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_ltz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_tz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

如果将这些选项设置为 "sf_current",则连接器将使用为会话指定的格式。

partition_size_in_mb

当查询结果集非常大且需要拆分为多个 DataFrame 分区时,使用此参数。此参数为每个 DataFrame 分区指定建议的未压缩大小。要减少分区数量,请增大此大小。

此大小用作推荐大小;分区的实际大小可以更小或更大。

仅当 use_copy_unload 参数为 FALSE 时,此选项才适用。

此参数是可选的。

默认值为 100 (MB)。

use_copy_unload

如果为 FALSE,Snowflake 在选择 SELECTing 时使用箭头数据格式。如果设置为 TRUE,则 Snowflake 将恢复使用 COPY UNLOAD 命令传输选定数据的旧行为。

此参数是可选的。

默认值为 FALSE

treat_decimal_as_long

如果为 TRUE,则将 Spark Connector 配置为针对返回 Decimal(precision, 0) 类型的查询返回 Long 值(而不是 BigDecimal 值)。

默认值为 FALSE

此选项是在 Spark Connector 版本 2.11.1 中添加的。

s3_stage_vpce_dns_name

指定用于访问内部暂存区的 VPC 端点的 DNS 名称。

此选项是在 Spark Connector 版本 2.11.1 中添加的。

support_share_connection

如果为 FALSE,则配置 Spark Connector 为使用相同 Spark Connector 选项访问 Snowflake 的每个作业或操作创建新的 JDBC 连接。

默认值为 TRUE,这意味着如果不同的作业和操作使用相同的 Spark Connector 选项访问 Snowflake,则它们共享相同的 JDBC 连接。

如果需要以编程方式启用或禁用此设置,请使用以下全局静态函数:

  • SparkConnectorContext.disableSharedConnection()

  • SparkConnectorContext.enableSharingJDBCConnection()

备注

在以下特殊情况下,Spark Connector 不使用共享 JDBC 连接:

  • 如果设置了前操作或后操作,并且这些前操作或后操作不是 CREATE TABLE、DROP TABLE 或 MERGE INTO,则 Spark Connector 不使用共享连接。

  • Utils 中的实用程序函数(例如 Utils.runQuery()Utils.getJDBCConnection())不使用共享连接。

此选项是在 Spark Connector 版本 2.11.2 中添加的。

force_skip_pre_post_action_check_for_shared_session

如果为 TRUE,则将 Spark Connector 配置为禁用会话共享的前操作和后操作验证。

默认值为 FALSE

重要

在设置此选项之前,请确保前操作和后操作中的查询不会影响会话设置。否则,结果可能会出现问题。

此选项是在 Spark Connector 版本 2.11.3 中添加的。

使用密钥对身份验证和密钥对轮换

Spark Connector 支持密钥对身份验证和密钥轮换。

  1. 要开始,请完成密钥对身份验证的初始配置,如 密钥对身份验证和密钥对轮换 所示。

  2. 使用 pem_private_key 连接选项发送私钥的 未加密 副本。

注意

出于安全原因,应该在从安全来源读取密钥后动态设置参数,而不是在应用程序中对 pem_private_key 进行硬编码。如果密钥已加密,则对其进行解密并发送解密后的版本。

在 Python 示例中,请注意 pem_private_key 文件 rsa_key.p8 是:

  • 使用环境变量 PRIVATE_KEY_PASSPHRASE 直接从受密码保护的文件中读取。

  • sfOptions 字符串中使用表达式 pkb

要进行连接,可以将 Python 示例保存到文件(即 <file.py>),然后执行以下命令:

spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Copy

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.cn",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()
Copy

使用外部 OAuth

从 Spark Connector 版本 2.7.0 开始,可以使用 Scala 示例程序或 Python 示例脚本使用 外部 OAuth 对 Snowflake 进行身份验证。

在使用 External OAuth 和 Spark Connector对 Snowflake 进行身份验证之前,请为一个受支持的 External OAuth 授权服务器或 External OAuth 自定义客户端 配置 External OAuth 安全集成。

在 Scala 和 Python 示例中,请注意将 sfPassword 参数替换为 sfAuthenticatorsfToken 参数。

Scala:

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Python:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.cn",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

用于外部数据传输的 AWS 选项

这些选项用于指定存储临时数据的 Amazon S3 位置,并为访问该位置提供身份验证详细信息。只有 在进行外部数据传输时才需要这些选项。如果满足以下任一条件,则需要使用外部数据传输:

  • 您使用的是 Spark Connector 的 2.1.x 或更低版本(不支持内部传输),或者

  • 您的传输可能需要 36 小时或更长时间(内部传输使用在 36 小时后过期的临时凭据)。

tempDir

存储中间数据的 S3 位置(例如 s3n://xy12345-bucket/spark-snowflake-tmp/)。

如果 tempDir 已指定,则还必须指定:

  • awsAccessKeyawsSecretKey .

  • temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token

awsAccessKeyawsSecretKey

这些是标准的 AWS 凭据,允许访问 tempDir 中指定的位置。请注意,这两个选项必须同时设置。

如果已设置,则可以从现有 SparkContext 对象中检索它们。

如果指定这些变量,则还必须指定 tempDir

还应为 Hadoop 集群设置这些凭据。

temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token

这些是允许访问 tempDir 中指定位置的临时 AWS 凭据。请注意,必须同时设置这三个选项。

此外,如果设置了这些选项,则它们优先于 awsAccessKeyawsSecretKey 选项。

如果指定 temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token,则还必须指定 tempDir。否则,这些参数将被忽略。

check_bucket_configuration

如果设置为 on (默认),则连接器检查用于数据传输的桶是否配置了生命周期策略(有关更多信息,请参阅 准备一个 AWS 外部 S3 桶)。如果不存在生命周期策略,则会记录警告。

禁用此选项(通过设置为 off)将跳过此检查。如果用户可以访问桶数据操作,但不能访问桶生命周期策略,则这会很有用。禁用该选项还可以稍微减少查询的执行时间。

有关详细信息,请参阅 对 S3 进行身份验证以进行数据交换 (本主题内容)。

用于外部数据传输的 Azure 选项

本节介绍在进行外部数据传输时适用于 Azure Blob 存储的参数。如果满足以下任一条件,则需要使用外部数据传输:

  • 您使用的是 Spark Connector 的 2.1.x 或更低版本(不支持内部传输),或者

  • 您的传输可能需要 36 小时或更长时间(内部传输使用在 36 小时后过期的临时凭据)。

当使用带有 Azure Blob 存储的外部传输时,使用下面描述的参数指定 Azure 容器的位置和该容器的 SAS(共享访问签名)。

tempDir

存储中间数据的 Azure Blob 存储容器。该容器采用 URL 的形式,例如:

wasb://<azure_container>@<azure_account>.<azure_endpoint>/

temporary_azure_sas_token

为 Azure Blob 存储指定 SAS 令牌。

有关详细信息,请参阅 对 Azure 进行身份验证以进行数据交换 (本主题内容)。

在 Spark 中为临时存储指定 Azure 信息

使用 Azure Blob 存储提供临时存储以在 Spark 和 Snowflake 之间传输数据时,必须向 Spark 以及 Snowflake Spark Connector 提供临时存储的位置和凭据。

要为 Spark 提供临时存储位置,请在 Spark 集群上执行与以下类似的命令:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

请注意,最后一个命令包含以下变量:

  • <container> and <account>:这些是 Azure 部署的容器和账户名称。

  • <azure_endpoint>: This is the endpoint for your Azure deployment location. For example, if you are using an Azure US deployment, the endpoint is likely to be blob.core.windows.net

  • <azure_sas>:这是共享访问签名安全令牌。

将这些变量替换为 Azure Blob 存储账户的正确信息。

将 Snowflake 会话参数作为连接器的选项传递

适用于 Spark 的 Snowflake Connector 支持向 Snowflake 发送任意会话级别的参数(有关更多信息,请参阅 会话参数)。这可以通过向 options 对象添加 ("<key>" -> "<value>") 对来实现,其中 <key> 是会话参数名称,<value> 是值。

备注

<value> 应该是用双引号括起来的字符串,即使参数接受数字或布尔值(例如 "1""true")。

例如,以下代码示例传递值为 "false"USE_CACHED_RESULT 会话参数,该参数禁止使用以前执行的查询的结果:

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method
Copy

安全注意事项

客户应确保在多节点 Spark 系统中,节点之间的通信是安全的。Spark master 向 Spark worker 发送 Snowflake 凭据,以便这些 worker 可以访问 Snowflake 暂存区。如果 Spark master 和 Spark worker 之间的通信不安全,则未经授权的第三方可能会读取凭据。

对 S3 进行身份验证以进行数据交换

本节介绍在使用 S3 进行数据交换时如何进行身份验证。

只有 在下列任一情况下才需要执行此任务:

  • Snowflake Connector for Spark 版本为 2.1.x(或更低版本)。从 v2.2.0 开始,该连接器使用 Snowflake 内部临时暂存区进行数据交换。如果当前没有使用该连接器的版本 v2.2.0(或更高版本),Snowflake 强烈建议升级到最新版本。

  • Snowflake Connector for Spark 版本为 2.2.0(或更高版本),但您的作业时长通常会超过 36 小时。这是连接器使用 AWS 令牌访问内部暂存区进行数据交换的最大持续时间。

如果您使用的是旧版本的连接器,则需要准备一个 S3 位置,连接器可以使用该位置在 Snowflake 和 Spark 之间交换数据。

为了允许访问用于在 Spark 和 Snowflake 之间交换数据的 S3 桶/目录(如为 tempDir 指定的那样),支持两种身份验证方法:

  • 永久 AWS 凭据(也用于配置 Hadoop/Spark 身份验证以访问 S3)

  • 临时 AWS 凭据

使用永久 AWS 凭据

这是标准的 AWS 身份验证方法。需要一对 awsAccessKeyawsSecretKey 值。

备注

这些值还应用于配置 Hadoop/Spark 以访问 S3。有关更多信息(包括示例),请参阅 使用 S3A 或 S3N 对 Hadoop/Spark 进行身份验证 (本主题内容)。

例如:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)
Copy

有关 sfOptions 支持的选项的详细信息,请参阅 用于外部数据传输的 AWS 选项 (本主题内容)。

使用 S3A 或 S3N 对 Hadoop/Spark 进行身份验证

Hadoop/Spark 生态系统支持 2 种`访问 S3 <https://wiki.apache.org/hadoop/AmazonS3/ (https://wiki.apache.org/hadoop/AmazonS3/)>`_ 的 URI 架构:

s3a://

推荐的新方法(适用于 Hadoop 2.7 及更高版本)

要使用此方法,请修改本主题中的 Scala 示例,添加以下 Hadoop 配置选项:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Copy

确保 tempdir 选项也使用 s3a://

s3n://

较旧的方法(适用于 Hadoop 2.6 及更低版本)

在某些系统中,必须明确指定,如以下 Scala 示例所示:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Copy

使用临时 AWS 凭据

此方法使用连接器的 temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token 配置选项。

此方法仅向 Snowflake 提供临时访问用于数据交换的 S3 桶/目录的权限,从而提高了安全性。

备注

临时凭据只能用于为连接器配置 S3 身份验证;不能用于配置 Hadoop/Spark 身份验证。

此外,如果您提供临时凭据,则它们优先于已提供的任何永久凭据。

以下 Scala 代码示例提供了使用临时凭据进行身份验证的示例:

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
Copy

sfOptions2 现在可以与 options() DataFrame 方法一起使用。

对 Azure 进行身份验证以进行数据交换

本节介绍在使用 Azure Blob 存储进行数据交换时如何进行身份验证。

只有 在下列任一情况下才需要以这种方式进行身份验证:

  • Snowflake Connector for Spark 版本为 2.1.x(或更低版本)。从 v2.2.0 开始,该连接器使用 Snowflake 内部临时暂存区进行数据交换。如果当前没有使用该连接器的版本 v2.2.0(或更高版本),Snowflake 强烈建议升级到最新版本。

  • Snowflake Connector for Spark 版本为 2.2.0(或更高版本),但您的作业时长通常会超过 36 小时。这是连接器使用的 Azure 令牌访问内部暂存区进行数据交换的最大持续时间。

需要准备一个 Azure Blob 存储容器,供该连接器用于在 Snowflake 和 Spark 之间交换数据。

使用 Azure 凭据

这是标准的 Azure Blob 存储身份验证方法。需要一对值:tempDir (一个 URL)和 temporary_azure_sas_token 值。

备注

这些值还应用于配置 Hadoop/Spark 以访问 Azure Blob 存储。有关更多信息(包括示例),请参阅 使用 Azure 对 Hadoop/Spark 进行身份验证 (本主题内容)。

例如:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)
Copy

有关 sfOptions 支持的选项的详细信息,请参阅 用于外部数据传输的 Azure 选项 (本主题内容)。

使用 Azure 对 Hadoop/Spark 进行身份验证

要使用此方法,请修改本主题中的 Scala 示例,添加以下 Hadoop 配置选项:

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

确保 tempdir 选项也使用 wasb://

不支持通过浏览器进行身份验证

使用 Spark Connector 时,使用任何会打开浏览器窗口要求用户提供凭据的身份验证形式都是不切实际的。该窗口不一定会出现在客户端计算机上。因此,Spark Connector 不支持任何类型的身份验证,包括会调用浏览器窗口的 MFA (多重身份验证)或 SSO (单点登录)。

语言: 中文