在 Snowpark Scala 中使用 DataFrames¶
在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。
要检索和操作数据,需要使用 ` DataFrame `_ 类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。
要将数据检索到 DataFrame 之中,请执行以下步骤:
-
例如,您可以创建一个 DataFrame,以保存来自表和外部 CSV 文件的数据或执行 SQL 语句时产生的数据。
-
例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。
-
为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如
collect()方法)。
接下来的部分将更详细地介绍这些步骤。
设置本部分的示例¶
本部分的一些示例使用 DataFrame 查询名为 sample_product_data 的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表:
要验证表是否已创建,请运行:
构造 DataFrame¶
要构造 DataFrame,可以使用 Session 类中的方法。以下每种方法都基于不同类型的数据源构造 DataFrame :
要基于表、视图或流中的数据创建 DataFrame,请调用
table方法:备注
session.table方法返回一个Updatable对象。Updatable扩展了DataFrame,并提供了用于处理表中数据的其他方法(例如,用于更新和删除数据的方法)。请参阅 更新、删除和合并表中的行。要基于一系列值创建 DataFrame,请调用
createDataFrame方法:备注
构造 DataFrame 时,Snowflake 的保留字不能用作有效的列名。有关保留字的列表,请参阅 保留和受限关键字。
要创建包含特定值范围的 DataFrame,请调用
range方法:要 为暂存区中的文件创建 DataFrame,请调用
read来获取DataFrameReader对象。在DataFrameReader对象中,调用与文件中的数据格式对应的方法:要创建 DataFrame 来保存 SQL 查询的结果,请调用
sql方法:注意:虽然可以使用此方法执行 SELECT 语句,以从表和暂存文件中检索数据,但应改用
table和read方法。在开发工具中,table和read之类的方法可以提供更好的语法突出显示、错误突出显示和智能代码补全效果。
指定应该如何转换数据集¶
要指定应选择哪些列,以及应如何对结果进行筛选、排序、分组等,请调用能转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 col 函数或计算结果为列的表达式。(请参阅 指定列和表达式。)
例如:
若要指定应返回的行,请调用
filter方法:若要指定应选择的列,请调用
select方法:
每个方法都返回一个经过转换的新 DataFrame 对象。(该方法不会影响原始 DataFrame 对象。)这就意味着,如果要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用所返回的新 DataFrame 对象来调用每个后续转换方法。
请注意,这些转换方法不会从 Snowflake 数据库中检索数据。( 执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。)转换方法只是指定应如何构造 SQL 语句。
指定列和表达式¶
调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select 方法时,需要指定应选择的列。
要引用列,请通过调用 com.snowflake.snowpark.functions 对象中的 ` col `_ 函数来创建 ` Column `_ 对象。
备注
要为字面量创建 Column 对象,请参阅 将字面量用作列对象。
指定筛选器、投影、联接条件等时,可以在表达式中使用 Column 对象。例如:
可以将
Column对象与filter方法一起使用,以指定筛选条件:可以将
Column对象与select方法一起使用,以定义别名:可以将
Column对象与join方法一起使用,以定义联接条件:
引用不同 DataFrames 中的列¶
引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在一个 DataFrame 对象中使用 DataFrame.col 方法引用该对象中的列(例如 df1.col("name") 和 df2.col("name"))。
下面的示例演示了如何使用 DataFrame.col 方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 key 的列。该示例使用 Column.as 方法来更改新创建 DataFrame 中的列名称。
使用 apply 方法引用列¶
作为 DataFrame.col 方法的替代方案,可以使用 DataFrame.apply 方法引用特定 DataFrame 中的列。与 DataFrame.col 方法一样, DataFrame.apply 方法接受列名作为输入并返回一个 Column 对象。
请注意,当对象在 Scala 中具有 apply 方法时,可通过像调用函数一样调用对象,来调用 apply 方法。例如,要调用 df.apply("column_name"),只需写成 df("column_name") 即可。以下调用是等效的:
df.col("<column_name>")df.apply("<column_name>")df("<column_name>")
以下示例与前面的示例相同,但使用 DataFrame.apply 方法引用联接操作中的列:
使用列对象的简写¶
作为使用 col 函数的替代方法,您可以通过以下方式之一引用列:
在带引号的列名称前面使用美元符号 (
$"column_name")。在不带引号的列名称前面使用撇号(单引号)(
'column_name)。
为此,请在创建 Session 对象后从 implicits 对象导入名称:
将对象标识符(表名称、列名称等)放在双引号里¶
您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求。指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:
如果名称不符合标识符要求,则必须将名称放在双引号里 (")。对于 Scala 字符串字面量中的双引号字符,请使用反斜杠 (\) 进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:
请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:
如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。
在某些情况下,列名称可能包含双引号字符:
如 标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes" 和 """column_name_quoted"""):
请注意,如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写:
将字面量用作列对象¶
要在传入 Column 对象的方法中使用字面量,请将字面量传递给 com.snowflake.snowpark.functions 对象中的 lit 函数,从而为字面量创建 Column 对象。例如:
如果字面量是 Scala 中的浮点或双精度值(例如 0.05 ` 默认情况下被视为双精度值 <https://docs.scala-lang.org/overviews/scala-book/built-in-types.html (https://docs.scala-lang.org/overviews/scala-book/built-in-types.html)>`_),则 Snowpark 库会生成 SQL,从而隐式将该值的类型转换为相应的 Snowpark 数据类型(例如 0.05::DOUBLE)。这可能会产生与指定的确切数字不同的近似值。
例如,以下代码不显示匹配的行,即使筛选器(匹配大于或等于 0.05 的值)应匹配 DataFrame 中的行:
问题在于 lit(0.06) 和 lit(0.01) 为 0.06 和 0.01 产生近似值而不是确切值。
若要避免此问题,可使用下列方法之一:
选项 1:将字面量的类型转换为要使用的 Snowpark 类型。例如,要使用精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:
选项 2:将值的类型转换为要使用的类型,然后再将值传递给
lit函数。例如,如果要使用 BigDecimal 类型 (https://docs.scala-lang.org/overviews/scala-book/built-in-types.html#bigint-and-bigdecimal),请执行如下操作:
将列对象的类型转换为特定类型¶
要将 Column 对象的类型转换为特定类型,请调用 ` Column.cast _ 方法,然后从 `com.snowflake.snowpark.types package `_ 传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 的 :ref:`label-data_type_number,请运行以下语句:
链接多个方法调用¶
由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。
下面的示例返回为如下目的而配置的 DataFrame :
查询
sample_product_data表。返回
id = 1的行。选择
name和serial_number列。
在此示例中:
session.table("sample_product_data")返回sample_product_data表的 DataFrame。尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。
filter(col("id") === 1)返回sample_product_data表的 DataFrame(设置为返回id = 1的行)。请再次注意, DataFrame 尚未包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_dataframe_action_method>` 之前,不会检索匹配的行。
select(col("name"), col("serial_number"))返回一个 DataFrame,包含sample_product_data表中id = 1的行的name和serial_number列。
在链接多个方法调用时,请注意调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。
例如,在下面的代码中,select 方法返回仅包含两列的 DataFrame:name 和 serial_number。对此 DataFrame 的 filter 方法调用会失败,因为它使用的 id 列不在已转换的 DataFrame 中。
相反,以下代码会成功执行,因为对包含 sample_product_data 表中所有列(包括 id 列)的 DataFrame 调用了 filter() 方法:
请注意,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 select 和 filter 方法调用。
限制 DataFrame 中的行数¶
要限制 DataFrame 中的行数,可以使用 ` DataFrame.limit `_ 转换方法。
Snowpark API 还提供了以下操作方法,可检索和打印出有限数量的行:
` DataFrame.first
_ 操作方法(用于执行查询并返回前 :samp:`{n}行)` DataFrame.show
_ 操作方法(用于执行查询并打印前 :samp:`{n}行)
这些方法能有效地将 LIMIT 子句添加到执行的 SQL 语句中。
如 :ref:` LIMIT 使用说明 <label-limit_cmd_usage_notes>` 中所述,除非将排序顺序 (ORDER BY) 与 LIMIT 一起指定,否则结果是不确定的。
要使 ORDER BY 子句与 LIMIT 子句一起出现(从而避免 ORDER BY 出现在另一个子查询等地方),必须调用 limit 方法,以限制 sort 方法返回的 DataFrame 中的结果数。
例如,如果 链接多个方法调用:
检索列定义¶
要在 DataFrame 的数据集中检索列的定义,请调用 schema 方法。此方法会返回一个 StructType 对象,其中包含 StructField 对象的 Array。每个 StructField 对象都包含一列的定义。
在返回的 StructType 对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。
下面的示例创建一个 DataFrame,其中包含名为 ID 和 3rd 的列。对于名为 3rd 的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"),因为 该名称不符合标识符要求。
该示例调用 schema 方法,随后对返回的 StructType 对象调用 names 方法,以获取列名称的 ArraySeq。这些名称使用 schema 方法返回的 StructType 进行规范化。
联接 DataFrames¶
若要联接 DataFrame 对象,请调用 ` DataFrame.join `_ 方法。
以下部分说明如何使用 DataFrames 执行联接:
设置联接的示例数据¶
后面部分中的示例使用示例数据,您可以通过执行以下 SQL 语句来设置这些数据:
指定联接的列¶
利用 DataFrame.join 方法,可以通过以下方式之一指定要使用的列:
指定描述联接条件的列表达式。
指定应用作联接中的公共列的一个或多个列。
以下示例对名为 id_a 的列执行内部联接:
请注意,该示例使用 DataFrame.col 方法来指定要用于联接的条件。有关此方法的更多信息,请参阅 指定列和表达式。
这将打印以下输出:
联接结果中重复的相同列名称¶
在联接产生的 DataFrame 中,Snowpark 库使用在已联接的表中找到的列名称,即使这些列名称在表之间相同也是如此。发生这种情况时,这些列名称将在联接产生的 DataFrame 中重复。要按名称访问重复的列,请对表示列的原始表的 DataFrame 调用 col 方法。(有关指定列的更多信息,请参阅 引用不同 DataFrames 中的列。)
以下示例中的代码联接两个 DataFrames,然后对已联接的 DataFrame 调用 select 方法。此代码通过从表示相应 DataFrame 对象( dfRhs 和 dfLhs)的变量调用 col 方法来指定要选择的列。它使用 as 方法为 select 方法创建的 DataFrame 中的列提供新名称。
这将打印以下输出:
在保存或缓存之前删除重复列¶
请注意,当联接产生的 DataFrame 包含重复的列名称时,必须通过删除重复列或重命名列来移除 DataFrame 中的重复项,然后才能将结果保存到表中或者缓存 DataFrame。对于保存到表或缓存的 DataFrame 中的重复列名称,Snowpark 库会将重复的列名称替换为别名,让它们不再重复。
对于缓存的 DataFrame,下例说明了其输出在以下情况下将如何显示:列名称 ID_A 和 VALUE 在两个表的联接结果中重复,之后,在缓存结果之前未删除重复列或未重命名列。
执行自然联接¶
要执行 :ref:` 自然联接 <label-querying_join_natural>` (在具有相同名称的列上联接 DataFrames),请调用 ` DataFrame.naturalJoin `_ 方法。
以下示例在 sample_a 和 sample_b 的公共列( id_a 列)上联接这两个表的 DataFrames :
这将打印以下输出:
指定联接类型¶
默认情况下, DataFrame.join 方法创建内部联接。要指定其他类型的联接,请将 joinType 实参设置为以下值之一:
联接类型 |
|
|---|---|
内部联接 |
:code:`inner`(默认) |
交叉联接 |
|
完整外部联接 |
|
左外部联接 |
|
左反联接 |
|
左半联接 |
|
右外部联接 |
|
例如:
这将打印以下输出:
联接多个表¶
要联接多个表,请执行以下步骤:
为每个表创建一个 DataFrame。
对第一个 DataFrame 调用
DataFrame.join方法,并传入第二个 DataFrame。使用
join方法返回的 DataFrame 来调用join方法,并传入第三个 DataFrame。
可以 链接 join 调用,如下所示:
这将打印以下输出:
执行自联接¶
如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下使用单个 DataFrame 执行自联接的示例最终失败,因为联接的左侧和右侧都存在 "id" 的列表达式:
上面两个示例都失败了,并引发以下异常:
请改用 ` DataFrame.clone `_ 方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:
如果要对同一列执行自联接,请调用 join 方法,为 USING 子句传入列表达式的 Seq :
执行操作以计算 DataFrame¶
如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。
以下部分介绍如何同步和异步地对 DataFrame 执行操作:
同步执行操作¶
要同步执行操作,请调用以下操作方法之一:
同步执行操作的方法 |
描述 |
|---|---|
|
计算 DataFrame,并将生成的数据集作为 ` Row |
|
计算 DataFrame,并返回 ` Row |
|
计算 DataFrame 并返回行数。 |
|
计算 DataFrame,并将行打印到控制台。请注意,此方法将行数限制为 10 行(默认值)。请参阅 打印 DataFrame 中的行。 |
|
执行查询,创建临时表,并将结果放入表中。该方法返回一个 |
|
将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中。 |
|
将 DataFrame 保存到暂存区中的指定文件。请参阅 将 DataFrame 保存到暂存区上的文件。 |
|
将 DataFrame 中的数据复制到指定表中。请参阅 将数据从文件复制到表中。 |
|
删除指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
更新指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
将行合并到指定的表中。请参阅 更新、删除和合并表中的行。 |
若要执行查询并返回结果数量,请调用 count 方法:
还可以调用操作方法来:
注意:如果调用 schema 方法来获取 DataFrame 中列的定义,则无需调用操作方法。
异步执行操作¶
备注
此功能是在 Snowpark 0.11.0 中引入的。
要异步执行操作,请调用 async 方法以返回“异步执行者”对象(例如 DataFrameAsyncActor),然后在该对象中调用异步操作方法。
异步执行者对象的这些操作方法会返回一个 TypedAsyncJob 对象,您可以使用该对象检查异步操作的状态和检索操作的结果。
接下来的部分将介绍如何异步执行操作和检查结果。
了解异步操作的基本流程¶
可以使用以下方法异步执行操作:
异步执行操作的方法 |
描述 |
|---|---|
|
异步计算 DataFrame,以 ` Row |
|
异步计算 DataFrame,以检索 ` Row |
|
异步计算 DataFrame 以检索行数。 |
|
将 DataFrame 中的数据异步保存到指定表中。请参阅 将数据保存到表中。 |
|
将 DataFrame 保存到暂存区中的指定文件。请参阅 将 DataFrame 保存到暂存区上的文件。 |
|
将 DataFrame 中的数据异步复制到指定的表中。请参阅 将数据从文件复制到表中。 |
|
异步删除指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
异步更新指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
将行异步合并到指定表中。1.3.0 或更高版本中支持此方法。请参阅 更新、删除和合并表中的行。 |
从返回的 ` TypedAsyncJob `_ 对象中,您可以执行以下操作:
要确定操作是否已完成,请调用
isDone方法。要获取与操作对应的查询 ID,请调用
getQueryId方法。要返回操作的结果(例如,对于
collect方法为Row对象的Array;对于count方法为行数),请调用getResult方法。请注意,
getResult是阻塞调用。要取消操作,请调用
cancel方法。
例如,要异步执行查询,并以 Row 对象的 Array 的形式检索结果,请调用 DataFrame.async.collect:
要异步执行查询并检索结果数,请调用 DataFrame.async.count:
指定等待的最大秒数¶
调用 getResult 方法时,可以使用 maxWaitTimeInSeconds 实参指定在尝试检索结果之前等待查询完成的最大秒数。例如:
如果省略此实参,该方法将等待 :ref:` snowpark_request_timeout_in_seconds <label-snowpark_request_timeout_in_seconds>` 配置属性指定的最大秒数。( :ref:` 创建 Session 对象 <label-snowpark_creating_session>` 时可以设置此属性。)
通过 ID 访问异步查询¶
如果有之前提交的异步查询的查询 ID,则可以调用 Session.createAsyncJob 方法创建一个 ` AsyncJob `_ 对象,该对象可用于检查查询状态、检索查询结果或取消查询。
请注意,与 TypedAsyncJob 不同的是, AsyncJob 没有提供用于检索结果的 getResult 方法。如果需要检索结果,请改为调用 getRows 或 getIterator 方法。
例如:
将行检索到 DataFrame 之中¶
指定 DataFrame 应如何转换 来执行查询并返回结果。可以返回 Array 中的所有行,也可以返回一个 ` Iterator `_,以便逐行遍历访问结果。在后一种情况下,如果数据量较大,则行将按块加载到内存中,以免将大量数据加载到内存中。
返回所有行¶
要同时返回所有行,请调用 ` DataFrame.collect _ 方法。此方法返回 ` Row `_ 对象的数组。要从行中检索值,请调用 :samp:`get{Type} 方法(例如 getString、getInt 等)。
例如:
返回行的迭代器¶
如果要使用 ` Iterator `_ 遍历结果中的 ` Row `_ 对象,请调用 ` DataFrame.toLocalIterator `_。如果结果中的数据量很大,该方法将按块加载行,以免同时将所有行加载到内存中。
例如:
返回前 n 行¶
要返回前 n 行,请调用 ` DataFrame.first `_ 方法,并传入要返回的行数。
如 限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().first())。
例如:
打印 DataFrame 中的行¶
要将 DataFrame 中的前 10 行打印到控制台,请调用 ` DataFrame.show `_ 方法。要打印出其他数量的行,请传入要打印的行数。
如 限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().show())。
例如:
更新、删除和合并表中的行¶
备注
此功能是在 Snowpark 0.7.0 中引入的。
当您调用 Session.table 为表创建 DataFrame 对象时,该方法将返回一个 Updatable 对象,此对象通过用于更新和删除表中数据的其他方法扩展了 DataFrame。(请参阅 ` Updatable `_。)
如果需要更新或删除表中的行,可以使用 Updatable 类的以下方法:
调用
update,以更新表中的现有行。请参阅 更新表中的行。调用
delete以删除表中的行。请参阅 删除表中的行。调用
merge,以根据另一个表或子查询中的数据在一个表中插入、更新和删除行。(这等效于 SQL 中的 MERGE 命令。)请参阅 将行合并到一个表中。
更新表中的行¶
对于 update 方法,传入一个 Map,它将要更新的列与要分配给这些列的相应值关联起来。update 会返回一个 UpdateResult 对象,其中包含已更新的行数。(请参阅 UpdateResult。)
备注
update 是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。
例如,要将名为 count 的列中的值替换为值 1,请运行以下语句:
上面的示例使用列名称来标识列。您也可以使用列表达式:
如果仅在满足条件时才应进行更新,则可以将该条件指定为实参。例如,对于 category_id 列的值为 20 的行,要替换名为 count 的列中的值,请运行以下语句:
如果需要让条件基于与另一个 DataFrame 对象进行的联接,则可以将该 DataFrame 作为实参传入,并在条件中使用该 DataFrame。例如,对于 category_id 列与 DataFrame dfParts 中的 category_id 匹配的行,要替换名为 count 的列中的值,请运行以下语句:
删除表中的行¶
对于 delete 方法,可以指定一个条件来标识要删除的行,并且可以使该条件基于与另一个 DataFrame 进行的联接。delete 会返回一个 DeleteResult 对象,其中包含已删除的行数。(请参阅 DeleteResult。)
备注
delete 是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。
例如,要删除 category_id 列中的值为 1 的行,请运行以下语句:
如果条件引用另一个 DataFrame 中的列,则将该 DataFrame 作为第二个实参传入。例如,要删除 category_id 列与 DataFrame dfParts 中的 category_id 匹配的行,请将 dfParts 作为第二个实参传入:
将行合并到一个表中¶
要根据另一个表或子查询中的值在一个表中插入、更新和删除行(等效于 SQL 中的 MERGE 命令),请执行以下步骤:
在要将数据合并到的表的
Updatable对象中,调用merge方法,并传入另一个表的DataFrame对象和联接条件的列表达式。这将返回一个
MergeBuilder对象,您可以使用该对象指定要对匹配的行和不匹配的行执行的操作(例如插入、更新或删除)。(请参阅 MergeBuilder。)使用
MergeBuilder对象:要指定应对匹配的行执行的更新或删除,请调用
whenMatched方法。如果需要指定有关何时应更新或删除行的附加条件,则可以传入该条件的列表达式。
此方法返回一个
MatchedClauseBuilder对象,可用于指定要执行的操作。(请参阅 MatchedClauseBuilder。)调用
MatchedClauseBuilder对象中的update或delete方法,以指定应对匹配的行执行的更新或删除操作。这些方法返回一个MergeBuilder对象,可用于指定其他子句。要指定应在行不匹配时执行的插入,请调用
whenNotMatched方法。如果需要指定有关何时应插入行的附加条件,则可以传入该条件的列表达式。
此方法返回一个
NotMatchedClauseBuilder对象,可用于指定要执行的操作。(请参阅 NotMatchedClauseBuilder。)调用
NotMatchedClauseBuilder对象中的insert方法,以指定应在行不匹配时执行的插入操作。这些方法返回一个MergeBuilder对象,可用于指定其他子句。
指定了应执行的插入、更新和删除后,调用
MergeBuilder对象的collect方法,以对表执行指定的插入、更新和删除。collect会返回一个MergeResult对象,其中包含已插入、更新和删除的行数。(请参阅 ``_。)
以下示例将 source 表中带有 id 和 value 列的行插入到 target 表中(如果 target 表未包含具有匹配 ID 的行):
以下示例使用 source 表中具有同一 ID 的行的 value 列值更新 target 表中的行:
将数据保存到表中¶
可以将 DataFrame 的内容保存到新表或现有表中。为此,您必须具有以下权限:
对架构的 CREATE TABLE 权限(如果表不存在)。
对表的 INSERT 权限。
要将 DataFrame 的内容保存到表中,请执行以下操作:
调用 ` DataFrame.write`_ 方法,以获取 ` DataFrameWriter `_ 对象。
调用 ` DataFrameWriter.mode `_ 方法,传入一个 ` SaveMode `_ 对象,该对象指定有关写入表的首选项:
要插入行,请传入
SaveMode.Append。要覆盖现有表,请传入
SaveMode.Overwrite。
此方法返回使用指定模式进行了配置的相同
DataFrameWriter对象。如果要在现有表中插入行 (
SaveMode.Append),并且 DataFrame 中的列名称与表中的列名称匹配,请调用 ` DataFrameWriter.option_ 方法,并传入 :code:"columnOrder"` 和"name"作为实参。备注
此方法是在 Snowpark 1.4.0 中引入的。
默认情况下,
columnOrder选项设置为"index",这意味着DataFrameWriter按照列的出现顺序插入值。例如,DataFrameWriter在表中第一列内插入 DataFrame 中第一列的值,在表中第二列内插入 DataFrame 中第二列的值,依此类推。此方法返回使用指定选项进行了配置的同一个
DataFrameWriter对象。调用 ` DataFrameWriter.`_ saveAsTable 将 DataFrame 的内容保存到指定的表中。
无需调用单独的方法(例如
collect)来执行将数据保存到表中的 SQL 语句。saveAsTable是执行 SQL 语句的 操作方法。
以下示例使用 DataFrame df 的内容覆盖现有表(由 tableName 变量标识):
以下示例将 DataFrame df 中的行插入到现有表(由 tableName 变量标识)。在此示例中,表和 DataFrame 都包含列 c1 和 c2。
该示例展示了这两种做法的区别:将 columnOrder 选项设置为 "name" (这会将值插入到与 DataFrame 列同名的表列中),以及使用默认的 columnOrder 选项(这会根据 DataFrame 中列的顺序将值插入到表列中)。
基于 DataFrame 创建视图¶
要基于 DataFrame 创建视图,请调用 ` DataFrame.createOrReplaceView `_ 方法:
请注意,调用 createOrReplaceView 会立即创建新视图。更重要的是,它不会导致对 DataFrame 进行计算。(在您 :ref:` 执行操作 <label-snowpark_dataframe_action_method>` 之前,不会对 DataFrame 本身进行计算。)
通过调用 createOrReplaceView 创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图。
如果仅需要为会话创建临时视图,请改为调用 ` DataFrame.createOrReplaceTempView `_ 方法:
缓存 DataFrame¶
在某些情况下,可能需要执行复杂的查询,并将结果保留下来,以供后续操作使用(而不必再次执行相同的查询)。在此类情况下,可以调用 ` DataFrame.cacheResult `_ 方法来缓存 DataFrame 的内容。
此方法会:
运行查询。
在调用
cacheResult之前,无需 调用单独的操作方法来检索结果。cacheResult是执行查询的操作方法。将结果保存在临时表中
由于
cacheResult会创建临时表,因此,您必须对正在使用的架构具有 CREATE TABLE 权限。返回一个 ` HasCachedResult `_ 对象,该对象提供对临时表中结果的访问权。
由于
HasCachedResult扩展了DataFrame,因此,可以对此缓存数据执行一些同样可以对 DataFrame 执行的操作。
备注
由于 cacheResult 执行查询并将结果保存到表中,因此该方法可能会导致计算和存储成本增加。
例如:
请注意,调用该方法时,原始的 DataFrame 不受影响。例如,假设 dfTable 是 sample_product_data 表的 DataFrame :
调用 cacheResult 后, dfTable 仍指向 sample_product_data 表,而且您可以继续使用 dfTable 来查询和更新该表。
要使用临时表中的缓存数据,请使用 dfTempTable (即 cacheResult 返回的 HasCachedResult 对象)。
处理暂存区中的文件¶
Snowpark 库提供了一些类和方法,可让您通过使用暂存区中的文件 将数据加载到 Snowflake 中,以及 从 Snowflake 卸载数据。
备注
为了在暂存区上使用这些类和方法,您必须具有 使用暂存区所需的权限。
接下来的部分将说明如何使用这些类和方法:
在暂存区中上传和下载文件¶
要在暂存区中上传和下载文件,请使用 ` FileOperation `_ 对象:
将文件上传到暂存区¶
要将文件上传到暂存区,请执行以下步骤:
验证您是否具有 将文件上传到暂存区的权限。
调用 ` FileOperation.put `_ 方法将文件上传到暂存区。
此方法执行 SQL PUT 命令。
要为 PUT 命令指定任何 可选参数,请创建参数和值的
Map,然后传入该Map作为options实参。例如:在
localFilePath实参中,可以使用通配符(*和?)来标识要上传的一组文件。例如:
检查
put方法返回的 ` PutResult_ 对象的 :code:`Array,以确定文件是否上传成功。例如,要打印该文件的文件名和 PUT 操作的状态,请运行以下语句:
从暂存区下载文件¶
要从暂存区下载文件,请执行以下步骤:
使用输入流在暂存区中上传和下载数据¶
备注
此功能是在 Snowpark 1.4.0 中引入的。
要使用输入流将数据上传到暂存区上的文件,以及从暂存区上的文件下载数据,请使用 ` FileOperation _ 对象的 :code:`uploadStream 和 downloadStream 方法:
使用输入流将数据上传到暂存区上的文件¶
要将数据从 ` java.io.InputStream `_ 对象上传到暂存区上的文件,请执行以下步骤:
验证您是否具有 将文件上传到暂存区的权限。
调用 ` FileOperation.uploadStream `_ 方法。
传入应写入数据的暂存区上文件的完整路径和
InputStream对象。此外,使用compress实参,指定在上传数据之前是否应压缩数据。
例如:
使用输入流从暂存区上的文件下载数据¶
要将数据从暂存区上的文件下载到 ` java.io.InputStream `_ 对象,请执行以下操作:
验证您是否具有 从暂存区下载文件的权限。
调用 ` FileOperation.downloadStream `_ 方法。
传入包含要下载的数据的暂存区文件的完整路径。使用
decompress实参,指定是否要压缩文件中的数据。
例如:
为暂存区中的文件设置 DataFrame¶
本部分介绍如何为 Snowflake 暂存区中的文件设置 DataFrame。创建此 DataFrame 后,您可以使用 DataFrame 来执行以下操作:
要为 Snowflake 暂存区中的文件设置 DataFrame,请使用 DataFrameReader 类:
验证您是否具有以下权限:
以下其中一项:
架构的 CREATE TABLE 权限(如果您计划指定用于确定如何从暂存文件复制数据的 复制选项 )。
否则确保您具备架构的 CREATE FILE FORMAT 权限。
调用
Session类中的read方法,以访问DataFrameReader对象。如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:
创建一个 ` StructType `_ 对象,该对象包含描述文件中的字段的一系列 ` StructField `_ 对象。
对于每个
StructField对象,请指定以下内容:字段的名称。
字段的数据类型(指定为
com.snowflake.snowpark.types包中的对象)。字段是否可为 null。
例如:
调用
DataFrameReader对象中的schema方法,传入StructType对象。例如:
schema方法会返回一个DataFrameReader对象,该对象配置为读取包含指定字段的文件。请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件,
DataFrameReader会将数据视为字段名称为$1、类型为 VARIANT 的单个字段处理。
如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用 ` DataFrameReader.option `_ 或 ` DataFrameReader.options `_ 方法。
传入要设置的选项的名称和值。可以设置以下类型的选项:
有关 CREATE FILE FORMAT 的文档。
COPY INTO TABLE 文档。
请注意,当您 将数据检索到 DataFrame 中时,设置复制选项可能造成费用更昂贵的执行策略。
下面的示例设置了
DataFrameReader对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。option方法返回使用指定选项进行了配置的DataFrameReader对象。若要设置多个选项,可以 链接多个调用 到
option方法(如上面的示例所示),或是调用 ` DataFrameReader.options`_ 方法,并传入选项名称和值的Map。调用与文件格式相对应的方法。您可以调用以下方法之一:
` DataFrameReader.avro `_
` DataFrameReader.csv `_
` DataFrameReader.json `_
` DataFrameReader.orc `_
` DataFrameReader.parquet `_
` DataFrameReader.xml `_
调用这些方法时,传入要读取的文件的暂存区位置。例如:
若要指定以相同前缀开头的多个文件,请在暂存区名称后指定前缀。例如,要从暂存区
@mystage加载具有前缀csv_的文件,请执行以下操作:与文件格式相对应的方法会返回文件的一个 ` CopyableDataFrame
_ 对象。:code:`CopyableDataFrame扩展了DataFrame,并提供用于处理暂存文件中的数据的其他方法。调用操作方法,以执行以下操作:
与表的 DataFrames 一样,在您调用 :ref:` 操作方法 <label-snowpark_dataframe_action_method>` 之前,不会将数据检索到 DataFrame 中。
将数据从文件加载到 DataFrame 中¶
在 为暂存区中的文件设置 DataFrame 之后,可以将文件中的数据加载到 DataFrame 中:
使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_dataframe_transform>` (例如,选择特定字段、筛选行等)。
例如,要从名为
mystage的暂存区中名为data.json的 JSON 文件中提取color元素,请运行以下语句:如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件),
DataFrameReader会将文件中的数据视为名为$1的单个 VARIANT 列。调用
DataFrame.collect方法以加载数据。例如:
将数据从文件复制到表中¶
为暂存区中的文件设置 DataFrame 之后,可以调用 ` CopyableDataFrame.copyInto _ 方法将数据复制到表中。此方法执行 :doc:/sql-reference/sql/copy-into-table` 命令。
备注
在调用 copyInto 之前,无需调用 collect 方法。在调用 copyInto 之前,文件中的数据不需要在 DataFrame 之中。
例如,以下代码将 myFileStage 指定的 CSV 文件中的数据加载到 mytable 表中。由于数据位于 CSV 文件中,代码还必须 描述文件中的字段。为此,该示例调用 ` DataFrameReader.schema _ 方法,并传入包含描述字段的 ` StructField `_ 对象序列的 ` StructType `_ 对象 (``csvFileSchema`)。
将 DataFrame 保存到暂存区上的文件¶
备注
此功能是在 Snowpark 1.5.0 中引入的。
如果需要将 DataFrame 保存到暂存区上的文件中,可以调用与文件格式对应的 ` DataFrameWriter _ 方法(例如 :code:`csv 方法用于写入 CSV 文件),并传入应将文件保存到的暂存区位置。这些 DataFrameWriter 方法执行 COPY INTO <location> 命令。
备注
在调用这些 DataFrameWriter 方法之前,无需调用 collect 方法。在调用这些方法之前,文件中的数据不需要在 DataFrame 之中。
要将 DataFrame 的内容保存到暂存区上的文件中,请执行以下操作:
调用 ` DataFrame.write`_ 方法,以获取 ` DataFrameWriter
_ 对象。例如,要获取代表名为 ``sample_product_data`的表的 DataFrame 的DataFrameWriter对象,请运行以下语句:如果要覆盖文件的内容(如果文件存在),请调用 ` DataFrameWriter.mode
_ 方法,并传入 :code:`SaveMode.Overwrite。否则,默认情况下,如果暂存区上的指定文件已存在,
DataFrameWriter会报告错误。mode方法返回使用指定模式进行了配置的相同DataFrameWriter对象。例如,要指定
DataFrameWriter应覆盖暂存区上的文件,请运行以下语句:如果需要指定有关应如何保存数据的其他信息(例如,应压缩数据,或者要使用分号来分隔 CSV 文件中的字段),请调用 ` DataFrameWriter.option `_ 方法或 ` DataFrameWriter.options `_ 方法。
传入要设置的选项的名称和值。可以设置以下类型的选项:
有关 COPY INTO <location> 的文档。
有关 COPY INTO <location> 的文档中介绍的 复制选项。
请注意,不能使用
option方法设置以下选项:TYPE 格式类型选项。
OVERWRITE 复制选项。若要设置此选项,请改为调用
mode方法(如上一步所述)。
下面的示例设置
DataFrameWriter对象,以使用分号(而非逗号)作为字段分隔符,以未压缩的形式将数据保存到 CSV 文件中。option方法返回使用指定选项进行了配置的DataFrameWriter对象。要设置多个选项,可以如上例所示 链接多个调用 (调用的是
option方法),或者调用 ` DataFrameWriter.options`_ 方法,并传入选项名称和值的Map。若要返回有关已保存的各文件的详细信息,请将
DETAILED_OUTPUT复制选项 设置为TRUE。默认情况下,
DETAILED_OUTPUT为FALSE,这代表该方法会返回一行输出,其中包含"rows_unloaded"、"input_bytes"和"output_bytes"字段。将
DETAILED_OUTPUT设置为TRUE时,该方法会为所保存的每个文件返回一行输出。每行都包含FILE_NAME、FILE_SIZE和ROW_COUNT字段。调用文件格式对应的方法,将数据保存到文件中。您可以调用以下方法之一:
调用这些方法时,传入应写入数据的文件的暂存区位置(如
@mystage)。默认情况下,该方法会将数据保存到名称带有前缀
data_的文件中(例如@mystage/data_0_0_0.csv)。如果希望使用不同的前缀命名文件,请在暂存区名称后指定前缀。例如:本示例将 DataFrame 的内容保存到名称以
saved_data为前缀(例如@mystage/saved_data_0_0_0.csv)的文件中。检查返回的 ` WriteFileResult `_ 对象,以获取有关写入文件的数据量的信息。
从
WriteFileResult对象,您可以访问 COPY INTO <location> 命令生成的输出:若要以 ` Row
_ 对象数组的形式访问输出行,请使用 :code:`rows值成员。若要确定行中有哪些字段,请使用
schema值成员,该成员是描述行中字段的 ` StructType `_。
例如,要在输出行中打印出字段名称和值,请运行以下语句:
以下示例使用 DataFrame 将名为 car_sales 的表的内容保存到 JSON 文件,这些文件位于 @mystage 暂存区中,名称带有 saved_data 前缀(例如 @mystage/saved_data_0_0_0.json)。示例代码:
覆盖文件(如果暂存区中已存在该文件)。
返回有关保存操作的详细输出。
保存未压缩的数据。
最后,示例代码在返回的输出行中打印出每个字段和值:
使用半结构化数据¶
使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。
备注
这些部分中的示例使用 示例中使用的示例数据 中的示例数据。
遍历半结构化数据¶
若要引用半结构化数据中的特定字段或元素,请使用 ` Column `_ 对象的以下方法:
使用 ` Column.apply("<field_name>")
_,返回 OBJECT (或包含 OBJECT 的 VARIANT)中一个字段的 :code:`Column对象。使用 ` Column.apply(<index>)
_,返回 ARRAY (或包含 ARRAY 的 VARIANT )中一个元素的 :code:`Column对象。
备注
如果路径中的字段名称或元素不规则,并且导致难以使用这些 Column.apply 方法,则可以使用 ` get _、 get_ignore_case `_ 或 ` get_path `_ 函数作为替代方法。
如 使用 apply 方法引用列 中所述,可以省略方法名称 apply :
例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src 列的对象内,选择 dealership 字段:
该代码会打印以下输出:
备注
DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值。
还可以 链接多个方法调用,以遍历特定字段或元素的路径。
例如,以下代码选择 salesperson 对象中的 name 字段:
该代码会打印以下输出:
再举一个例子,下面的代码选择 vehicle 字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price 字段。
该代码会打印以下输出:
作为 apply 方法的替代方法,如果路径中的字段名称或元素不规则,并且导致难以使用 Column.apply 方法,则可以使用 ` get _、 get_ignore_case `_ 或 ` get_path `_ 函数。
例如,以下代码行都会打印对象中指定字段的值:
同样,以下代码行都会打印对象中指定路径处的字段值:
显式转换半结构化数据中的值¶
默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。
若要避免意外结果,请调用 :ref:` cast <label-snowpark_dataframe_cols_cast>` 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:
该代码会打印以下输出:
将对象数组展平为行¶
如果需要将半结构化数据“展平”为 DataFrame (例如,为数组中的每个对象生成一行),请调用 ` DataFrame.flatten _ 方法。此方法与 :doc:/sql-reference/functions/flatten` SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。
例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer 是一个包含有关客户的信息的对象数组。每个对象都包含 name 和 address 字段。
如果将此路径传递给 flatten 函数:
该方法返回一个 DataFrame:
从 DataFrame 中,您可以从 VALUE 字段中的每个对象选择 name 和 address 字段:
以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:
执行 SQL 语句¶
若要执行您指定的 SQL 语句,请调用 Session 类中的 sql 方法,然后传入要执行的语句。该方法返回一个 DataFrame。
请注意,在您 :ref:` 调用操作方法 <label-snowpark_dataframe_action_method>` 之前, SQL 语句不会执行。
如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_dataframe_transform>` (例如 filter、select 等),请注意,仅当基础 SQL 语句是 SELECT 语句时,这些方法才有效。其他类型的 SQL 语句不支持转换方法。