Using the Spark Connector

The connector adheres to the standard Spark API, but adds Snowflake-specific options, which are described in this topic.

In this topic, the term COPY refers to both:

  • COPY INTO <table> (used to transfer data from an internal or external stage into a table).
  • COPY INTO <location> (used to transfer data from a table into an internal or external stage).

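Verifying the network connection to Snowflake with SnowCD

After configuring your driver, you can evaluate and troubleshoot your network connectivity to Snowflake using SnowCD.

You can use SnowCD during the initial configuration process, and on demand at any time, to evaluate and troubleshoot your network connection to Snowflake.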

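Pushdown

The Spark Connector applies predicate and query pushdown by capturing and analyzing the Spark logical plans for SQL operations. When the data source is Snowflake, the operations are translated into a SQL query and then executed in Snowflake to improve performance.

However, this translation requires an almost one-to-one mapping of Spark SQL operators to Snowflake expressions, so not all Spark SQL operators can be pushed down. When pushdown fails, the connector falls back to a less-optimized execution plan, and the unsupported operations are performed in Spark instead.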

Note

If you need pushdown for all operations, consider writing your code to use the Snowpark API instead.

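The following is a list of supported pushdown operations (all functions below use their Spark names). If a function is not in this list, a Spark plan that uses it might be executed on Spark rather than pushed down into Snowflake.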

  • Aggregation functions

    • Average
    • Corr
    • CovPopulation
    • CovSample
    • Count
    • Max
    • Min
    • StddevPop
    • StddevSamp
    • Sum
    • VariancePop
    • VarianceSamp
  • Boolean operators

    • And
    • Between
    • Contains
    • EndsWith
    • EqualTo
    • GreaterThan
    • GreaterThanOrEqual
    • In
    • IsNull
    • IsNotNull
    • LessThan
    • LessThanOrEqual
    • Not
    • Or
    • StartsWith
  • Date, time, and timestamp functions

    • DateAdd
    • DateSub
    • Month
    • Quarter
    • TruncDate
    • TruncTimestamp
    • Year
  • Mathematical functions

    • Arithmetic operators "+" (addition), "-" (subtraction), "*" (multiplication), "/" (division), and "-" (unary negation).
    • Abs
    • Acos
    • Asin
    • Atan
    • Ceil
    • CheckOverflow
    • Cos
    • Cosh
    • Exp
    • Floor
    • Greatest
    • Least
    • Log
    • Pi
    • Pow
    • PromotePrecision
    • Rand
    • Round
    • Sin
    • Sinh
    • Sqrt
    • Tan
    • Tanh
  • Miscellaneous operators

    • Aliases (AS expressions)
    • BitwiseAnd
    • BitwiseNot
    • BitwiseOr
    • BitwiseXor
    • CaseWhen
    • Cast(child, t, _)
    • Coalesce
    • If
    • MakeDecimal
    • ScalarSubquery
    • ShiftLeft
    • ShiftRight
    • SortOrder
    • UnscaledValue
  • Relational operators

    • Aggregates and GROUP BY clauses
    • Distinct
    • Filters
    • In
    • InSet
    • Joins
    • Limits
    • Projections
    • Sorts (ORDER BY)
    • Union and Union All
    • Window functions and windowing clauses
  • String functions

    • Ascii
    • Concat(children)
    • Length
    • Like
    • Lower
    • StringLPad
    • StringRPad
    • StringTranslate
    • StringTrim
    • StringTrimLeft
    • StringTrimRight
    • Substring
    • Upper
  • Window functions (note: these don't work with Spark 2.2)

    • DenseRank
    • Rank
    • RowNumber

Using the connector in Scala

Specifying the data source class name

To use Snowflake as a data source in Spark, use the .format option to provide the Snowflake connector class name that defines the data source.

net.snowflake.spark.snowflake

To ensure a compile-time check of the class name, Snowflake highly recommends defining a variable for the class name. For example:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

Also, for convenience, the Utils class provides the variable, which can be imported as follows:

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

Note

All examples in this topic use SNOWFLAKE_SOURCE_NAME as the class definition.

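Enabling/disabling pushdown in a session

Version 2.1.0 (and higher) of the connector supports query pushdown. When Snowflake is the Spark data source, pushing query processing to Snowflake can significantly improve performance.

By default, pushdown is enabled.

To disable pushdown within a Spark session for a given DataFrame: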

  1. After instantiating a SparkSession object, call the SnowflakeConnectorUtils.disablePushdownSession static method, passing in the SparkSession object. For example:

    SnowflakeConnectorUtils.disablePushdownSession(spark)

    Where spark is your SparkSession object.

  2. Create a DataFrame with the autopushdown option set to off. For example:

    val df = spark.read.format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("query", query)
      .option("autopushdown", "off")
      .load()

    Note that you can also set the autopushdown option in a Map that you pass to the options method (e.g. in sfOptions in the example above).

To enable pushdown again after disabling it, call the SnowflakeConnectorUtils.enablePushdownSession static method (passing in the SparkSession object), and create a DataFrame with autopushdown enabled.

Moving data from Snowflake into Spark

Note

When using DataFrames, the Snowflake connector supports SELECT queries only.

To read data from Snowflake into a Spark DataFrame:

  1. Use the read() method of the SqlContext object to construct a DataFrameReader.
  2. Specify SNOWFLAKE_SOURCE_NAME using the format() method. For the definition, see Specifying the data source class name (in this topic).
  3. Specify the connector options using either the option() or options() method. For more information, see Setting configuration options for the connector (in this topic).
  4. Specify one of the following options for the table data to be read:
    • dbtable: The name of the table to be read. All columns and records are retrieved (i.e. it is equivalent to SELECT * FROM db_table).
    • query: The exact query (SELECT statement) to run.
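The rule above can be modeled with a small hypothetical Python helper, effective_query (not part of the connector). It treats dbtable as shorthand for SELECT * FROM <table>. Rejecting an option set that names both dbtable and query, or neither, is an assumption of this sketch, not documented connector behavior.

```python
from typing import Mapping


def effective_query(options: Mapping[str, str]) -> str:
    """Return the SELECT a read would be based on (illustrative model only)."""
    has_table = "dbtable" in options
    has_query = "query" in options
    if has_table == has_query:
        # Assumption of this sketch: exactly one of dbtable / query is allowed.
        raise ValueError("specify exactly one of 'dbtable' or 'query'")
    if has_table:
        # dbtable retrieves all columns and records of the table.
        return f"SELECT * FROM {options['dbtable']}"
    return options["query"]
```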

Usage notes

  • Currently, the connector does not support other types of queries (e.g. SHOW or DESC, or DML statements) when using DataFrames.
  • There is an upper limit to the size of an individual row. For more details, see Limits on Query Text Size.

Performance considerations

When transferring data between Snowflake and Spark, use the following methods to analyze and improve performance:

  • Use the net.snowflake.spark.snowflake.Utils.getLastSelect() method to see the actual query issued when moving data from Snowflake to Spark.
  • If you use the filter or where functionality of the Spark DataFrame, check that the respective filters are present in the issued SQL query. The Snowflake connector tries to translate all the filters requested by Spark to SQL.

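    However, the current Spark infrastructure does not pass some forms of filters to the Snowflake connector. As a result, in some situations a large number of unnecessary records are requested from Snowflake.

  • If you need only a subset of columns, make sure that the SQL query selects only that subset.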
  • In general, if the SQL query issued does not match what you expect based on the DataFrame operations, use the query option to provide the exact SQL syntax you want.
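The last two recommendations can be illustrated with a hypothetical helper, build_select (not a connector API). It builds an explicit SELECT that names only the needed columns and applies the filter inside Snowflake, so the result can be passed to the query option. The sketch assumes that the table name, columns, and predicate come from trusted code, because it does no quoting or escaping.

```python
from typing import Optional, Sequence


def build_select(table: str, columns: Sequence[str], predicate: Optional[str] = None) -> str:
    """Build an explicit SELECT for the 'query' option (hypothetical helper)."""
    if not columns:
        raise ValueError("at least one column is required")
    # Select only the needed columns so Snowflake does not return the others.
    sql = f"SELECT {', '.join(columns)} FROM {table}"
    if predicate:
        # Filter in Snowflake instead of relying on Spark to pass the filter down.
        sql += f" WHERE {predicate}"
    return sql
```

In PySpark, the result would be passed as .option("query", build_select(...)) when reading into a DataFrame.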

Examples

Read an entire table:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

Read the results of a query:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1 GROUP BY DEPT")
    .load()

Moving data from Spark into Snowflake

The steps for saving the contents of a DataFrame to a Snowflake table are similar to the steps for reading from Snowflake into Spark:

  1. Use the write() method of the DataFrame to construct a DataFrameWriter.

  2. Specify SNOWFLAKE_SOURCE_NAME using the format() method. For the definition, see Specifying the data source class name (in this topic).

  3. Specify the connector options using either the option() or options() method. For more information, see Setting configuration options for the connector (in this topic).

  4. Use the dbtable option to specify the table to which data is written.

  5. Use the mode() method to specify the save mode for the content.

    For more information, see SaveMode (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/SaveMode.html) (Spark documentation).

Example

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Exporting JSON from Spark to Snowflake

Spark DataFrames can contain JSON objects, serialized as strings. The following code shows how to convert a regular DataFrame into a DataFrame that contains JSON data:

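import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
// In Spark 2.x and later, toJSON returns a Dataset[String]; .rdd converts it to an RDD
val rdd = myDataFrame.toJSON.rdd
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)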

Note that the resulting jsonDataFrame contains a single column of type StringType. As a result, when this DataFrame is exported to Snowflake with the common SaveMode.Overwrite mode, a new table in Snowflake is created with a single column of type VARCHAR.
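To see what the conversion produces, here is a plain-Python analogy that does not need Spark: each record becomes one JSON string in a single column named JSON. The helper name rows_to_json_column is hypothetical, and Spark's exact JSON formatting may differ from json.dumps.

```python
import json
from typing import Any, Dict, List


def rows_to_json_column(rows: List[Dict[str, Any]], column: str = "JSON") -> List[Dict[str, str]]:
    """Serialize each row into a JSON string held in a single string column."""
    # Each output row has exactly one field that holds the whole record as text,
    # similar to a DataFrame with one StringType column.
    return [{column: json.dumps(row, separators=(",", ":"))} for row in rows]
```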

To load jsonDataFrame into a VARIANT column:

  1. Create a Snowflake table (connecting to Snowflake in Java using the Snowflake JDBC Driver). For explanations of the connection parameters used in the example, see JDBC Driver connection parameter reference.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.Properties;

    public class SnowflakeJDBCExample {
        public static void main(String[] args) throws Exception {
            String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.cn/";

            Properties properties = new Properties();
            properties.put("user", "peter");
            properties.put("password", "test");
            properties.put("account", "myorganization-myaccount");
            properties.put("warehouse", "mywh");
            properties.put("db", "mydb");
            properties.put("schema", "public");

            // get connection
            System.out.println("Create JDBC connection");
            Connection connection = DriverManager.getConnection(jdbcUrl, properties);
            System.out.println("Done creating JDBC connection\n");

            // create statement
            System.out.println("Create JDBC statement");
            Statement statement = connection.createStatement();
            System.out.println("Done creating JDBC statement\n");

            // create a table
            System.out.println("Create my_variant_table table");
            statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
            statement.close();
            System.out.println("Done creating demo table\n");

            connection.close();
            System.out.println("Close connection\n");
        }
    }

  2. Instead of using SaveMode.Overwrite, use SaveMode.Append to reuse the existing table. Because the target column is of type VARIANT, the string value representing JSON is parsed as JSON when it is loaded into Snowflake. For example:

    jsonDataFrame.write
     .format(SNOWFLAKE_SOURCE_NAME)
     .options(sfOptions)
     .option("dbtable", "my_variant_table")
     .mode(SaveMode.Append)
     .save()

Executing DDL/DML SQL statements

Use the runQuery() method of the Utils object to execute DDL/DML SQL statements, in addition to queries, for example:

import net.snowflake.spark.snowflake.Utils

val sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")

where sfOptions is the parameters map used to read/write DataFrames.

The runQuery method returns only TRUE or FALSE. It is intended for statements that do not return a result set, for example DDL statements like CREATE TABLE and DML statements like INSERT, UPDATE, and DELETE. It is not useful for statements that return a result set, such as SELECT or SHOW.

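Working with timestamps and time zones

Spark provides only one type of timestamp, equivalent to the Scala/Java Timestamp type. Its behavior is almost identical to the TIMESTAMP_LTZ (local time zone) data type in Snowflake. Therefore, when transferring data between Spark and Snowflake, Snowflake recommends the following approaches to preserve time correctly relative to time zones:

  • Use only the TIMESTAMP_LTZ data type in Snowflake.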

    Note

    The default timestamp data type mapping is TIMESTAMP_NTZ (no time zone), so you must explicitly set the TIMESTAMP_TYPE_MAPPING parameter to use TIMESTAMP_LTZ.

  • Set the Spark time zone to UTC and use this time zone in Snowflake (i.e. don’t set the sfTimezone option for the connector, and don’t explicitly set a time zone in Snowflake). In this scenario, TIMESTAMP_LTZ and TIMESTAMP_NTZ are effectively equivalent.

To set the time zone, add the following line to your Spark code:

java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))

If you implement neither of these approaches, undesired time modifications might occur. For example, consider the following scenario:

  • The time zone in Spark is set to America/New_York.

  • The time zone in Snowflake is set to Europe/Warsaw, which can happen by either:

    • Setting sfTimezone to Europe/Warsaw for the connector.
    • Setting sfTimezone to snowflake for the connector and setting the TIMEZONE session parameter in Snowflake to Europe/Warsaw.
  • Both TIMESTAMP_NTZ and TIMESTAMP_LTZ are used in Snowflake.

In this scenario:

  1. If a value representing 12:00:00 in a TIMESTAMP_NTZ column in Snowflake is sent to Spark, this value doesn’t carry any time zone information. Spark treats the value as 12:00:00 in New York.
  2. If Spark sends this value 12:00:00 (in New York) back to Snowflake to be loaded into a TIMESTAMP_LTZ column, it is automatically converted and loaded as 18:00:00 (for the Warsaw time zone).
  3. If this value is then converted to TIMESTAMP_NTZ in Snowflake, the user sees 18:00:00, which is different from the original value, 12:00:00.
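You can reproduce this scenario with Python's datetime module. For brevity, this sketch uses fixed January UTC offsets instead of real time-zone rules: UTC-5 for America/New_York and UTC+1 for Europe/Warsaw. round_trip is a hypothetical name for the three steps listed above.

```python
from datetime import datetime, timedelta, timezone

# Fixed winter offsets stand in for the real zones (an assumption for brevity).
NEW_YORK = timezone(timedelta(hours=-5))
WARSAW = timezone(timedelta(hours=1))


def round_trip(ntz_value: datetime, spark_tz: timezone, snowflake_tz: timezone) -> datetime:
    """Trace a TIMESTAMP_NTZ value through Spark and back into Snowflake."""
    # Step 1: Spark interprets the zone-less value in its own time zone.
    in_spark = ntz_value.replace(tzinfo=spark_tz)
    # Step 2: loading into TIMESTAMP_LTZ converts it to the Snowflake session zone.
    in_snowflake_ltz = in_spark.astimezone(snowflake_tz)
    # Step 3: converting to TIMESTAMP_NTZ drops the zone and keeps the wall-clock time.
    return in_snowflake_ltz.replace(tzinfo=None)


original = datetime(2024, 1, 15, 12, 0, 0)
print(round_trip(original, NEW_YORK, WARSAW))  # 2024-01-15 18:00:00
print(round_trip(original, timezone.utc, timezone.utc))  # 2024-01-15 12:00:00
```

When both sides use UTC, the value survives the round trip unchanged. This is why using the same time zone in Spark and Snowflake is one of the recommended rules.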

To summarize, Snowflake recommends strictly following at least one of these rules:

  • Use the same time zone, ideally UTC, for both Spark and Snowflake.
  • Use only the TIMESTAMP_LTZ data type for transferring data between Spark and Snowflake.

Sample Scala program

Important

This sample program assumes you are using version 2.2.0 (or higher) of the connector. These versions use a Snowflake internal stage for storing temporary data, so they do not require an S3 location for that data. If you are using an earlier version, you must have an existing S3 location and include values for tempdir, awsAccessKey, and awsSecretKey in sfOptions. For more details, see AWS options for external data transfer (in this topic).

The following Scala program provides a full use case for the Snowflake Connector for Spark. Before using the code, replace the following strings with the appropriate values, as described in Setting configuration options for the connector (in this topic):

  • <account_identifier>: Your account identifier.
  • <user_name>, <password>: Login credentials for the Snowflake user.
  • <database>, <schema>, <warehouse>: Defaults for the Snowflake session.

The sample Scala program uses basic authentication (i.e. username and password). If you wish to authenticate with OAuth, see Using External OAuth (in this topic).

import org.apache.spark.sql._
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val dfFromQuery: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Using the connector with Python

Using the connector with Python is very similar to using it with Scala.

We recommend using the bin/pyspark script included in the Spark distribution.

Configuring the pyspark Script

The pyspark script must be configured similarly to the spark-shell script, using the --packages or --jars options. For example:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3

Don't forget to include the Snowflake Spark Connector and JDBC Connector .jar files in your CLASSPATH environment variable.

For more information about configuring the spark-shell script, see Step 4: Configure the Local Spark Cluster or Amazon EMR-hosted Spark Environment.

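Enabling/disabling pushdown in a session

Version 2.1.0 (and higher) of the connector supports query pushdown. When Snowflake is the Spark data source, pushing query processing to Snowflake can significantly improve performance.

By default, pushdown is enabled.

To disable pushdown within a Spark session for a given DataFrame: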

  1. After instantiating a SparkSession object, call the SnowflakeConnectorUtils.disablePushdownSession static method, passing in the SparkSession object. For example:

    sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
  2. Create a DataFrame with the autopushdown option set to off. For example:

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("query",  query) \
      .option("autopushdown", "off") \
      .load()

    Note that you can also set the autopushdown option in a Dictionary that you pass to the options method (e.g. in sfOptions in the example above).

To enable pushdown again after disabling it, call the SnowflakeConnectorUtils.enablePushdownSession static method (passing in the SparkSession object), and create a DataFrame with autopushdown enabled.
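Because the autopushdown option can be stored in the same dictionary that is passed to options(), one approach is to derive a per-DataFrame copy of sfOptions. The helper with_autopushdown is hypothetical. It sets only the DataFrame-level option, so you still need the session-level SnowflakeConnectorUtils call from step 1 to disable pushdown.

```python
from typing import Dict, Mapping


def with_autopushdown(sf_options: Mapping[str, str], enabled: bool) -> Dict[str, str]:
    """Return a copy of sf_options with the autopushdown option set to on or off."""
    options = dict(sf_options)  # leave the shared base dictionary untouched
    options["autopushdown"] = "on" if enabled else "off"
    return options

# Usage in PySpark (not executed here):
# df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
#   .options(**with_autopushdown(sfOptions, enabled=False)) \
#   .option("query", query) \
#   .load()
```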

Sample Python script

Important

This sample script assumes you are using version 2.2.0 (or higher) of the connector. These versions use a Snowflake internal stage for storing temporary data, so they do not require an S3 location for that data. If you are using an earlier version, you must have an existing S3 location and include values for tempdir, awsAccessKey, and awsSecretKey in sfOptions. For more details, see AWS options for external data transfer (in this topic).

Once the pyspark script has been configured, you can perform SQL queries and other operations. Here’s an example Python script that performs a simple SQL query. This script illustrates basic connector usage. Most of the Scala examples in this document can be adapted with minimal effort/changes for use with Python.

The sample Python script uses basic authentication (i.e. username and password). If you wish to authenticate with OAuth, see Using External OAuth (in this topic).

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Simple App").getOrCreate()
sc = spark.sparkContext  # SparkContext used below to set Hadoop options

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.cn",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>",
  "sfRole" : "Accountadmin"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

Tip

Note the usage of sfOptions and SNOWFLAKE_SOURCE_NAME. This simplifies the code and reduces the chance of errors.

For details about the supported options for sfOptions, see Setting configuration options for the connector (in this topic).

Data type mappings

The Spark Connector supports converting between many common data types.

From Spark SQL to Snowflake

Spark Data Type    Snowflake Data Type
ArrayType          VARIANT
BinaryType         Not supported
BooleanType        BOOLEAN
ByteType           INTEGER. Snowflake does not support the BYTE type.
DateType           DATE
DecimalType        DECIMAL
DoubleType         DOUBLE
FloatType          FLOAT
IntegerType        INTEGER
LongType           INTEGER
MapType            VARIANT
ShortType          INTEGER
StringType         If length is specified, VARCHAR(N); otherwise, VARCHAR
StructType         VARIANT
TimestampType      TIMESTAMP

From Snowflake to Spark SQL

Snowflake Data Type    Spark Data Type
ARRAY                  StringType
BIGINT                 DecimalType(38, 0)
BINARY                 Not supported
BLOB                   Not supported
BOOLEAN                BooleanType
CHAR                   StringType
CLOB                   StringType
DATE                   DateType
DECIMAL                DecimalType
DOUBLE                 DoubleType
FLOAT                  DoubleType
INTEGER                DecimalType(38, 0)
OBJECT                 StringType
TIMESTAMP              TimestampType
TIME                   StringType (Spark Connector version 2.4.14 or later)
VARIANT                StringType

Calling the DataFrame.show method

If you are calling the DataFrame.show method and passing in a number that is less than the number of rows in the DataFrame, construct a DataFrame that just contains the rows to show in a sorted order.

To do this:

  1. Call the sort method first to return a DataFrame that contains sorted rows.
  2. Call the limit method on that DataFrame to return a DataFrame that just contains the rows that you want to show.
  3. Call the show method on the returned DataFrame.

For example, if you want to show 5 rows and want the results sorted by the column my_col:

val dfWithRowsToShow = originalDf.sort("my_col").limit(5)
dfWithRowsToShow.show(5)

Otherwise, if you call show to display a subset of rows in the DataFrame, different executions of the code might result in different rows being shown.

Setting configuration options for the connector

The following sections list the options that you set to configure the behavior of the connector.

To set these options, call the .option(<key>, <value>) or .options(<map>) method of the Spark DataFrameReader (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/DataFrameReader.html) class.

Tip

To facilitate using the options, Snowflake recommends specifying the options in a single Map object and calling .options(<map>) to set the options.

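Required connection options

The following options are required for connecting to Snowflake:

sfUrl

Specifies the hostname for your account in the following format: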

account_identifier.snowflakecomputing.cn

account_identifier is your account identifier.

sfUser

Login name for the Snowflake user.

You must also use one of the following options to authenticate:

  • sfPassword

    Password for the Snowflake user.

  • pem_private_key

    Private key (in PEM format) for key pair authentication. For instructions, see Key-pair authentication and key-pair rotation.

  • sfAuthenticator

    Specifies using External OAuth to authenticate to Snowflake. Set the value to oauth.

    Using External OAuth requires setting the sfToken parameter.

sfToken

(Required if using External OAuth) Set the value to your External OAuth access token.

This connection parameter requires setting the sfAuthenticator parameter value to oauth.

The default value is none.
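You can check these authentication rules before a job starts. The function below is a hypothetical pre-flight check, not part of the connector. Requiring exactly one authentication method is an assumption of the sketch and is stricter than the text above.

```python
from typing import Mapping


def check_auth_options(options: Mapping[str, str]) -> str:
    """Return the authentication method an options map selects (illustrative check)."""
    methods = []
    if "sfPassword" in options:
        methods.append("password")
    if "pem_private_key" in options:
        methods.append("key_pair")
    if options.get("sfAuthenticator") == "oauth":
        # External OAuth also needs the access token in sfToken.
        if not options.get("sfToken"):
            raise ValueError("sfAuthenticator=oauth requires sfToken")
        methods.append("oauth")
    elif "sfToken" in options:
        # sfToken is only meaningful when sfAuthenticator is set to oauth.
        raise ValueError("sfToken requires sfAuthenticator=oauth")
    if len(methods) != 1:
        # Assumption: this sketch insists on exactly one method.
        raise ValueError(f"expected one authentication method, found {methods}")
    return methods[0]
```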

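Required context options

The following options are required for setting the database and schema context for the session:

sfDatabase

The database to use for the session after connecting.

sfSchema

The schema to use for the session after connecting.

Additional context options

The options listed in this section are not required.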

sfAccount

Account identifier (e.g. myorganization-myaccount). This option is no longer required because the account identifier is specified in sfUrl. It is documented here only for backward compatibility.

sfWarehouse

The default virtual warehouse to use for the session after connecting.

sfRole

The default security role to use for the session after connecting.
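The sketch below assembles sfOptions from environment variables, following the tip to keep all options in a single map and the advice in this topic not to hard-code secrets. The environment variable names (SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, and so on) and the helper sf_options_from_env are hypothetical. The option keys and the sfURL host name format come from this topic.

```python
import os
from typing import Dict, Mapping, Optional


def sf_options_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build one sfOptions dictionary from (hypothetical) environment variables."""
    env = os.environ if env is None else env
    options = {
        # sfURL uses the <account_identifier>.snowflakecomputing.cn format.
        "sfURL": f"{env['SNOWFLAKE_ACCOUNT']}.snowflakecomputing.cn",
        "sfUser": env["SNOWFLAKE_USER"],
        "sfPassword": env["SNOWFLAKE_PASSWORD"],
        "sfDatabase": env["SNOWFLAKE_DATABASE"],
        "sfSchema": env["SNOWFLAKE_SCHEMA"],
    }
    # sfWarehouse and sfRole are optional, so add them only when they are set.
    for var, key in (("SNOWFLAKE_WAREHOUSE", "sfWarehouse"), ("SNOWFLAKE_ROLE", "sfRole")):
        value = env.get(var)
        if value:
            options[key] = value
    return options
```

In PySpark, the result would be passed as .options(**sf_options_from_env()).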

Proxy options

The options listed in this section are not required.

use_proxy

Specifies whether the connector should use a proxy:

  • true specifies that the connector should use a proxy.
  • false specifies that the connector should not use a proxy.

The default value is false.

proxy_host

(Required if use_proxy is true) Specifies the hostname of the proxy server to use.

proxy_port

(Required if use_proxy is true) Specifies the port number of the proxy server to use.

proxy_protocol

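Specifies the protocol used to connect to the proxy server. You can specify one of the following values:

  • http
  • https

The default value is http.

This is supported only for Snowflake on AWS.

This option was added in version 2.11.1 of the Spark Connector.

proxy_user

Specifies the user name for authenticating to the proxy server. Set this if the proxy server requires authentication.

This is supported only for Snowflake on AWS.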

proxy_password

Specifies the password of proxy_user for authenticating to the proxy server. Set this if the proxy server requires authentication.

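This is supported only for Snowflake on AWS.

non_proxy_hosts

Specifies the list of hosts that the connector should connect to directly, bypassing the proxy server.

Separate the hostnames with a URL-escaped pipe symbol (%7C). You can also use an asterisk (*) as a wildcard.

This is supported only for Snowflake on AWS.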
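The hypothetical helper below assembles the proxy options as strings. It joins the bypass list with %7C, as described above. The host names and port numbers in the tests are placeholders.

```python
from typing import Dict, Iterable, Optional


def proxy_options(host: str, port: int, non_proxy_hosts: Iterable[str] = (),
                  protocol: str = "http", user: Optional[str] = None,
                  password: Optional[str] = None) -> Dict[str, str]:
    """Build the proxy-related connector options; every value is a string."""
    if protocol not in ("http", "https"):
        raise ValueError("proxy_protocol must be http or https")
    options = {
        "use_proxy": "true",
        "proxy_host": host,
        "proxy_port": str(port),
        "proxy_protocol": protocol,
    }
    # Credentials are needed only when the proxy server requires authentication.
    if user is not None:
        options["proxy_user"] = user
    if password is not None:
        options["proxy_password"] = password
    hosts = list(non_proxy_hosts)
    if hosts:
        # Host names are separated by a URL-escaped pipe (%7C); "*" is a wildcard.
        options["non_proxy_hosts"] = "%7C".join(hosts)
    return options
```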

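Additional options

The options listed in this section are not required.

sfTimezone

The time zone that Snowflake uses when working with Spark. Note that this parameter sets the time zone only in Snowflake; the Spark environment remains unmodified. The supported values are: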

  • spark: Use the time zone from Spark (default).

  • snowflake: Use the current time zone for Snowflake.

  • sf_default: Use the default time zone for the Snowflake user who is connecting.

  • time_zone: Use a specific time zone (e.g. America/New_York), if valid.

    For more information about the impact of setting this option, see Working with timestamps and time zones (in this topic).

sfCompress

If set to on (default), the data passed between Snowflake and Spark is compressed.

s3MaxFileSize

The size of the file used when moving data from Snowflake to Spark. The default is 10MB.

preactions

A semicolon-separated list of SQL commands that are executed before data is transferred between Spark and Snowflake.

If a SQL command contains %s, the %s is replaced with the table name referenced for the operation.

postactions

A semicolon-separated list of SQL commands that are executed after data is transferred between Spark and Snowflake.

If a SQL command contains %s, it is replaced with the table name referenced for the operation.
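To make the %s substitution concrete, the sketch below expands a preactions or postactions string according to the description above. It models the documented behavior and is not the connector's parser. In particular, its naive split on ";" would break on semicolons inside string literals.

```python
from typing import List


def expand_actions(actions: str, table: str) -> List[str]:
    """Split a semicolon-separated action list and substitute %s with the table name."""
    return [cmd.strip().replace("%s", table) for cmd in actions.split(";") if cmd.strip()]
```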

truncate_columns

If set to on (default), a COPY command automatically truncates text strings that exceed the target column length. If set to off, the command produces an error if a loaded string exceeds the target column length.

truncate_table

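This parameter controls whether Snowflake preserves the schema of a Snowflake target table when overwriting that table.

By default, when a target table in Snowflake is overwritten, the schema of that target table is also overwritten; the new schema is based on the schema of the source table (the Spark data frame).

However, sometimes the schema of the source is not ideal. For example, a user might want a Snowflake target table to be able to store FLOAT values in the future, even though the data type of the initial source column is INTEGER. In that case, the schema of the Snowflake table should not be overwritten; the Snowflake table should merely be truncated and then reused with its current schema.

The possible values of this parameter are: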

  • on
  • off

If this parameter is on, the original schema of the target table is kept. If this parameter is off, then the old schema of the table is ignored, and a new schema is generated based on the schema of the source.

This parameter is optional.

The default value of this parameter is off (i.e. by default the original table schema is overwritten).

For details about mapping Spark data types to Snowflake data types (and vice versa), see Data type mappings (in this topic).

continue_on_error

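This parameter controls whether the COPY command aborts if the user enters invalid data (for example, invalid JSON format for a VARIANT data type column).

Possible values: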

  • on
  • off

The value on means continue even if an error occurs. The value off means abort if an error is hit.

This parameter is optional.

The default value of this parameter is off.

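Turning on this option is not recommended. If any errors are reported while COPYing into Snowflake with the Spark Connector, turning it on is likely to result in missing data.

Note

If rows are rejected or missing, and those rows are not clearly faulty in the input source, report this to Snowflake.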

usestagingtable

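This parameter controls whether data loading uses a staging table.

A staging table is a normal table (with a temporary name) that is created by the connector. If the data loading operation succeeds, the original target table is dropped and the staging table is renamed to the original target table's name. If the data loading operation fails, the staging table is dropped, and the target table keeps the data it had immediately before the operation. Thus, the staging table allows the original target table data to be retained if the operation fails. For safety, Snowflake strongly recommends using a staging table in most circumstances.

For the connector to create a staging table, the user executing the COPY via the Spark Connector must have sufficient privileges to create a table. Direct loading (i.e. loading without a staging table) is useful if the user does not have permission to create a table.

The possible values of this parameter are: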

  • on
  • off

If the parameter is on, a staging table is used. If this parameter is off, then the data is loaded directly into the target table.

This parameter is optional.

The default value of this parameter is on (i.e. use a staging table).

autopushdown

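This parameter controls whether automatic query pushdown is enabled.

If pushdown is enabled, then when a query is run on Spark, any part of the query that can be "pushed down" to the Snowflake server is pushed down. This improves the performance of some queries.

This parameter is optional.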

The default value is on if the connector is plugged into a compatible version of Spark. Otherwise, the default value is off.

If the connector is plugged into a different version of Spark than the connector is intended for (e.g. if version 3.2 of the connector is plugged into version 3.3 of Spark), then auto-pushdown is disabled even if this parameter is set to on.

purge

If this is set to on, then the connector deletes temporary files created when transferring from Spark to Snowflake via external data transfer. If this parameter is set to off, then those files are not automatically deleted by the connector.

Purging applies only to transfers from Spark to Snowflake, not to transfers from Snowflake to Spark.

Possible values:

  • on
  • off

The default value is off.

columnmap

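This parameter is useful when you write data from Spark to Snowflake and the column names in the Snowflake table don't match the column names in the Spark table. You can create a map that indicates which Spark source column corresponds to each Snowflake target column.

The parameter is a single string literal, in the form of:

"Map(col_2 -> col_b, col_3 -> col_a)"

For example, consider the following scenario:

  • A DataFrame named df in Spark has three columns: col_1, col_2, col_3

  • A table named tb in Snowflake has two columns: col_a, col_b

  • You want to copy the following values: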

    • From df.col_2 to tb.col_b.
    • From df.col_3 to tb.col_a.

The value of the columnmap parameter would be:

Map(col_2 -> col_b, col_3 -> col_a)

You can generate this value by running the following Scala code:

Map("col_2"->"col_b","col_3"->"col_a").toString()

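The default value of this parameter is null. In other words, by default, the column names in the source and target tables should match.

This parameter is used only when writing from Spark to Snowflake; it does not apply when writing from Snowflake to Spark.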
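You can also produce the columnmap value from Python. The hypothetical helper below formats a dictionary the same way as the Scala Map.toString() call shown above.

```python
from typing import Mapping


def columnmap_value(mapping: Mapping[str, str]) -> str:
    """Format Spark source -> Snowflake target column pairs like Scala's Map.toString()."""
    pairs = ", ".join(f"{src} -> {dst}" for src, dst in mapping.items())
    return f"Map({pairs})"


print(columnmap_value({"col_2": "col_b", "col_3": "col_a"}))
# Map(col_2 -> col_b, col_3 -> col_a)
```

In PySpark, the result would be passed as .option("columnmap", columnmap_value(...)) on the DataFrame writer.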

keep_column_case

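When writing a table from Spark to Snowflake, the Spark Connector by default converts the letters in column names to uppercase, unless the column names are in double quotes.

When writing a table from Snowflake to Spark, the Spark Connector by default adds double quotes around any column name that contains any characters other than uppercase letters, underscores, and digits.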

If you set keep_column_case to on, then the Spark connector will not make these changes.

Possible values:

  • on
  • off

The default value is off.
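The default renaming rules (keep_column_case set to off) can be sketched as two small functions. This is a simplified model based only on the description above. The real connector may handle more edge cases, such as names that contain embedded double quotes.

```python
import re


def default_write_name(name: str) -> str:
    """Spark -> Snowflake: uppercase the name unless it is already double-quoted."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name
    return name.upper()


def default_read_name(name: str) -> str:
    """Snowflake -> Spark: quote names containing anything besides A-Z, 0-9, and '_'."""
    if re.fullmatch(r"[A-Z0-9_]+", name):
        return name
    return f'"{name}"'
```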

column_mapping

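The connector must map columns from the Spark data frame to the Snowflake table. This can be done based on column names (regardless of order), or based on column order (i.e. the first column in the data frame is mapped to the first column in the table, regardless of column name).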

By default, the mapping is done based on order. You can override that by setting this parameter to name, which tells the connector to map columns based on column names. (The name mapping is case-insensitive.)

The possible values of this parameter are:

  • order
  • name

The default value is order.

column_mismatch_behavior

This parameter applies only when the column_mapping parameter is set to name.

If the column names in the Spark data frame and the Snowflake table do not match, then:

  • If column_mismatch_behavior is error, then the Spark Connector reports an error.
  • If column_mismatch_behavior is ignore, then the Spark Connector ignores the error.
    • The driver discards any column in the Spark data frame that does not have a corresponding column in the Snowflake table.
    • The driver inserts NULLs into any column in the Snowflake table that does not have a corresponding column in the Spark data frame.

Potential errors include:

  • The Spark data frame could contain columns that are identical except for case (uppercase/lowercase). Because column name mapping is case-insensitive, it is not possible to determine the correct mapping from the data frame to the table.
  • The Snowflake table could contain columns that are identical except for case (uppercase/lowercase). Because column name mapping is case-insensitive, it is not possible to determine the correct mapping from the data frame to the table.
  • The Spark data frame and the Snowflake table might have no column names in common. In theory, the Spark Connector could insert NULLs into every column of every row, but this is usually pointless, so the connector throws an error even if the column_mismatch_behavior is set to ignore.

The possible values of this parameter are:

  • error
  • ignore

The default value is error.
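The name-based mapping rules can be sketched in Python. map_columns_by_name is a hypothetical function, not the connector's implementation. It follows the behavior described above: case-insensitive matching, NULL for unmatched table columns when mismatches are ignored, and an error when no names match at all.

```python
from typing import Dict, List, Optional


def map_columns_by_name(df_cols: List[str], table_cols: List[str],
                        mismatch: str = "error") -> Dict[str, Optional[str]]:
    """Map each table column to a data frame column, ignoring case."""
    if mismatch not in ("error", "ignore"):
        raise ValueError("mismatch must be 'error' or 'ignore'")
    for label, cols in (("data frame", df_cols), ("table", table_cols)):
        lowered = [c.lower() for c in cols]
        if len(set(lowered)) != len(lowered):
            raise ValueError(f"{label} has columns that differ only by case")
    df_by_key = {c.lower(): c for c in df_cols}
    mapping = {t: df_by_key.get(t.lower()) for t in table_cols}
    if all(v is None for v in mapping.values()):
        # No names in common is an error even when mismatch == "ignore".
        raise ValueError("no column names in common")
    unmatched = [t for t, v in mapping.items() if v is None]
    extra = sorted(set(df_by_key) - {t.lower() for t in table_cols})
    if mismatch == "error" and (unmatched or extra):
        raise ValueError(f"column mismatch: missing={unmatched}, extra={extra}")
    # With "ignore", unmatched table columns map to None (loaded as NULL), and
    # extra data frame columns are dropped.
    return mapping
```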

time_output_format

This parameter allows the user to specify the format for TIME data returned.

The possible values of this parameter are the possible values for time formats specified at Time formats.

This parameter affects only output, not input.

timestamp_ntz_output_format,
timestamp_ltz_output_format,
timestamp_tz_output_format

These options specify the output format for timestamp values. The default values of these options are:

Configuration Option            Default Value
timestamp_ntz_output_format     "YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_ltz_output_format     "TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_tz_output_format      "TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

If these options are set to "sf_current", the connector uses the formats specified for the session.

partition_size_in_mb

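Use this parameter when the query result set is very large and needs to be split into multiple DataFrame partitions. This parameter specifies the recommended uncompressed size for each DataFrame partition. To reduce the number of partitions, make this size larger.

This size is used as a recommendation; the actual size of a partition can be smaller or larger.

This option applies only when the use_copy_unload parameter is FALSE.

This parameter is optional.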

The default value is 100 (MB).
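Because partition_size_in_mb is only a recommendation, you can only estimate the resulting number of partitions. The hypothetical helper below gives a back-of-the-envelope estimate and assumes that the uncompressed size of the result set is known.

```python
import math


def estimate_partitions(result_size_mb: float, partition_size_in_mb: float = 100) -> int:
    """Rough partition count for an uncompressed result set (estimate only)."""
    if partition_size_in_mb <= 0:
        raise ValueError("partition_size_in_mb must be positive")
    return max(1, math.ceil(result_size_mb / partition_size_in_mb))
```

For a result set of about 1000 MB, the default of 100 suggests roughly 10 partitions. Raising the value to 250 suggests roughly 4.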

use_copy_unload

If this is FALSE, Snowflake uses the Arrow data format when SELECTing data. If this is set to TRUE, then Snowflake reverts to the old behavior of using the COPY UNLOAD command to transmit selected data.

This parameter is optional.

The default value is FALSE.

treat_decimal_as_long

If TRUE, configures the Spark Connector to return Long values (rather than BigDecimal values) for queries that return the type Decimal(precision, 0).

The default value is FALSE.

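This option was added in version 2.11.1 of the Spark Connector.

s3_stage_vpce_dns_name

Specifies the DNS name of the VPC endpoint for accessing the internal stage.

This option was added in version 2.11.1 of the Spark Connector.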

support_share_connection

If FALSE, configures the Spark Connector to create a new JDBC connection for each job or action that uses the same Spark Connector options to access Snowflake.

The default value is TRUE, which means that the different jobs and actions share the same JDBC connection if they use the same Spark Connector options to access Snowflake.

If you need to enable or disable this setting programmatically, use the following global static functions:

  • SparkConnectorContext.disableSharedConnection()
  • SparkConnectorContext.enableSharingJDBCConnection()

Note

In the following special cases, the Spark Connector does not use the shared JDBC connection:

  • If preactions or postactions are set, and those preactions or postactions are not CREATE TABLE, DROP TABLE, or MERGE INTO, the Spark Connector does not use the shared connection.
  • Utility functions in Utils such as Utils.runQuery() and Utils.getJDBCConnection() do not use the shared connection.

This option was added in version 2.11.2 of the Spark Connector.

force_skip_pre_post_action_check_for_shared_session

If TRUE, configures the Spark Connector to disable the validation of preactions and postactions for session sharing.

The default value is FALSE.

Important

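Before setting this option, make sure that the queries in preactions and postactions don't affect the session settings. Otherwise, the results might be incorrect.

This option was added in version 2.11.3 of the Spark Connector.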

Using key pair authentication and key pair rotation

The Spark Connector supports key pair authentication and key rotation.

  1. To start, complete the initial configuration for key pair authentication as shown in Key-pair authentication and key-pair rotation.
  2. Send an unencrypted copy of the private key using the pem_private_key connection option.

Attention

For security reasons, rather than hard-coding the pem_private_key in your application, you should set the parameter dynamically after reading the key from a secure source. If the key is encrypted, then decrypt it and send the decrypted version.

In the Python example, note that:

  • The private key file, rsa_key.p8, is read directly from a password-protected file; the passphrase is taken from the environment variable PRIVATE_KEY_PASSPHRASE.
  • The decrypted key, stored in the variable pkb, is passed as the pem_private_key value in the sfOptions dictionary.

To connect, save the Python example to a file (for example, <file.py>) and then execute the following command:

spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>

Python

from pyspark.sql import SparkSession
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

# Read the encrypted private key; the passphrase comes from the environment
with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

# Re-serialize the key as unencrypted PKCS#8 PEM
pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

# Remove the BEGIN/END lines and line breaks, keeping only the base64 body
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

spark = SparkSession.builder.master("local").appName("Simple App").getOrCreate()

sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.cn",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Read the COLORS table (use the "query" option instead to run a SQL query)
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "COLORS") \
    .load()

df.show()
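The decode-and-strip step at the end of the key handling can be isolated in a pure function, which makes it easy to verify that the value passed as pem_private_key contains only the base64 body of the key. This sketch reuses the regular expression from the example above; pem_body is a hypothetical name and the sample input is a placeholder, not a real key.

```python
import re


def pem_body(pem: bytes) -> str:
    """Strip the BEGIN/END PRIVATE KEY lines and all newlines from a PEM key."""
    text = pem.decode("UTF-8")
    # Same pattern as the example: removes "-----BEGIN PRIVATE KEY-----\n" and the END line
    text = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", text)
    return text.replace("\n", "")


# Usage with a placeholder body (not a real key):
sample = b"-----BEGIN PRIVATE KEY-----\nMIIEv\nQIBAD\n-----END PRIVATE KEY-----\n"
print(pem_body(sample))  # prints MIIEvQIBAD
```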

Using External OAuth

Starting with Spark Connector version 2.7.0, you can use External OAuth to authenticate to Snowflake using either the sample Scala program or the sample Python script.

Before using External OAuth and the Spark Connector to authenticate to Snowflake, configure an External OAuth security integration for one of the supported External OAuth authorization servers or an External OAuth custom client.

In the Scala and Python examples, note the replacement of the sfPassword parameter with the sfAuthenticator and sfToken parameters.

Scala:

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

// In spark-shell, getOrCreate() returns the existing session
val spark = SparkSession.builder().appName("<APP_NAME>").getOrCreate()

val sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = spark.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Python:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("<APP_NAME>").getOrCreate()

# You might need to set these (for example, for external data transfers through S3)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.cn",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query", "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
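Because External OAuth only changes the credential options, one way to avoid maintaining two full option sets is to derive the OAuth options from a password-based configuration. A minimal Python sketch; to_oauth_options is a hypothetical helper, and obtaining the access token from your authorization server is outside its scope.

```python
def to_oauth_options(sf_options: dict, access_token: str) -> dict:
    """Replace sfPassword with sfAuthenticator=oauth and sfToken (hypothetical helper)."""
    # Copy every option except the password, which OAuth does not use
    options = {k: v for k, v in sf_options.items() if k != "sfPassword"}
    options["sfAuthenticator"] = "oauth"
    options["sfToken"] = access_token
    return options


password_options = {
    "sfURL": "<account_identifier>.snowflakecomputing.cn",
    "sfUser": "<user_name>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
}
oauth_options = to_oauth_options(password_options, "<external_oauth_access_token>")
```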

AWS options for external data transfer

These options are used to specify the Amazon S3 location where temporary data is stored and provide authentication details for accessing the location. They are required only if you are doing an external data transfer. External data transfers are required if either of the following is true:

  • You are using version 2.1.x or lower of the Spark Connector (which does not support internal transfers), or
  • Your transfer is likely to take 36 hours or more (internal transfers use temporary credentials that expire after 36 hours).
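The two conditions above can be expressed as a small decision function. This is a sketch for illustration only: needs_external_transfer is a hypothetical name, and it assumes version strings of the form used in the spark-submit example (for example, 2.11.0-spark_3.3) with purely numeric components.

```python
def needs_external_transfer(connector_version: str, expected_hours: float) -> bool:
    """Return True if an external transfer (tempDir) is required.

    Encodes the two conditions listed above:
      * Spark Connector 2.1.x or lower (no internal transfers), or
      * a transfer expected to take 36 hours or more.
    """
    # Keep only the numeric prefix, e.g. "2.11.0" from "2.11.0-spark_3.3"
    numeric = connector_version.split("-")[0]
    parts = tuple(int(p) for p in numeric.split("."))
    return parts < (2, 2, 0) or expected_hours >= 36


print(needs_external_transfer("2.11.0-spark_3.3", 2))  # prints False
```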
tempDir

The S3 location where intermediate data is stored (e.g. s3n://xy12345-bucket/spark-snowflake-tmp/).

If tempDir is specified, you must also specify either:

  • awsAccessKey and awsSecretKey
    or
  • temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token

awsAccessKey, awsSecretKey

These are standard AWS credentials that allow access to the location specified in tempDir. Note that both of these options must be set together.

If they are set, they can be retrieved from the existing SparkContext object.

If you specify these variables, you must also specify tempDir.

These credentials should also be set for the Hadoop cluster.

temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token

These are temporary AWS credentials that allow access to the location specified in tempDir. Note that all three of these options must be set together.

Also, if these options are set, they take precedence over the awsAccessKey and awsSecretKey options.

If you specify temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token, you must also specify tempDir. Otherwise, these parameters are ignored.
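The rules for combining tempDir with the two credential sets can be checked before a job starts. The Python sketch below encodes them as stated in this section. effective_aws_credentials is a hypothetical helper, not a connector API, and comparing option names case-insensitively is an assumption (the examples in this topic spell the option both tempDir and tempdir).

```python
PERMANENT_KEYS = ("awsaccesskey", "awssecretkey")
TEMPORARY_KEYS = (
    "temporary_aws_access_key_id",
    "temporary_aws_secret_access_key",
    "temporary_aws_session_token",
)


def effective_aws_credentials(options: dict) -> str:
    """Classify the AWS credentials in connector options.

    Returns "temporary", "permanent", or "none"; raises ValueError when the
    combination breaks the rules for tempDir and the credential options.
    """
    keys = {name.lower() for name in options}  # assumed case-insensitive names
    has_perm = [k in keys for k in PERMANENT_KEYS]
    has_temp = [k in keys for k in TEMPORARY_KEYS]

    if any(has_perm) and not all(has_perm):
        raise ValueError("awsAccessKey and awsSecretKey must be set together")
    if any(has_temp) and not all(has_temp):
        raise ValueError("all three temporary_aws_* options must be set together")

    if "tempdir" not in keys:
        if all(has_perm):
            raise ValueError("awsAccessKey/awsSecretKey require tempDir")
        return "none"  # temporary credentials without tempDir are ignored

    if all(has_temp):
        return "temporary"  # temporary credentials take precedence
    if all(has_perm):
        return "permanent"
    raise ValueError("tempDir requires permanent or temporary AWS credentials")
```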

check_bucket_configuration

If set to on (default), the connector checks if the bucket used for data transfer has a lifecycle policy configured (see Preparing an AWS External S3 Bucket for more information). If there is no lifecycle policy present, a warning is logged.

Disabling this option (by setting it to off) skips this check. This can be useful if a user has permission to perform data operations on the bucket but not to read the bucket lifecycle policies. Disabling the option can also slightly reduce query execution time.

For details, see Authenticating S3 for data exchange (in this topic).

Azure options for external data transfer

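This section describes the parameters that apply to Azure Blob storage when doing external data transfers. External data transfers are required if either of the following is true: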

  • You are using version 2.1.x or lower of the Spark Connector (which does not support internal transfers), or
  • Your transfer is likely to take 36 hours or more (internal transfers use temporary credentials that expire after 36 hours).

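When using an external transfer with Azure Blob storage, specify the location of the Azure container and the SAS (shared access signature) for that container using the parameters described below.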

tempDir

The Azure Blob storage container where intermediate data is stored. The container is specified as a URL, for example:

wasb://<azure_container>@<azure_account>.<azure_endpoint>/

temporary_azure_sas_token

Specifies the SAS token for Azure Blob storage.

For details, see Authenticating Azure for data exchange (in this topic).

Specifying Azure information for temporary storage in Spark

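When you use Azure Blob storage to provide temporary storage for transferring data between Spark and Snowflake, you must provide the location of and credentials for the temporary storage to Spark as well as to the Snowflake Spark Connector.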

To provide Spark with the temporary storage location, execute commands similar to the following on your Spark cluster:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

Note that the last command contains the following variables:

  • <container> and <account>: These are the container and account name for your Azure deployment.
  • <azure_endpoint>: This is the endpoint for your Azure deployment location. For example, if you are using an Azure US deployment, the endpoint is likely to be blob.core.windows.net.
  • <azure_sas>: This is the Shared Access Signature security token.

Replace these variables with the correct information for your Azure Blob storage account.
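Because the Hadoop SAS key and the connector's tempdir URL are built from the same three values, it can help to derive both from one place so that Spark and the connector always point at the same container. A minimal Python sketch; azure_transfer_settings is a hypothetical helper, and applying the returned Hadoop settings (for example, with sc._jsc.hadoopConfiguration().set(key, value), the internal handle the PySpark example in this topic also uses) is left to the caller.

```python
def azure_transfer_settings(container: str, account: str, endpoint: str, sas: str):
    """Return (hadoop_conf, connector_options) for one Azure Blob storage container."""
    hadoop_conf = {
        "fs.azure": "org.apache.hadoop.fs.azure.NativeAzureFileSystem",
        "fs.AbstractFileSystem.wasb.impl": "org.apache.hadoop.fs.azure.Wasb",
        # Same container/account/endpoint as the tempdir URL below
        f"fs.azure.sas.{container}.{account}.{endpoint}": sas,
    }
    connector_options = {
        "tempdir": f"wasb://{container}@{account}.{endpoint}/",
        "temporary_azure_sas_token": sas,
    }
    return hadoop_conf, connector_options


conf, opts = azure_transfer_settings(
    "mycontainer", "myaccount", "blob.core.windows.net", "<azure_sas>"
)
```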

Passing Snowflake session parameters as options for the connector

The Snowflake Connector for Spark supports sending arbitrary session-level parameters to Snowflake (see Session parameters for more info). This can be achieved by adding a ("<key>" -> "<value>") pair to the options object, where <key> is the session parameter name and <value> is the value.

Note

The <value> should be a string enclosed in double quotes, even for parameters that accept numbers or Boolean values (e.g. "1" or "true").

For example, the following code sample passes the USE_CACHED_RESULT session parameter with a value of "false", which disables using the results of previously-executed queries:

// ... assuming sfOptions is a var holding the Snowflake connector options

// Disable the use of cached results of previously executed queries
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method
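In PySpark, sfOptions is a dictionary, so the same parameter can be added as a string entry. The following sketch converts Python values to the string form that the note above asks for. with_session_parameters is a hypothetical helper, and STATEMENT_TIMEOUT_IN_SECONDS in the test is simply another Snowflake session parameter chosen for illustration.

```python
def with_session_parameters(sf_options: dict, **params) -> dict:
    """Return a copy of sf_options with Snowflake session parameters added.

    Every value is converted to a string, because the connector expects string
    values even for numeric and Boolean parameters.
    """
    options = dict(sf_options)
    for name, value in params.items():
        if isinstance(value, bool):
            value = "true" if value else "false"  # str(True) would give "True"
        options[name] = str(value)
    return options


sf_options = with_session_parameters({}, USE_CACHED_RESULT=False)
```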

Security considerations

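Customers should ensure that, in a multi-node Spark system, communications between the nodes are secure. The Spark master sends Snowflake credentials to the Spark workers so that those workers can access Snowflake stages. If communications between the Spark master and the Spark workers are not secure, the credentials could be read by an unauthorized third party.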

Authenticating S3 for data exchange

This section describes how to authenticate when using S3 for data exchange.

This task is required only in either of the following circumstances:

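  • The Snowflake Connector for Spark version is 2.1.x (or lower). Starting with v2.2.0, the connector uses a Snowflake internal temporary stage for data exchange. If you are not currently using version 2.2.0 (or higher) of the connector, Snowflake strongly recommends upgrading to the latest version.
  • The Snowflake Connector for Spark version is 2.2.0 (or higher), but your jobs regularly exceed 36 hours in length. This is the maximum duration for the AWS token used by the connector to access the internal stage for data exchange.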

If you are using an older version of the connector, you need to prepare an S3 location that the connector can use to exchange data between Snowflake and Spark.

To allow access to the S3 bucket/directory used to exchange data between Spark and Snowflake (as specified for tempDir), two authentication methods are supported:

  • Permanent AWS credentials (also used to configure Hadoop/Spark authentication for accessing S3)
  • Temporary AWS credentials

Using permanent AWS credentials

This is the standard AWS authentication method. It requires a pair of awsAccessKey and awsSecretKey values.

Note

These values should also be used to configure Hadoop/Spark for accessing S3. For more information, including examples, see Authenticating Hadoop/Spark using S3A or S3N (in this topic).

For example:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)

For details about the options supported by sfOptions, see AWS options for external data transfer (in this topic).
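A PySpark version of the options in the example above might look like the following sketch. It only builds the options dictionary: s3_external_options is a hypothetical helper, the keys are passed in as arguments instead of being read back from the Hadoop configuration, and defaulting to s3a reflects the Hadoop 2.7+ recommendation in this topic rather than any connector default.

```python
def s3_external_options(base: dict, access_key: str, secret_key: str,
                        bucket: str, scheme: str = "s3a") -> dict:
    """Add permanent AWS credentials and an S3 tempdir to connector options."""
    if scheme not in ("s3a", "s3n"):
        raise ValueError("scheme must be 's3a' or 's3n'")
    options = dict(base)  # leave the caller's options untouched
    options["awsAccessKey"] = access_key
    options["awsSecretKey"] = secret_key
    # Use the same URI scheme that Hadoop/Spark is configured for
    options["tempdir"] = f"{scheme}://{bucket}"
    return options


opts = s3_external_options({"sfUser": "<user_name>"}, "<access_key>", "<secret_key>",
                           "<temp-bucket-name>")
```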

Authenticating Hadoop/Spark using S3A or S3N

Hadoop/Spark ecosystems support two URI schemes for accessing S3 (https://wiki.apache.org/hadoop/AmazonS3/):

s3a://

Recommended, newer method (for Hadoop 2.7 and higher)

To use this method, modify the Scala examples in this topic to add the following Hadoop configuration options:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)

Make sure the tempdir option uses s3a:// as well.

s3n://

Older method (for Hadoop 2.6 and lower)

On some systems, this must be specified explicitly, as shown in the following Scala example:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
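With either scheme, the tempdir option must use the scheme that Hadoop/Spark is configured for. The following Python check is a sketch that relies only on the standard library's urllib.parse; check_tempdir_scheme is a hypothetical helper, and the same check applies to the wasb:// URLs used with Azure.

```python
from urllib.parse import urlparse


def check_tempdir_scheme(tempdir: str, hadoop_scheme: str) -> None:
    """Raise ValueError unless tempdir uses the scheme Hadoop is configured for."""
    actual = urlparse(tempdir).scheme
    if actual != hadoop_scheme:
        raise ValueError(
            f"tempdir uses {actual}:// but Hadoop is configured for {hadoop_scheme}://"
        )


check_tempdir_scheme("s3a://temp-bucket/spark-snowflake-tmp/", "s3a")  # passes silently
```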

Using temporary AWS credentials

This method uses the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token configuration options for the connector.

This method provides additional security by giving Snowflake only temporary access to the S3 bucket/directory used for data exchange.

Note

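Temporary credentials can only be used to configure S3 authentication for the connector; they cannot be used to configure Hadoop/Spark authentication.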

Also, if you provide temporary credentials, they take precedence over any permanent credentials that have been provided.

The following Scala code sample shows how to authenticate using temporary credentials:

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())

sfOptions2 can now be used with the options() DataFrame method.
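In Python, the equivalent step is to copy the existing options and add the three temporary credential entries under their literal names. This sketch assumes the credentials dictionary has the shape of the "Credentials" entry returned by boto3's STS get_session_token call (keys AccessKeyId, SecretAccessKey, SessionToken); with_temporary_aws_credentials is a hypothetical helper, and the boto3 call is shown only as a comment.

```python
def with_temporary_aws_credentials(sf_options: dict, credentials: dict) -> dict:
    """Return a copy of sf_options with temporary AWS credentials added.

    `credentials` is assumed to look like boto3's STS "Credentials" entry.
    """
    options = dict(sf_options)  # the original options stay unchanged
    options["temporary_aws_access_key_id"] = credentials["AccessKeyId"]
    options["temporary_aws_secret_access_key"] = credentials["SecretAccessKey"]
    options["temporary_aws_session_token"] = credentials["SessionToken"]
    return options


# With boto3 installed, credentials valid for 2 hours could be obtained with:
#   import boto3
#   creds = boto3.client("sts").get_session_token(DurationSeconds=7200)["Credentials"]
creds = {"AccessKeyId": "<id>", "SecretAccessKey": "<secret>", "SessionToken": "<token>"}
sf_options2 = with_temporary_aws_credentials({"tempdir": "s3a://<temp-bucket-name>"}, creds)
```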

Authenticating Azure for data exchange

This section describes how to authenticate when using Azure Blob storage for data exchange.

Authenticating this way is required only in either of the following circumstances:

  • The Snowflake Connector for Spark version is 2.1.x (or lower). Starting with v2.2.0, the connector uses a Snowflake internal temporary stage for data exchange. If you are not currently using version 2.2.0 (or higher) of the connector, Snowflake strongly recommends upgrading to the latest version.
  • The Snowflake Connector for Spark version is 2.2.0 (or higher), but your jobs regularly exceed 36 hours in length. This is the maximum duration for the Azure token used by the connector to access the internal stage for data exchange.

You need to prepare an Azure Blob storage container that the connector can use to exchange data between Snowflake and Spark.

Using Azure credentials

This is the standard Azure Blob storage authentication method. It requires two values: tempDir (a URL) and temporary_azure_sas_token.

Note

These values should also be used to configure Hadoop/Spark for accessing Azure Blob storage. For more information, including examples, see Authenticating Hadoop/Spark using Azure (in this topic).

For example:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_identifier>.snowflakecomputing.cn",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)

For details about the options supported by sfOptions, see Azure options for external data transfer (in this topic).

Authenticating Hadoop/Spark using Azure

To use this method, modify the Scala examples in this topic to add the following Hadoop configuration options:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoopConf.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
hadoopConf.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

Make sure the tempdir option uses wasb:// as well.

Browser-based authentication is not supported

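When using the Spark Connector, it is impractical to use any form of authentication that opens a browser window to ask the user for credentials, because the window would not necessarily appear on the client machine. Therefore, the Spark Connector does not support any type of authentication that invokes a browser window, including MFA (multi-factor authentication) and SSO (single sign-on).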