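Using the Spark Connector
The connector adheres to the standard Spark API, but with the addition of Snowflake-specific options, which are described in this topic.
In this topic, the term COPY refers to both: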
- COPY INTO <table> (used to transfer data from an internal or external stage into a table).
- COPY INTO <location> (used to transfer data from a table into an internal or external stage).
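Verifying the network connection to Snowflake with SnowCD
After configuring your driver, you can evaluate and troubleshoot your network connectivity to Snowflake using SnowCD.
You can use SnowCD during the initial configuration process and on demand at any time to evaluate and troubleshoot your network connection to Snowflake.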
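Pushdown
The Spark Connector applies predicate and query pushdown by capturing and analyzing the Spark logical plans for SQL operations. When the data source is Snowflake, the operations are translated into a SQL query and then executed in Snowflake to improve performance.
However, because this translation requires an almost one-to-one translation of Spark SQL operators to Snowflake expressions, not all Spark SQL operators can be pushed down. When pushdown fails, the connector falls back to a less-optimized execution plan. The unsupported operations are instead performed in Spark.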
Note
If you need pushdown for all operations, consider writing your code to use the Snowpark API instead.
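The following is a list of supported pushdown operations (all functions below use their Spark names). If a function is not in this list, a Spark plan that uses it might be executed on Spark rather than pushed down into Snowflake.
Aggregation functions: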
- Average
- Corr
- CovPopulation
- CovSample
- Count
- Max
- Min
- StddevPop
- StddevSamp
- Sum
- VariancePop
- VarianceSamp
Boolean operators:
- And
- Between
- Contains
- EndsWith
- EqualTo
- GreaterThan
- GreaterThanOrEqual
- In
- IsNull
- IsNotNull
- LessThan
- LessThanOrEqual
- Not
- Or
- StartsWith
Date, time, and timestamp functions:
- DateAdd
- DateSub
- Month
- Quarter
- TruncDate
- TruncTimestamp
- Year
Mathematical functions:
- Arithmetic operators "+" (addition), "-" (subtraction), "*" (multiplication), "/" (division), and "-" (unary negation).
- Abs
- Acos
- Asin
- Atan
- Ceil
- CheckOverflow
- Cos
- Cosh
- Exp
- Floor
- Greatest
- Least
- Log
- Pi
- Pow
- PromotePrecision
- Rand
- Round
- Sin
- Sinh
- Sqrt
- Tan
- Tanh
Miscellaneous operators:
- Aliases (AS expressions)
- BitwiseAnd
- BitwiseNot
- BitwiseOr
- BitwiseXor
- CaseWhen
- Cast(child, t, _)
- Coalesce
- If
- MakeDecimal
- ScalarSubquery
- ShiftLeft
- ShiftRight
- SortOrder
- UnscaledValue
Relational operators:
- Aggregate functions and GROUP BY clauses
- Distinct
- Filters
- In
- InSet
- Joins
- Limits
- Projections
- Sorts (ORDER BY)
- Union and Union All
- Window functions and windowing clauses
String functions:
- Ascii
- Concat(children)
- Length
- Like
- Lower
- StringLPad
- StringRPad
- StringTranslate
- StringTrim
- StringTrimLeft
- StringTrimRight
- Substring
- Upper
Window functions (note: these do not work with Spark 2.2):
- DenseRank
- Rank
- RowNumber
Using the connector in Scala
Specifying the data source class name
To use Snowflake as a data source in Spark, use the .format option to provide the Snowflake connector class name that defines the data source.
net.snowflake.spark.snowflake
To ensure a compile-time check of the class name, Snowflake strongly recommends defining a variable for the class name. For example:
Also, for convenience, the Utils class provides the variable, which can be imported as follows:
Note
All examples in this topic use SNOWFLAKE_SOURCE_NAME as the class definition.
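The following is a minimal PySpark counterpart of this pattern. Python has no compile-time check, but keeping the class name in one module-level constant still confines a typo to a single place. `snowflake_reader` is a hypothetical helper name, and `spark` is assumed to be an existing `SparkSession`.

```python
# Define the data source class name once; every reader and writer reuses it.
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"


def snowflake_reader(spark, sf_options):
    """Return a DataFrameReader preconfigured for the Snowflake connector.

    Hypothetical helper: `spark` is an existing pyspark SparkSession and
    `sf_options` is a dict of connector options (sfUrl, sfUser, ...).
    """
    return spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sf_options)
```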
Enabling/disabling pushdown in a session
Version 2.1.0 (and higher) of the connector supports query pushdown. When Snowflake is the Spark data source, pushdown can significantly improve performance by pushing query processing to Snowflake.
By default, pushdown is enabled.
To disable pushdown within a Spark session for a given DataFrame:
- After instantiating a SparkSession object, call the SnowflakeConnectorUtils.disablePushdownSession static method, passing in the SparkSession object. For example (where spark is your SparkSession object):
- Create a DataFrame with the autopushdown option set to off. For example:
Note that you can also set the autopushdown option in a Map that you pass to the options method (e.g. in sfOptions in the example above).
To enable pushdown again after disabling it, call the SnowflakeConnectorUtils.enablePushdownSession static method
(passing in the SparkSession object), and create a DataFrame with autopushdown enabled.
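The per-DataFrame route can be sketched in PySpark as a small helper that sets the `autopushdown` option on a copy of a shared options dict. The helper name `with_autopushdown` is an assumption for illustration. Only the `autopushdown` key and its `on`/`off` values come from this topic.

```python
def with_autopushdown(sf_options, enabled):
    """Return a copy of sf_options with the autopushdown option set.

    The connector documents the values "on" and "off" for autopushdown.
    The input dict is not modified, so a shared base options dict stays reusable.
    """
    updated = dict(sf_options)
    updated["autopushdown"] = "on" if enabled else "off"
    return updated
```

For example, passing `**with_autopushdown(sfOptions, False)` to `.options()` on a Snowflake `DataFrameReader` turns pushdown off for that DataFrame only and leaves `sfOptions` untouched for other reads.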
Moving data from Snowflake to Spark
Note
When using DataFrames, the Snowflake connector supports SELECT queries only.
To read data from Snowflake into a Spark DataFrame:
- Use the read() method of the SqlContext object to construct a DataFrameReader.
- Specify SNOWFLAKE_SOURCE_NAME using the format() method. For the definition, see Specifying the data source class name (in this topic).
- Specify the connector options using either the option() or options() method. For more information, see Setting configuration options for the connector (in this topic).
- Specify one of the following options for the table data to be read:
  - dbtable: The name of the table to be read. All columns and records are retrieved (i.e. it is equivalent to SELECT * FROM db_table).
  - query: The exact query (SELECT statement) to run.
Usage notes
- Currently, the connector does not support other types of queries (e.g. SHOW or DESC, or DML statements) when using DataFrames.
- There is an upper limit to the size of an individual row. For more details, see Limits on Query Text Size.
Performance considerations
When transferring data between Snowflake and Spark, use the following methods to analyze and improve performance:
- Use the net.snowflake.spark.snowflake.Utils.getLastSelect() method to see the actual query issued when moving data from Snowflake to Spark.
- If you use the filter or where functionality of the Spark DataFrame, check that the respective filters are present in the issued SQL query. The Snowflake connector tries to translate all the filters requested by Spark to SQL.
  However, the current Spark infrastructure cannot pass some forms of filters to the Snowflake connector. As a result, in some situations, a large number of unnecessary records are requested from Snowflake.
- If you need only a subset of columns, make sure that the SQL query reflects that subset.
- In general, if the SQL query issued does not match what you expect based on the DataFrame operations, use the query option to provide the exact SQL syntax you want.
Examples
Read an entire table:
Read the results of a query:
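Both reads can be sketched in PySpark as follows. The sketch assumes a `SparkSession` named `spark` (the modern entry point, used here in place of `SqlContext`) and a base options dict. `read_options` and `read_snowflake` are hypothetical helpers that enforce the rule above: exactly one of `dbtable` or `query` is set.

```python
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"


def read_options(sf_options, dbtable=None, query=None):
    """Merge the base connector options with exactly one of dbtable or query."""
    if (dbtable is None) == (query is None):
        raise ValueError("specify exactly one of dbtable or query")
    merged = dict(sf_options)
    if dbtable is not None:
        merged["dbtable"] = dbtable  # equivalent to SELECT * FROM <dbtable>
    else:
        merged["query"] = query  # the exact SELECT statement to run
    return merged


def read_snowflake(spark, sf_options, dbtable=None, query=None):
    """Load a Snowflake table, or the result of a SELECT, into a DataFrame."""
    opts = read_options(sf_options, dbtable=dbtable, query=query)
    return spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**opts).load()
```

`read_snowflake(spark, sfOptions, dbtable="t1")` reads a whole (placeholder) table `t1`. Passing `query="SELECT c1 FROM t1 WHERE c1 > 10"` instead asks Snowflake to return only the rows and columns that the query selects.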
Moving data from Spark to Snowflake
The steps for saving the contents of a DataFrame to a Snowflake table are similar to the steps for reading from Snowflake into Spark:
- Use the write() method of the DataFrame to construct a DataFrameWriter.
- Specify SNOWFLAKE_SOURCE_NAME using the format() method. For the definition, see Specifying the data source class name (in this topic).
- Specify the connector options using either the option() or options() method. For more information, see Setting configuration options for the connector (in this topic).
- Use the dbtable option to specify the table to which data is written.
- Use the mode() method to specify the save mode for the content. For more information, see SaveMode (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/SaveMode.html) (Spark documentation).
Examples
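Here is a PySpark sketch of these steps. It assumes that `df` is an existing DataFrame and that `sfOptions` holds the connector options. The mode strings are the ones that PySpark's `DataFrameWriter.mode()` accepts. `write_snowflake` is a hypothetical helper name.

```python
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Save modes accepted by pyspark's DataFrameWriter.mode().
_SAVE_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_snowflake(df, sf_options, dbtable, mode="error"):
    """Save a Spark DataFrame to a Snowflake table (minimal sketch)."""
    if mode not in _SAVE_MODES:
        raise ValueError(f"unsupported save mode: {mode!r}")
    (
        df.write.format(SNOWFLAKE_SOURCE_NAME)
        .options(**sf_options)
        .option("dbtable", dbtable)  # target table
        .mode(mode)
        .save()
    )
```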
Exporting JSON from Spark to Snowflake
Spark DataFrames can contain JSON objects, serialized as strings. The following code provides an example of converting a regular DataFrame to a DataFrame containing JSON data:
Note that the resulting jsonDataFrame contains a single column of type StringType. As a result, when this DataFrame is exported to Snowflake with the common SaveMode.Overwrite mode, a new table in Snowflake is created with a single column of type VARCHAR.
To load jsonDataFrame into a VARIANT column:
- Create a Snowflake table (connecting to Snowflake in Java using the Snowflake JDBC Driver). For explanations of the connection parameters used in the example, see JDBC Driver connection parameter reference.
- Instead of using SaveMode.Overwrite, use SaveMode.Append to reuse the existing table. When the string value representing JSON is loaded into Snowflake, it is parsed as JSON because the target column is of type VARIANT. For example:
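The two steps can be sketched in Python as follows. The table and column names are placeholders. The DDL string would be executed (over JDBC, as described above) before the write. `rows_to_json_strings` only illustrates, in plain Python, the kind of single string column that Spark's `DataFrame.toJSON()` or `to_json` produces. None of these helper names come from the connector.

```python
import json

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"


def variant_table_ddl(table, column):
    """DDL for a target table whose single column is VARIANT (placeholder names)."""
    return f"CREATE TABLE IF NOT EXISTS {table} ({column} VARIANT)"


def rows_to_json_strings(rows):
    """Serialize each row dict to one JSON string (one string column per row)."""
    return [json.dumps(row, sort_keys=True) for row in rows]


def append_json(json_df, sf_options, table):
    """Append a one-column DataFrame of JSON strings to an existing table.

    Append keeps the existing VARIANT column, so Snowflake parses each string
    as JSON; Overwrite would recreate the table with a VARCHAR column instead.
    """
    (
        json_df.write.format(SNOWFLAKE_SOURCE_NAME)
        .options(**sf_options)
        .option("dbtable", table)
        .mode("append")
        .save()
    )
```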
Executing DDL/DML SQL statements
Use the runQuery() method of the Utils object to execute DDL/DML SQL statements, in addition to queries, for example:
where sfOptions is the parameters map used to read/write DataFrames.
The runQuery method returns only TRUE or FALSE. It is intended for statements that do not return a result set,
for example DDL statements like CREATE TABLE and DML statements like INSERT, UPDATE, and DELETE.
It is not useful for statements that return a result set, such as SELECT or SHOW.
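The following is a hedged PySpark sketch. The connector's `Utils` object lives on the JVM side. This topic does not document how to reach it from Python, but a common way is PySpark's internal py4j gateway, `spark._jvm`. The guard reflects the note above that `runQuery` is not meant for statements that return result sets. Treat the gateway path as an assumption and verify it against your connector version.

```python
# First keywords of statements that return a result set; runQuery is not
# meant for these (it is intended for DDL/DML such as CREATE TABLE or INSERT).
_RESULT_SET_KEYWORDS = {"SELECT", "SHOW", "DESC", "DESCRIBE", "WITH"}


def is_run_query_candidate(statement):
    """Return True if the statement looks like DDL/DML rather than a query."""
    words = statement.split()
    return bool(words) and words[0].upper() not in _RESULT_SET_KEYWORDS


def run_statement(spark, sf_options, statement):
    """Execute a DDL/DML statement through the connector's Utils.runQuery.

    Assumption: the connector JAR is on the driver classpath and its Scala
    Utils object is reachable via pyspark's internal py4j gateway, spark._jvm.
    """
    if not is_run_query_candidate(statement):
        raise ValueError("use the query option for statements that return rows")
    utils = spark._jvm.net.snowflake.spark.snowflake.Utils
    return utils.runQuery(sf_options, statement)
```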
Working with timestamps and time zones
Spark provides only one type of timestamp, equivalent to the Scala/Java Timestamp type. Its behavior is almost identical to that of the TIMESTAMP_LTZ (local time zone) data type in Snowflake. As such, when transferring data between Spark and Snowflake, Snowflake recommends using the following approaches to correctly preserve time relative to time zones:
- Use only the TIMESTAMP_LTZ data type in Snowflake.
Note
The default timestamp data type mapping is TIMESTAMP_NTZ (no time zone), so you must explicitly set the TIMESTAMP_TYPE_MAPPING parameter to use TIMESTAMP_LTZ.
- Set the Spark time zone to UTC and use this time zone in Snowflake (i.e. don't set the sfTimezone option for the connector, and don't explicitly set a time zone in Snowflake). In this scenario, TIMESTAMP_LTZ and TIMESTAMP_NTZ are effectively equivalent.
To set the time zone, add the following line to your Spark code:
If neither of these approaches is implemented, undesired time modifications might occur. For example, consider the following scenario:
- The time zone in Spark is set to America/New_York.
- The time zone in Snowflake is set to Europe/Warsaw, which can happen in either of these ways:
  - Setting sfTimezone to Europe/Warsaw for the connector.
  - Setting sfTimezone to snowflake for the connector and setting the TIMEZONE session parameter in Snowflake to Europe/Warsaw.
- Both TIMESTAMP_NTZ and TIMESTAMP_LTZ are in use in Snowflake.
In this scenario:
- If a value representing 12:00:00 in a TIMESTAMP_NTZ column in Snowflake is sent to Spark, this value doesn't carry any time zone information. Spark treats the value as 12:00:00 in New York.
- If Spark sends this value 12:00:00 (in New York) back to Snowflake to be loaded into a TIMESTAMP_LTZ column, it is automatically converted and loaded as 18:00:00 (for the Warsaw time zone).
- If this value is then converted to TIMESTAMP_NTZ in Snowflake, the user sees 18:00:00, which is different from the original value, 12:00:00.
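The drift in this scenario is plain time-zone arithmetic, which the following sketch reproduces with Python's `datetime`. The sketch uses fixed offsets for a January date (New York at UTC-5, Warsaw at UTC+1) rather than full time-zone rules, so the numbers hold only outside daylight-saving transitions. `ntz_round_trip` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone

# Fixed offsets for a January date: New York is UTC-5 and Warsaw is UTC+1.
NEW_YORK_WINTER = timezone(timedelta(hours=-5))
WARSAW_WINTER = timezone(timedelta(hours=1))


def ntz_round_trip(wall_clock, spark_tz, snowflake_tz):
    """Simulate the NTZ -> Spark -> LTZ -> NTZ round trip described above."""
    # 1. An NTZ value has no zone, so Spark interprets it in its own zone.
    instant = wall_clock.replace(tzinfo=spark_tz)
    # 2. Loading into TIMESTAMP_LTZ keeps the instant; Snowflake shows it locally.
    in_snowflake = instant.astimezone(snowflake_tz)
    # 3. Converting to TIMESTAMP_NTZ keeps Snowflake's wall-clock time.
    return in_snowflake.replace(tzinfo=None)


original = datetime(2024, 1, 15, 12, 0, 0)
shifted = ntz_round_trip(original, NEW_YORK_WINTER, WARSAW_WINTER)
print(original.time(), "->", shifted.time())  # 12:00:00 -> 18:00:00
```

With `timezone.utc` on both sides, which is the recommended setup, the round trip returns the original value unchanged.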
To summarize, Snowflake recommends strictly following at least one of these rules:
- Use the same time zone, ideally UTC, for both Spark and Snowflake.
- Use only the TIMESTAMP_LTZ data type for transferring data between Spark and Snowflake.
Scala sample program
Important
This sample program assumes you are using version 2.2.0 (or higher) of the connector, which uses a Snowflake internal stage for storing temporary data and, therefore, does not require an S3 location for
storing temporary data. If you are using an earlier version, you must have an existing S3 location and include values for tempdir, awsAccessKey, awsSecretKey for sfOptions.
For more details, see AWS options for external data transfer (in this topic).
The following Scala program provides a full use case for the Snowflake Connector for Spark. Before using the code, replace the following strings with the appropriate values, as described in Setting configuration options for the connector (in this topic):
- <account_identifier>: Your account identifier.
- <user_name>, <password>: Login credentials for the Snowflake user.
- <database>, <schema>, <warehouse>: Defaults for the Snowflake session.
The sample Scala program uses basic authentication (i.e. username and password). If you wish to authenticate with OAuth, see Using External OAuth (in this topic).
Using the connector with Python
Using the connector with Python is very similar to using it with Scala.
We recommend using the bin/pyspark script included in the Spark distribution.
Configuring the pyspark script
The pyspark script must be configured similarly to the spark-shell script, using the --packages or --jars options. For example:
Don't forget to include the Snowflake Spark Connector and JDBC Connector .jar files in your CLASSPATH environment variable.
For more information about configuring the spark-shell script, see Step 4: Configure the Local Spark Cluster or Amazon EMR-hosted Spark Environment.
Enabling/disabling pushdown in a session
Version 2.1.0 (and higher) of the connector supports query pushdown. When Snowflake is the Spark data source, pushdown can significantly improve performance by pushing query processing to Snowflake.
By default, pushdown is enabled.
To disable pushdown within a Spark session for a given DataFrame:
- After instantiating a SparkSession object, call the SnowflakeConnectorUtils.disablePushdownSession static method, passing in the SparkSession object. For example:
- Create a DataFrame with the autopushdown option set to off. For example:
Note that you can also set the autopushdown option in a dictionary that you pass to the options method (e.g. in sfOptions in the example above).
To enable pushdown again after disabling it, call the SnowflakeConnectorUtils.enablePushdownSession static method
(passing in the SparkSession object), and create a DataFrame with autopushdown enabled.
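The following sketches the session-level calls from Python. It assumes that the connector JAR is on the driver classpath. It reaches the Scala object through PySpark's internal `_jvm` gateway and passes the JVM-side session, `spark._jsparkSession`. Both are PySpark internals rather than connector APIs, so verify them for your versions. `set_session_pushdown` is a hypothetical name.

```python
def set_session_pushdown(spark, enabled):
    """Enable or disable connector pushdown for the whole Spark session.

    Assumption: SnowflakeConnectorUtils is reachable through pyspark's py4j
    gateway (spark._jvm) and accepts the JVM SparkSession (spark._jsparkSession).
    """
    utils = spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils
    if enabled:
        utils.enablePushdownSession(spark._jsparkSession)
    else:
        utils.disablePushdownSession(spark._jsparkSession)
```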
Python sample script
Important
This sample script assumes you are using version 2.2.0 (or higher) of the connector, which uses a Snowflake internal stage for storing temporary data and, therefore, does not require an S3 location for
storing this data. If you are using an earlier version, you must have an existing S3 location and include values for tempdir, awsAccessKey, awsSecretKey for sfOptions. For more
details, see AWS options for external data transfer (in this topic).
Once the pyspark script has been configured, you can perform SQL queries and other operations. Here’s an example Python script that performs a simple SQL query. This script illustrates basic connector
usage. Most of the Scala examples in this document can be adapted with minimal effort/changes for use with Python.
The sample Python script uses basic authentication (i.e. username and password). If you wish to authenticate with OAuth, see Using External OAuth (in this topic).
Tip
Note the usage of sfOptions and SNOWFLAKE_SOURCE_NAME. This simplifies the code and reduces the chance of errors.
For details about the supported options for sfOptions, see Setting configuration options for the connector (in this topic).
Data type mappings
The Spark Connector supports converting between many common data types.
From Spark SQL to Snowflake
Spark Data Type | Snowflake Data Type
ArrayType | VARIANT
BinaryType | Not supported
BooleanType | BOOLEAN
ByteType | INTEGER. Snowflake does not support the BYTE type.
DateType | DATE
DecimalType | DECIMAL
DoubleType | DOUBLE
FloatType | FLOAT
IntegerType | INTEGER
LongType | INTEGER
MapType | VARIANT
ShortType | INTEGER
StringType | If length is specified, VARCHAR(N); otherwise, VARCHAR
StructType | VARIANT
TimestampType | TIMESTAMP
From Snowflake to Spark SQL
Snowflake Data Type | Spark Data Type
ARRAY | StringType
BIGINT | DecimalType(38, 0)
BINARY | Not supported
BLOB | Not supported
BOOLEAN | BooleanType
CHAR | StringType
CLOB | StringType
DATE | DateType
DECIMAL | DecimalType
DOUBLE | DoubleType
FLOAT | DoubleType
INTEGER | DecimalType(38, 0)
OBJECT | StringType
TIMESTAMP | TimestampType
TIME | StringType (Spark Connector version 2.4.14 or later)
VARIANT | StringType
Calling the DataFrame.show method
If you are calling the DataFrame.show method and passing in a number that is less than the number of rows in the
DataFrame, construct a DataFrame that just contains the rows to show in a sorted order.
To do this:
- Call the sort method first to return a DataFrame that contains sorted rows.
- Call the limit method on that DataFrame to return a DataFrame that just contains the rows that you want to show.
- Call the show method on the returned DataFrame.
For example, if you want to show 5 rows and want the results sorted by the column my_col:
Otherwise, if you call show to display a subset of rows in the DataFrame, different executions of the code might result in
different rows being shown.
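The sort, limit, and show sequence above fits in one call chain. In this sketch, `show_top_rows` is a hypothetical helper and `df` is any Spark DataFrame.

```python
def show_top_rows(df, sort_col, n):
    """Show n rows of df in a stable order: sort first, then limit, then show.

    Calling df.show(n) on an unsorted DataFrame can pick different rows on
    different executions.
    """
    df.sort(sort_col).limit(n).show()
```

For the case described above, `show_top_rows(df, "my_col", 5)` shows five rows ordered by `my_col`.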
Setting configuration options for the connector
The following sections list the options that you set to configure the behavior of the connector.
To set these options, call the .option(<key>, <value>) or .options(<map>) method of the
Spark DataFrameReader (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/DataFrameReader.html) class.
Tip
To facilitate using the options, Snowflake recommends specifying the options in a single Map object and calling
.options(<map>) to set the options.
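Following that tip, the connection options can be gathered into one dict in PySpark and passed with `.options(**sf_options)`. This is a sketch. The function name is hypothetical, and every value is a placeholder that you supply. The `domain` default simply mirrors the `sfUrl` format shown in the next section.

```python
def build_sf_options(account_identifier, user, password, database, schema,
                     warehouse=None, role=None, domain="snowflakecomputing.cn"):
    """Collect the core connector options in a single dict.

    sfUrl follows the account_identifier.<domain> host format; optional
    context options are added only when a value is supplied.
    """
    options = {
        "sfUrl": f"{account_identifier}.{domain}",
        "sfUser": user,
        "sfPassword": password,
        "sfDatabase": database,
        "sfSchema": schema,
    }
    if warehouse:
        options["sfWarehouse"] = warehouse
    if role:
        options["sfRole"] = role
    return options
```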
Required connection options
The following options are required for connecting to Snowflake:
sfUrl
Specifies the hostname for your account in the following format:
account_identifier.snowflakecomputing.cn
where account_identifier is your account identifier.
sfUser
The login name for the Snowflake user.
You must also use one of the following options to authenticate:
- sfPassword
The password for the Snowflake user.
- pem_private_key
The private key (in PEM format) for key pair authentication. For instructions, see Key-pair authentication and key-pair rotation.
- sfAuthenticator
Specifies using External OAuth to authenticate to Snowflake. Set the value to oauth.
Using External OAuth requires setting the sfToken parameter.
sfToken
(Required if using External OAuth) Set the value to your External OAuth access token.
This connection parameter requires setting the sfAuthenticator parameter value to oauth.
The default is none.
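A small client-side check can catch misconfigured authentication before Spark contacts Snowflake. This sketch adds an assumption on top of the rules above: it treats the three methods as mutually exclusive. It also enforces the `sfAuthenticator`/`sfToken` pairing. The connector itself may validate differently, and the function name is hypothetical.

```python
def check_auth_options(options):
    """Raise ValueError unless exactly one authentication method is configured.

    Methods: sfPassword, pem_private_key, or sfAuthenticator="oauth" with sfToken.
    """
    uses_oauth = options.get("sfAuthenticator") == "oauth"
    methods = ["sfPassword" in options, "pem_private_key" in options, uses_oauth]
    if sum(methods) != 1:
        raise ValueError(
            "set exactly one of sfPassword, pem_private_key, or sfAuthenticator=oauth"
        )
    if uses_oauth and not options.get("sfToken"):
        raise ValueError("sfAuthenticator=oauth requires sfToken")
    if "sfToken" in options and not uses_oauth:
        raise ValueError("sfToken requires sfAuthenticator=oauth")
```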
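Required context options
The following options are required for setting the database and schema context for the session:
sfDatabase
The database to use for the session after connecting.
sfSchema
The schema to use for the session after connecting.
Additional context options
The options listed in this section are not required.
sfAccount
Account identifier (e.g. myorganization-myaccount). This option is no longer required because the account identifier is specified in sfUrl. It is documented here only for backward compatibility.
sfWarehouse
The default virtual warehouse to use for the session after connecting.
sfRole
The default security role to use for the session after connecting.
Proxy options
The options listed in this section are not required.
use_proxy
Specifies whether the connector should use a proxy:
- true specifies that the connector should use a proxy.
- false specifies that the connector should not use a proxy.
The default value is false.
proxy_host
(Required if use_proxy is true) Specifies the hostname of the proxy server to use.
proxy_port
(Required if use_proxy is true) Specifies the port number of the proxy server to use.
proxy_protocol
Specifies the protocol used to connect to the proxy server. You can specify one of the following values:
- http
- https
The default value is http.
This is supported only for Snowflake on AWS.
This option was added in Spark Connector version 2.11.1.
proxy_user
Specifies the user name for authenticating to the proxy server. Set this if the proxy server requires authentication.
This is supported only for Snowflake on AWS.
proxy_password
Specifies the password of proxy_user for authenticating to the proxy server. Set this if the proxy server requires authentication.
This is supported only for Snowflake on AWS.
non_proxy_hosts
Specifies the list of hosts that the connector should connect to directly, bypassing the proxy server.
Separate the hostnames with a URL-escaped pipe symbol (%7C). You can also use an asterisk (*) as a wildcard.
This is supported only for Snowflake on AWS.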
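The proxy settings can be assembled as follows. This sketch uses hypothetical names. It encodes only the rules listed above: string values, `http` or `https` as the protocol, and `%7C` between the entries of `non_proxy_hosts`.

```python
def proxy_options(host, port, protocol="http", user=None, password=None,
                  non_proxy_hosts=()):
    """Build the connector's proxy options as a dict of strings."""
    if protocol not in ("http", "https"):
        raise ValueError("proxy_protocol must be 'http' or 'https'")
    options = {
        "use_proxy": "true",
        "proxy_host": host,
        "proxy_port": str(port),
        "proxy_protocol": protocol,  # option added in connector 2.11.1
    }
    if user is not None:
        options["proxy_user"] = user
    if password is not None:
        options["proxy_password"] = password
    if non_proxy_hosts:
        # Hosts are separated by a URL-escaped pipe (%7C); "*" acts as a wildcard.
        options["non_proxy_hosts"] = "%7C".join(non_proxy_hosts)
    return options
```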
Additional options
The options listed in this section are not required.
sfTimezone
The time zone that Snowflake uses when working with Spark. Note that the parameter only sets the time zone in Snowflake; the Spark environment remains unmodified. The supported values are:
- spark: Use the time zone from Spark (default).
- snowflake: Use the current time zone for Snowflake.
- sf_default: Use the default time zone for the Snowflake user who is connecting.
- time_zone: Use a specific time zone (e.g. America/New_York), if valid.
For more information about the impact of setting this option, see Working with timestamps and time zones (in this topic).
sfCompress
If set to on (default), the data passed between Snowflake and Spark is compressed.
s3MaxFileSize
The size of the file used when moving data from Snowflake to Spark. The default is 10MB.
preactions
A semicolon-separated list of SQL commands that are executed before data is transferred between Spark and Snowflake.
If a SQL command contains %s, the %s is replaced with the table name referenced for the operation.
postactions
A semicolon-separated list of SQL commands that are executed after data is transferred between Spark and Snowflake.
If a SQL command contains %s, it is replaced with the table name referenced for the operation.
truncate_columns
If set to on (default), a COPY command automatically truncates text strings that exceed the target column length. If set to off, the command produces an error if a loaded string exceeds the target column length.
truncate_table
This parameter controls whether Snowflake retains the schema of a Snowflake target table when overwriting that table.
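By default, when a target table in Snowflake is overwritten, the schema of that target table is also overwritten; the new schema is based on the schema of the source table (the Spark DataFrame).
However, sometimes the schema of the source is not ideal. For example, a user might want a Snowflake target table to be able to store FLOAT values in the future even though the data type of the initial source column is INTEGER. In that case, the schema of the Snowflake table should not be overwritten; the Snowflake table should merely be truncated and then reused with its current schema.
The possible values for this parameter are:
- on
- off
If this parameter is on, the original schema of the target table is kept. If this parameter is off, then the old schema of the table is ignored, and a new schema is generated based on the schema of the source.
This parameter is optional.
The default value of this parameter is off (i.e. by default the original table schema is overwritten).
For details about mapping Spark data types to Snowflake data types (and vice versa), see Data type mappings (in this topic).
continue_on_error
This variable controls whether the COPY command aborts if the user enters invalid data (for example, invalid JSON format for a variant data type column).
Possible values:
- on
- off
The value on means continue even if an error occurs. The value off means abort if an error is hit.
This parameter is optional.
The default value of this parameter is off.
Turning on this option is not recommended. If any errors are reported while COPYing into Snowflake with the Spark Connector, this is likely to result in missing data.
Note
If rows are rejected or missing, and those rows are not clearly faulty in the input source, please report this to Snowflake.
usestagingtable
This parameter controls whether data loading uses a staging table.
A staging table is a normal table (with a temporary name) that is created by the connector. If the data loading operation succeeds, the original target table is dropped and the staging table is renamed to the original target table's name. If the data loading operation fails, the staging table is dropped and the target table is left with the data that it had before the operation. Thus, the staging table allows the original target table data to be retained if the operation fails. For safety, Snowflake strongly recommends using a staging table in most circumstances.
For the connector to create a staging table, the user executing the COPY via the Spark Connector must have sufficient privileges to create a table. Direct loading (i.e. loading without using a staging table) is useful if the user does not have permission to create a table.
The possible values for this parameter are:
- on
- off
If the parameter is on, a staging table is used. If this parameter is off, then the data is loaded directly into the target table.
This parameter is optional.
The default value of this parameter is on (i.e. use a staging table).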
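autopushdown
This parameter controls whether automatic query pushdown is enabled.
If pushdown is enabled, then when a query is run on Spark, any part of the query that can be "pushed down" to the Snowflake server is pushed down. This improves the performance of some queries.
This parameter is optional.
The default value is on if the connector is plugged into a compatible version of Spark. Otherwise, the default value is off.
If the connector is plugged into a different version of Spark than the connector is intended for (e.g. if version 3.2 of the connector is plugged into version 3.3 of Spark), then auto-pushdown is disabled even if this parameter is set to on.
purge
If this is set to on, then the connector deletes temporary files created when transferring from Spark to Snowflake via external data transfer. If this parameter is set to off, then those files are not automatically deleted by the connector.
Purging works only for transfers from Spark to Snowflake, not for transfers from Snowflake to Spark.
Possible values:
- on
- off
The default value is off.
columnmap
This parameter is useful when writing data from Spark to Snowflake and the column names in the Snowflake table do not match the column names in the Spark table. You can create a map that indicates which Spark source column corresponds to each Snowflake destination column.
The parameter is a single string literal, in the form of:
"Map(col_2 -> col_b, col_3 -> col_a)"
For example, consider the following scenario:
- A DataFrame named df in Spark has three columns: col_1, col_2, col_3
- A table named tb in Snowflake has two columns: col_a, col_b
- You want to copy the following values:
  - From df.col_2 to tb.col_b.
  - From df.col_3 to tb.col_a.
The value of the columnmap parameter would be:
Map(col_2 -> col_b, col_3 -> col_a)
You can generate this value by executing the following Scala code:
Map("col_2"->"col_b","col_3"->"col_a").toString()
The default value of this parameter is null. In other words, by default, the column names in the source and destination tables should match.
This parameter is used only when writing from Spark to Snowflake; it does not apply when writing from Snowflake to Spark.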
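PySpark has no Scala `Map.toString()`, so the `columnmap` string can be formatted by hand. For the example mapping, this hypothetical helper produces the same text as the Scala expression above.

```python
def columnmap_value(mapping):
    """Format {spark_col: snowflake_col} like Scala's Map(...).toString().

    Dict insertion order is preserved in the output.
    """
    pairs = ", ".join(f"{src} -> {dst}" for src, dst in mapping.items())
    return f"Map({pairs})"
```

The result goes into the options dict, for example `sfOptions["columnmap"] = columnmap_value({"col_2": "col_b", "col_3": "col_a"})`.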
keep_column_case
When writing a table from Spark to Snowflake, the Spark Connector by default converts the letters in column names to uppercase, unless the column names are in double quotes.
When writing a table from Snowflake to Spark, the Spark Connector by default adds double quotes around any column name that contains any characters other than uppercase letters, underscores, and digits.
If you set keep_column_case to on, then the Spark connector will not make these changes.
Possible values:
- on
- off
The default value is off.
column_mapping
The connector must map columns from the Spark data frame to the Snowflake table. This can be done based on column names (regardless of order), or based on column order (i.e. the first column in the data frame is mapped to the first column in the table, regardless of column name).
By default, the mapping is done based on order. You can override that by setting this parameter to name, which tells the connector to map columns based on column names. (The name mapping is case-insensitive.)
The possible values for this parameter are:
- order
- name
The default value is order.
column_mismatch_behavior
This parameter applies only when the column_mapping parameter is set to name.
If the column names in the Spark data frame and the Snowflake table do not match, then:
- If column_mismatch_behavior is error, then the Spark Connector reports an error.
- If column_mismatch_behavior is ignore, then the Spark Connector ignores the error.
  - The driver discards any column in the Spark data frame that does not have a corresponding column in the Snowflake table.
  - The driver inserts NULLs into any column in the Snowflake table that does not have a corresponding column in the Spark data frame.
Potential errors include:
- The Spark data frame could contain columns that are identical except for case (uppercase/lowercase). Because column name mapping is case-insensitive, it is not possible to determine the correct mapping from the data frame to the table.
- The Snowflake table could contain columns that are identical except for case (uppercase/lowercase). Because column name mapping is case-insensitive, it is not possible to determine the correct mapping from the data frame to the table.
- The Spark data frame and the Snowflake table might have no column names in common. In theory, the Spark Connector could insert NULLs into every column of every row, but this is usually pointless, so the connector throws an error even if column_mismatch_behavior is set to ignore.
The possible values for this parameter are:
- error
- ignore
The default value is error.
time_output_format
This parameter allows the user to specify the format for TIME data returned.
The possible values of this parameter are the possible values for time formats specified at Time formats.
This parameter affects only output, not input.
timestamp_ntz_output_format, timestamp_ltz_output_format, timestamp_tz_output_format
These options specify the output format for timestamp values. The default values of these options are:
Configuration Option | Default Value
timestamp_ntz_output_format | "YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_ltz_output_format | "TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_tz_output_format | "TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
If these options are set to "sf_current", the connector uses the formats specified for the session.
partition_size_in_mb
This parameter is used when the query result set is very large and needs to be split into multiple DataFrame partitions. This parameter specifies the recommended uncompressed size for each DataFrame partition. To reduce the number of partitions, make this size larger.
This size is used as the recommended size; the actual size of partitions could be smaller or larger.
This option applies only when the use_copy_unload parameter is FALSE.
This parameter is optional.
The default value is 100 (MB).
use_copy_unload
If this is FALSE, Snowflake uses the Arrow data format when SELECTing data. If this is set to TRUE, then Snowflake reverts to the old behavior of using the COPY UNLOAD command to transmit selected data.
This parameter is optional.
The default value is FALSE.
treat_decimal_as_long
If TRUE, configures the Spark Connector to return Long values (rather than BigDecimal values) for queries that return the type Decimal(precision, 0).
The default value is FALSE.
This option was added in Spark Connector version 2.11.1.
s3_stage_vpce_dns_name
Specifies the DNS name of the VPC endpoint for internal stage access.
This option was added in Spark Connector version 2.11.1.
support_share_connection
If FALSE, configures the Spark Connector to create a new JDBC connection for each job or action that uses the same Spark Connector options to access Snowflake.
The default value is TRUE, which means that the different jobs and actions share the same JDBC connection if they use the same Spark Connector options to access Snowflake.
If you need to enable or disable this setting programmatically, use the following global static functions:
- SparkConnectorContext.disableSharedConnection()
- SparkConnectorContext.enableSharingJDBCConnection()
Note
In the following special cases, the Spark Connector does not use the shared JDBC connection:
- If preactions or postactions are set, and those preactions or postactions are not CREATE TABLE, DROP TABLE, or MERGE INTO, the Spark Connector does not use the shared connection.
- Utility functions in Utils such as Utils.runQuery() and Utils.getJDBCConnection() do not use the shared connection.
This option was added in Spark Connector version 2.11.2.
force_skip_pre_post_action_check_for_shared_session
If TRUE, configures the Spark Connector to disable the validation of preactions and postactions for session sharing.
The default value is FALSE.
Important
Before setting this option, make sure that the queries in preactions and postactions don't affect the session settings. Otherwise, you may encounter issues with results.
This option was added in Spark Connector version 2.11.3.
Using key pair authentication and key pair rotation
The Spark Connector supports key pair authentication and key rotation.
- To start, complete the initial configuration for key pair authentication as shown in Key-pair authentication and key-pair rotation.
- Send an unencrypted copy of the private key using the pem_private_key connection option.
Attention
For security reasons, rather than hard-coding the pem_private_key in your application, you should set the parameter dynamically after reading the key from a secure source. If the key is encrypted, then decrypt it and send the decrypted version.
In the Python example, note that the pem_private_key file, rsa_key.p8, is:
- Being read directly from a password-protected file, using the environment variable PRIVATE_KEY_PASSPHRASE.
- Using the expression pkb in the sfOptions string.
To connect, you can save the Python example to a file (i.e. <file.py>) and then execute the following command:
Python
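The core of that example, turning a PEM key into the single-line value for `pem_private_key`, can be sketched with the standard library alone. Decrypting `rsa_key.p8` with the passphrase from `PRIVATE_KEY_PASSPHRASE` needs a cryptography library (for example, `serialization.load_pem_private_key` from the `cryptography` package) and is not shown. This sketch also assumes, as common examples do, that the option expects the base64 body without the BEGIN/END lines. The helper names are hypothetical.

```python
import re

_PKCS8_HEADER = "-----BEGIN PRIVATE KEY-----"


def pem_body(pem_text):
    """Return the base64 body of an unencrypted PKCS#8 PEM key on one line."""
    if _PKCS8_HEADER not in pem_text:
        # Encrypted ("BEGIN ENCRYPTED PRIVATE KEY") or PKCS#1 keys must be
        # decrypted or converted before they are passed to the connector.
        raise ValueError("expected an unencrypted PKCS#8 PEM private key")
    body = re.sub(r"-----(BEGIN|END) PRIVATE KEY-----", "", pem_text)
    return "".join(body.split())  # drop newlines and other whitespace


def key_pair_options(base_options, pem_text):
    """Return connector options that authenticate with pem_private_key."""
    options = {k: v for k, v in base_options.items() if k != "sfPassword"}
    options["pem_private_key"] = pem_body(pem_text)
    return options
```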
Using External OAuth
Starting with Spark Connector version 2.7.0, you can use External OAuth to authenticate to Snowflake using either the sample Scala program or the sample Python script.
Before using External OAuth and the Spark Connector to authenticate to Snowflake, configure an External OAuth security integration for one of the supported External OAuth authorization servers or an External OAuth custom client.
In the Scala and Python examples, note the replacement of the sfPassword parameter with the sfAuthenticator and sfToken parameters.
Scala:
Python:
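The substitution described above can be sketched as a helper that swaps `sfPassword` for the two OAuth options. The helper name is hypothetical. The sketch assumes that the access token comes from your External OAuth authorization server at run time rather than from source code.

```python
def oauth_options(base_options, access_token):
    """Replace sfPassword with the External OAuth options.

    sfAuthenticator must be "oauth" whenever sfToken is set (connector 2.7.0+).
    """
    if not access_token:
        raise ValueError("an External OAuth access token is required")
    options = {k: v for k, v in base_options.items() if k != "sfPassword"}
    options["sfAuthenticator"] = "oauth"
    options["sfToken"] = access_token
    return options
```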
AWS options for external data transfer
These options are used to specify the Amazon S3 location where temporary data is stored and provide authentication details for accessing the location. They are required only if you are doing an external data transfer. External data transfers are required if either of the following is true:
- You are using version 2.1.x or lower of the Spark Connector (which does not support internal transfers), or
- Your transfer is likely to take 36 hours or more (internal transfers use temporary credentials that expire after 36 hours).
tempDir
The S3 location where intermediate data is stored (e.g. s3n://xy12345-bucket/spark-snowflake-tmp/).
If tempDir is specified, you must also specify either:
- awsAccessKey, awsSecretKey
or
- temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token
awsAccessKey, awsSecretKey
These are standard AWS credentials that allow access to the location specified in tempDir. Note that both of these options must be set together.
If they are set, they can be retrieved from the existing SparkContext object.
If you specify these variables, you must also specify tempDir.
These credentials should also be set for the Hadoop cluster.
temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token
These are temporary AWS credentials that allow access to the location specified in tempDir. Note that all three of these options must be set together.
Also, if these options are set, they take precedence over the awsAccessKey and awsSecretKey options.
If you specify temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token, you must also specify tempDir. Otherwise, these parameters are ignored.
check_bucket_configuration
If set to on (default), the connector checks whether the bucket used for data transfer has a lifecycle policy configured (see Preparing an AWS External S3 Bucket for more information). If no lifecycle policy is present, a warning is logged.
Disabling this option (by setting it to off) skips this check. This can be useful if a user can perform data operations on the bucket but cannot access the bucket's lifecycle policies. Disabling the option can also speed up query execution times slightly.
For details, see Authenticating S3 for data exchange (in this topic).
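The combinations described above can be checked before a job starts. This sketch is stricter than the connector may be. For example, it does not consider keys already set on the `SparkContext`. Its names are hypothetical.

```python
PERMANENT = ("awsAccessKey", "awsSecretKey")
TEMPORARY = (
    "temporary_aws_access_key_id",
    "temporary_aws_secret_access_key",
    "temporary_aws_session_token",
)


def check_aws_transfer_options(options):
    """Validate tempDir plus credentials; return which credential set applies.

    Temporary credentials take precedence when both sets are present.
    """
    if "tempDir" not in options:
        raise ValueError("external transfers need tempDir")
    has_temp = [key in options for key in TEMPORARY]
    has_perm = [key in options for key in PERMANENT]
    if any(has_temp) and not all(has_temp):
        raise ValueError("set all three temporary_aws_* options together")
    if any(has_perm) and not all(has_perm):
        raise ValueError("set awsAccessKey and awsSecretKey together")
    if all(has_temp):
        return "temporary"
    if all(has_perm):
        return "permanent"
    raise ValueError("tempDir requires AWS credentials")
```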
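Azure options for external data transfer
This section describes the parameters that apply to Azure Blob storage when doing external data transfers. External data transfers are required if either of the following is true: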
- You are using version 2.1.x or lower of the Spark Connector (which does not support internal transfers), or
- Your transfer is likely to take 36 hours or more (internal transfers use temporary credentials that expire after 36 hours).
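When using external transfers with Azure Blob storage, specify the location of the Azure container and the SAS (shared access signature) for that container using the parameters described below.
tempDir
The Azure Blob storage container where intermediate data is stored. This is in the form of a URL, for example:
wasb://<azure_container>@<azure_account>.<azure_endpoint>/
temporary_azure_sas_token
Specifies the SAS token for Azure Blob storage.
For details, see Authenticating Azure for data exchange (in this topic).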
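Specifying Azure information for temporary storage in Spark
When using Azure Blob storage to provide temporary storage for transferring data between Spark and Snowflake, you must provide Spark, as well as the Snowflake Spark Connector, with the location and credentials for the temporary storage.
To provide Spark with the temporary storage location, execute commands similar to the following on your Spark cluster:
Note that the last command contains the following variables:
- <container> and <account>: These are the container and account name for your Azure deployment.
- <azure_endpoint>: This is the endpoint for your Azure deployment location. For example, if you are using an Azure US deployment, the endpoint is likely to be blob.core.windows.net.
- <azure_sas>: This is the Shared Access Signature security token.
Replace these variables with the correct information for your Azure Blob storage account.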
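In PySpark, the equivalent Hadoop setting is commonly applied through `spark.sparkContext._jsc.hadoopConfiguration()`, which is an internal PySpark handle. The sketch below assumes the Hadoop Azure property name `fs.azure.sas.<container>.<account>.<endpoint>` and uses hypothetical helper names. Check both against your Hadoop version.

```python
def azure_sas_hadoop_conf(container, account, sas_token,
                          endpoint="blob.core.windows.net"):
    """Return the (key, value) Hadoop setting that grants Spark SAS access."""
    key = f"fs.azure.sas.{container}.{account}.{endpoint}"
    return key, sas_token


def apply_azure_sas(spark, container, account, sas_token,
                    endpoint="blob.core.windows.net"):
    """Set the SAS entry on the SparkContext's Hadoop configuration."""
    key, value = azure_sas_hadoop_conf(container, account, sas_token, endpoint)
    spark.sparkContext._jsc.hadoopConfiguration().set(key, value)
```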
Passing Snowflake session parameters as options for the connector
The Snowflake Connector for Spark supports sending arbitrary session-level parameters to Snowflake (see Session parameters for more info). This can be achieved by adding a
("<key>" -> "<value>") pair to the options object, where <key> is the session parameter name and <value> is the value.
Note
The <value> should be a string enclosed in double quotes, even for parameters that accept numbers or Boolean values (e.g. "1" or "true").
For example, the following code sample passes the USE_CACHED_RESULT session parameter with a value of "false", which disables using the results of previously-executed queries:
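Because every value must be a string, a helper can do the conversion while building the options. In this sketch (the function name is hypothetical), Python booleans become `"true"`/`"false"` and other values take their `str()` form.

```python
def with_session_parameters(sf_options, **params):
    """Return a copy of sf_options with Snowflake session parameters added.

    Every value is converted to a string, as the connector expects.
    """
    options = dict(sf_options)
    for name, value in params.items():
        if isinstance(value, bool):
            options[name] = "true" if value else "false"
        else:
            options[name] = str(value)
    return options
```

`with_session_parameters(sfOptions, USE_CACHED_RESULT=False)` adds `"USE_CACHED_RESULT": "false"`, which matches the parameter and value described above.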
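Security considerations
Customers should ensure that in a multi-node Spark system, communications between the nodes are secure. The Spark master sends Snowflake credentials to Spark workers so that those workers can access Snowflake stages. If communications between the Spark master and Spark workers are not secure, an unauthorized third party could read the credentials.
Authenticating S3 for data exchange
This section describes how to authenticate when using S3 for data exchange.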
This task is required only in either of the following circumstances:
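- The Snowflake Connector for Spark version is 2.1.x (or lower). Starting with v2.2.0, the connector uses a Snowflake internal temporary stage for data exchange. If you are not currently using version 2.2.0 (or higher) of the connector, Snowflake strongly recommends upgrading to the latest version.
- The Snowflake Connector for Spark version is 2.2.0 (or higher), but your jobs regularly exceed 36 hours in length. This is the maximum duration for the AWS token used by the connector to access the internal stage for data exchange.
If you are using an older version of the connector, you need to prepare an S3 location that the connector can use to exchange data between Snowflake and Spark.
To allow access to the S3 bucket/directory used to exchange data between Spark and Snowflake (as specified for tempDir), two authentication methods are supported:
- Permanent AWS credentials (also used to configure Hadoop/Spark authentication for accessing S3)
- Temporary AWS credentials
Using permanent AWS credentials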
This is the standard AWS authentication method. It requires a pair of awsAccessKey and awsSecretKey values.
Note
These values should also be used to configure Hadoop/Spark for accessing S3. For more information, including examples, see Authenticating Hadoop/Spark using S3A or S3N (in this topic).
For example:
For details about the options supported by sfOptions, see AWS options for external data transfer (in this topic).
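The following is a PySpark sketch of the permanent-credential options. It assumes that the keys are exported in the conventional `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables so that they are not hard-coded. The helper name is hypothetical.

```python
import os


def with_permanent_aws_credentials(sf_options, temp_dir, env=None):
    """Return a copy of sf_options with tempDir and permanent AWS keys.

    The keys are read from environment variables instead of source code.
    """
    env = os.environ if env is None else env
    options = dict(sf_options)
    options["tempDir"] = temp_dir  # e.g. an s3a:// or s3n:// URL
    options["awsAccessKey"] = env["AWS_ACCESS_KEY_ID"]
    options["awsSecretKey"] = env["AWS_SECRET_ACCESS_KEY"]
    return options
```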
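Authenticating Hadoop/Spark using S3A or S3N
Hadoop/Spark ecosystems support two URI schemes for accessing S3 (https://wiki.apache.org/hadoop/AmazonS3/):
s3a://
The recommended new method (for Hadoop 2.7 and higher).
To use this method, modify the Scala examples in this topic to add the following Hadoop configuration options:
Make sure the tempdir option uses s3a:// as well.
s3n://
The older method (for Hadoop 2.6 and lower).
In some systems, it must be specified explicitly, as shown in the following Scala example: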
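The following sketches the Hadoop side in PySpark. The property names are the standard Hadoop S3A and S3N credential keys. Applying them through `spark.sparkContext._jsc.hadoopConfiguration()` relies on a PySpark internal, and the helper names are hypothetical.

```python
def hadoop_s3_credentials(scheme, access_key, secret_key):
    """Hadoop configuration entries for permanent S3 credentials."""
    if scheme == "s3a":  # Hadoop 2.7 and higher
        return {"fs.s3a.access.key": access_key, "fs.s3a.secret.key": secret_key}
    if scheme == "s3n":  # older scheme, Hadoop 2.6 and lower
        return {
            "fs.s3n.awsAccessKeyId": access_key,
            "fs.s3n.awsSecretAccessKey": secret_key,
        }
    raise ValueError("scheme must be 's3a' or 's3n'")


def apply_hadoop_conf(spark, entries):
    """Copy entries into the SparkContext's Hadoop configuration (via py4j)."""
    conf = spark.sparkContext._jsc.hadoopConfiguration()
    for key, value in entries.items():
        conf.set(key, value)
```

Whichever scheme you configure, the connector's `tempdir` URL should use the same scheme.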
Using temporary AWS credentials
This method uses the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token configuration options for the connector.
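This method provides improved security by giving Snowflake only temporary access to the S3 bucket/directory used for data exchange.
Note
Temporary credentials can only be used to configure S3 authentication for the connector; they cannot be used to configure Hadoop/Spark authentication.
Also, if you provide temporary credentials, they take precedence over any permanent credentials that have been provided.
The following Scala code sample provides an example of authenticating using temporary credentials: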
sfOptions2 can now be used with the options() DataFrame method.
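The `sfOptions2` idea can be sketched in PySpark as a copy of the base options with the three temporary values added. How you obtain the temporary credentials (for example, from AWS STS) is outside this sketch, and the helper name is hypothetical.

```python
def with_temporary_aws_credentials(sf_options, temp_dir, access_key_id,
                                   secret_access_key, session_token):
    """Return a copy of sf_options with tempDir and temporary AWS credentials.

    All three temporary values must be set together; if permanent keys are
    also present, the connector gives the temporary ones precedence.
    """
    if not all((access_key_id, secret_access_key, session_token)):
        raise ValueError("all three temporary AWS credential values are required")
    options = dict(sf_options)
    options.update({
        "tempDir": temp_dir,
        "temporary_aws_access_key_id": access_key_id,
        "temporary_aws_secret_access_key": secret_access_key,
        "temporary_aws_session_token": session_token,
    })
    return options
```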
Authenticating Azure for data exchange
This section describes how to authenticate when using Azure Blob storage for data exchange.
Authenticating this way is required only in either of the following circumstances:
- The Snowflake Connector for Spark version is 2.1.x (or lower). Starting with v2.2.0, the connector uses a Snowflake internal temporary stage for data exchange. If you are not currently using version 2.2.0 (or higher) of the connector, Snowflake strongly recommends upgrading to the latest version.
- The Snowflake Connector for Spark version is 2.2.0 (or higher), but your jobs regularly exceed 36 hours in length. This is the maximum duration for the Azure token used by the connector to access the internal stage for data exchange.
You need to prepare an Azure Blob storage container that the connector can use to exchange data between Snowflake and Spark.
Using Azure credentials
This is the standard Azure Blob storage authentication method. It requires a pair of
values: tempDir (a URL) and temporary_azure_sas_token values.
Note
These values should also be used to configure Hadoop/Spark for accessing Azure Blob storage. For more information, including examples, see Authenticating Hadoop/Spark using Azure (in this topic).
For example:
For details about the options supported by sfOptions, see Azure options for external data transfer (in this topic).
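The following is a PySpark sketch of these two options, with a hypothetical helper name. The `endpoint` default is the Azure US example given earlier in this topic. Substitute the values for your storage account.

```python
def with_azure_transfer(sf_options, container, account, sas_token,
                        endpoint="blob.core.windows.net"):
    """Return a copy of sf_options with the Azure external-transfer options."""
    options = dict(sf_options)
    # tempDir follows the wasb://<container>@<account>.<endpoint>/ form.
    options["tempDir"] = f"wasb://{container}@{account}.{endpoint}/"
    options["temporary_azure_sas_token"] = sas_token
    return options
```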
Authenticating Hadoop/Spark using Azure
To use this method, modify the Scala examples in this topic to add the following Hadoop configuration options:
Make sure the tempdir option uses wasb:// as well.
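Authentication via a browser is not supported
When using the Spark Connector, it is impractical to use any form of authentication that would open a browser window to ask the user for credentials, because the window would not necessarily appear on the client machine. Therefore, the Spark Connector does not support any type of authentication that would invoke a browser window, including MFA (multi-factor authentication) or SSO (single sign-on).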