在 Snowpark Java 中使用 DataFrames¶
在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。
要检索和操作数据,需要使用 ` DataFrame `_ 类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。
要将数据检索到 DataFrame 之中,请执行以下步骤:
-
例如,您可以创建一个 DataFrame,以保存来自表和外部 CSV 文件的数据或执行 SQL 语句时产生的数据。
-
例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。
-
为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如
collect()方法)。
接下来的部分将更详细地介绍这些步骤。
设置本部分的示例¶
本部分的一些示例使用 DataFrame 查询名为 sample_product_data 的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表:
要验证表是否已创建,请运行:
构造 DataFrame¶
要构造 DataFrame,可以使用 Session 类中的方法。以下每种方法都基于不同类型的数据源构造 DataFrame :
要基于表、视图或流中的数据创建 DataFrame,请调用
table方法:备注
table方法返回一个Updatable对象。Updatable扩展了DataFrame,并提供了用于处理表中数据的其他方法(例如,用于更新和删除数据的方法)。请参阅 更新、删除和合并表中的行。要基于指定值创建 DataFrame,请执行以下步骤:
构造一个由包含值的
Row对象组成的数组。构造一个
StructType对象,以描述这些值的数据类型。调用
createDataFrame方法,并传入数组和StructType对象。
备注
构造 DataFrame 时,Snowflake 的保留字不能用作有效的列名。有关保留字的列表,请参阅 保留和受限关键字。
要创建包含特定值范围的 DataFrame,请调用
range方法:要 为暂存区中的文件创建 DataFrame,请调用
read来获取DataFrameReader对象。在DataFrameReader对象中,调用与文件中的数据格式对应的方法:要创建 DataFrame 来保存 SQL 查询的结果,请调用
sql方法:注意:虽然可以使用此方法执行 SELECT 语句,以从表和暂存文件中检索数据,但应改用
table和read方法。在开发工具中,table和read之类的方法可以提供更好的语法突出显示、错误突出显示和智能代码补全效果。
指定应该如何转换数据集¶
要指定应选择哪些列,以及应如何对结果进行筛选、排序、分组等,请调用能转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 Functions.col 静态方法或计算结果为列的表达式。(请参阅 指定列和表达式。)
例如:
若要指定应返回的行,请调用
filter方法:若要指定应选择的列,请调用
select方法:
每个方法都返回一个经过转换的新 DataFrame 对象。(该方法不会影响原始 DataFrame 对象。)这就意味着,如果要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用所返回的新 DataFrame 对象来调用每个后续转换方法。
请注意,这些转换方法不会从 Snowflake 数据库中检索数据。( 执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。)转换方法只是指定应如何构造 SQL 语句。
指定列和表达式¶
调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select 方法时,需要指定应选择的列。
要引用列,请通过调用 ` Functions.col `_ 静态方法来创建 ` Column `_ 对象。
备注
要为字面量创建 Column 对象,请参阅 将字面量用作列对象。
指定筛选器、投影、联接条件等时,可以在表达式中使用 Column 对象。例如:
可以将
Column对象与filter方法一起使用,以指定筛选条件:可以将
Column对象与select方法一起使用,以定义别名:可以将
Column对象与join方法一起使用,以定义联接条件:
引用不同 DataFrames 中的列¶
引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在每个 DataFrame 对象中使用 col 方法引用该对象中的列(例如 df1.col("name") 和 df2.col("name"))。
下面的示例演示了如何使用 col 方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 value 的列。该示例使用 Column 对象的 as 方法来更改新创建的 DataFrame 中的列名称。
将对象标识符(表名称、列名称等)放在双引号里¶
您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求。指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:
如果名称不符合标识符要求,则必须将名称放在双引号里 (")。对于 Scala 字符串字面量中的双引号字符,请使用反斜杠 (\) 进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:
请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:
如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。
在某些情况下,列名称可能包含双引号字符:
如 标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes" 和 """column_name_quoted"""):
请注意,如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写:
将字面量用作列对象¶
要在传入 Column 对象的方法中使用字面量,请将字面量传递给 Functions 类中的 lit 静态方法,从而为字面量创建 Column 对象。例如:
如果字面量是 Java 中的浮点或双精度值(例如 0.05 默认情况下被视为双精度值),则 Snowpark 库会生成 SQL,以将该值隐式转换为相应的 Snowpark 数据类型(例如 0.05::DOUBLE)。这可能会产生与指定的确切数字不同的近似值。
例如,以下代码不显示匹配的行,即使筛选器(匹配大于或等于 0.05 的值)应匹配 DataFrame 中的行:
问题在于 Functions.lit(0.06) 和 Functions.lit(0.01) 为 0.06 和 0.01 产生近似值而不是确切值。
要避免此问题,请 将字面量转换为要使用的 Snowpark 类型。例如,要使用精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:
将列对象的类型转换为特定类型¶
要将 Column 对象的类型转换为特定类型,请调用 ` cast _ 方法,然后从 ` com.snowflake.snowpark_java.types 包 `_ 传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 的 :ref:`label-data_type_number,请运行以下语句:
链接多个方法调用¶
由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。
下面的示例返回为如下目的而配置的 DataFrame :
查询
sample_product_data表。返回
id = 1的行。选择
name和serial_number列。
在此示例中:
session.table("sample_product_data")返回sample_product_data表的 DataFrame。尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。
filter(Functions.col("id").equal_to(Functions.lit(1)))返回sample_product_data表的 DataFrame(设置为返回id = 1的行)。请再次注意, DataFrame 尚未包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_java_dataframe_action_method>` 之前,不会检索匹配的行。
select(Functions.col("name"), Functions.col("serial_number"))返回一个 DataFrame,包含sample_product_data表中id = 1的行的name和serial_number列。
在链接多个方法调用时,请注意调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。
例如,在下面的代码中,select 方法返回仅包含两列的 DataFrame:name 和 serial_number。对此 DataFrame 的 filter 方法调用会失败,因为它使用的 id 列不在已转换的 DataFrame 中。
相反,以下代码会成功执行,因为对包含 sample_product_data 表中所有列(包括 id 列)的 DataFrame 调用了 filter() 方法:
请注意,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 select 和 filter 方法调用。
限制 DataFrame 中的行数¶
要限制 DataFrame 中的行数,可以使用 ` limit `_ 转换方法。
Snowpark API 还提供了以下操作方法,可检索和打印出有限数量的行:
` first
_ 操作方法(用于执行查询并返回前 :samp:`{n}行)` show
_ 操作方法(用于执行查询并打印前 :samp:`{n}行)
这些方法能有效地将 LIMIT 子句添加到执行的 SQL 语句中。
如 :ref:` LIMIT 使用说明 <label-limit_cmd_usage_notes>` 中所述,除非将排序顺序 (ORDER BY) 与 LIMIT 一起指定,否则结果是不确定的。
要使 ORDER BY 子句与 LIMIT 子句一起出现(从而避免 ORDER BY 出现在另一个子查询等地方),必须调用 limit 方法,以限制 sort 方法返回的 DataFrame 中的结果数。
例如,如果 链接多个方法调用:
检索列定义¶
要在 DataFrame 的数据集中检索列的定义,请调用 schema 方法。此方法会返回一个 StructType 对象,其中包含 StructField 对象的 Array。每个 StructField 对象都包含一列的定义。
在返回的 StructType 对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。
下面的示例创建一个 DataFrame,其中包含名为 ID 和 3rd 的列。对于名为 3rd 的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"),因为 该名称不符合标识符要求。
该示例调用 schema 方法,随后对返回的 StructType 对象调用 names 方法,以获取列名称的数组。这些名称使用 schema 方法返回的 StructType 进行规范化。
联接 DataFrames¶
若要联接 DataFrame 对象,请调用 ` join `_ 方法。
以下部分说明如何使用 DataFrames 执行联接:
设置联接的示例数据¶
后面部分中的示例使用示例数据,您可以通过执行以下 SQL 语句来设置这些数据:
指定联接的列¶
利用 DataFrame.join 方法,可以通过以下方式之一指定要使用的列:
指定描述联接条件的列表达式。
指定应用作联接中的公共列的一个或多个列。
以下示例对名为 id_a 的列执行内部联接:
请注意,该示例使用 DataFrame.col 方法来指定要用于联接的条件。有关此方法的更多信息,请参阅 指定列和表达式。
这将打印以下输出:
联接结果中重复的相同列名称¶
在联接产生的 DataFrame 中,Snowpark 库使用在已联接的表中找到的列名称,即使这些列名称在表之间相同也是如此。发生这种情况时,这些列名称将在联接产生的 DataFrame 中重复。要按名称访问重复的列,请对表示列的原始表的 DataFrame 调用 col 方法。(有关指定列的更多信息,请参阅 引用不同 DataFrames 中的列。)
以下示例中的代码联接两个 DataFrames,然后对已联接的 DataFrame 调用 select 方法。此代码通过从表示相应 DataFrame 对象( dfRhs 和 dfLhs)的变量调用 col 方法来指定要选择的列。它使用 as 方法为 select 方法创建的 DataFrame 中的列提供新名称。
这将打印以下输出:
在保存或缓存之前删除重复列¶
请注意,当联接产生的 DataFrame 包含重复的列名称时,必须通过删除重复列或重命名列来移除 DataFrame 中的重复项,然后才能将结果保存到表中或者缓存 DataFrame。对于保存到表或缓存的 DataFrame 中的重复列名称,Snowpark 库会将重复的列名称替换为别名,让它们不再重复。
对于缓存的 DataFrame,下例说明了其输出在以下情况下将如何显示:列名称 ID_A 和 VALUE 在两个表的联接结果中重复,之后,在缓存结果之前未删除重复列或未重命名列。
执行自然联接¶
要执行 :ref:` 自然联接 <label-querying_join_natural>` (在具有相同名称的列上联接 DataFrames),请调用 ` naturalJoin `_ 方法。
以下示例在 sample_a 和 sample_b 的公共列( id_a 列)上联接这两个表的 DataFrames :
这将打印以下输出:
指定联接类型¶
默认情况下, DataFrame.join 方法创建内部联接。要指定其他类型的联接,请将 joinType 实参设置为以下值之一:
联接类型 |
|
|---|---|
内部联接 |
:code:`inner`(默认) |
交叉联接 |
|
完整外部联接 |
|
左外部联接 |
|
左反联接 |
|
左半联接 |
|
右外部联接 |
|
例如:
这将打印以下输出:
联接多个表¶
要联接多个表,请执行以下步骤:
为每个表创建一个 DataFrame。
对第一个 DataFrame 调用
DataFrame.join方法,并传入第二个 DataFrame。使用
join方法返回的 DataFrame 来调用join方法,并传入第三个 DataFrame。
可以 链接 join 调用,如下所示:
这将打印以下输出:
执行自联接¶
如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下使用单个 DataFrame 执行自联接的示例最终失败,因为联接的左侧和右侧都存在 "id" 的列表达式:
上面两个示例都失败了,并引发以下异常:
请改用 ` clone `_ 方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:
如果要基于同一列执行自联接,请调用 join 方法,为 USING 子句传入列的名称(或列名称的数组):
执行操作以计算 DataFrame¶
如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。
以下部分介绍如何同步和异步地对 DataFrame 执行操作:
同步执行操作¶
要同步执行操作,请调用以下操作方法之一:
同步执行操作的方法 |
描述 |
|---|---|
|
计算 DataFrame,并将生成的数据集作为 ` Row |
|
计算 DataFrame,并返回 ` Row |
|
计算 DataFrame 并返回行数。 |
|
计算 DataFrame,并将行打印到控制台。请注意,此方法将行数限制为 10 行(默认值)。请参阅 打印 DataFrame 中的行。 |
|
执行查询,创建临时表,并将结果放入表中。该方法返回一个 |
|
将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中。 |
|
将 DataFrame 中的数据复制到指定表中。请参阅 将数据从文件复制到表中。 |
|
删除指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
更新指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
将行合并到指定的表中。请参阅 更新、删除和合并表中的行。 |
例如,要执行查询并返回结果数,请调用 count 方法:
还可以调用操作方法来:
注意:如果调用 schema 方法来获取 DataFrame 中列的定义,则无需调用操作方法。
异步执行操作¶
备注
此功能是在 Snowpark 0.11.0 中引入的。
要异步执行操作,请调用 async 方法以返回“异步执行者”对象(例如 DataFrameAsyncActor),然后在该对象中调用异步操作方法。
异步执行者对象的这些操作方法会返回一个 TypedAsyncJob 对象,您可以使用该对象检查异步操作的状态和检索操作的结果。
接下来的部分将介绍如何异步执行操作和检查结果。
了解异步操作的基本流程¶
可以使用以下方法异步执行操作:
异步执行操作的方法 |
描述 |
|---|---|
|
异步计算 DataFrame,以 ` Row |
|
异步计算 DataFrame,以检索 ` Row |
|
异步计算 DataFrame 以检索行数。 |
|
将 DataFrame 中的数据异步保存到指定表中。请参阅 将数据保存到表中。 |
|
将 DataFrame 中的数据异步复制到指定的表中。请参阅 将数据从文件复制到表中。 |
|
异步删除指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
异步更新指定表中的行。请参阅 更新、删除和合并表中的行。 |
从返回的 ` TypedAsyncJob `_ 对象中,您可以执行以下操作:
要确定操作是否已完成,请调用
isDone方法。要获取与操作对应的查询 ID,请调用
getQueryId方法。要返回操作的结果(例如,对于
collect方法为Row对象的Array;对于count方法为行数),请调用getResult方法。请注意,
getResult是阻塞调用。要取消操作,请调用
cancel方法。
例如,要异步执行查询,并以 Row 对象的 Array 的形式检索结果,请调用 async().collect():
要异步执行查询并检索结果数,请调用 async().count():
指定等待的最大秒数¶
调用 getResult 方法时,可以使用 maxWaitTimeInSeconds 实参指定在尝试检索结果之前等待查询完成的最大秒数。例如:
如果省略此实参,该方法将等待 :ref:` snowpark_request_timeout_in_seconds <label-snowpark_java_request_timeout_in_seconds>` 配置属性指定的最大秒数。( :ref:` 创建 Session 对象 <label-snowpark_java_creating_session>` 时可以设置此属性。)
通过 ID 访问异步查询¶
如果有之前提交的异步查询的查询 ID,则可以调用 Session.createAsyncJob 方法创建一个 ` AsyncJob `_ 对象,该对象可用于检查查询状态、检索查询结果或取消查询。
请注意,与 TypedAsyncJob 不同的是, AsyncJob 没有提供用于检索结果的 getResult 方法。如果需要检索结果,请改为调用 getRows 或 getIterator 方法。
例如:
将行检索到 DataFrame 之中¶
指定 DataFrame 应如何转换 来执行查询并返回结果。可以返回 Array 中的所有行,也可以返回一个 Iterator,以便逐行遍历结果。在后一种情况下,如果数据量较大,则行将按块加载到内存中,以免将大量数据加载到内存中。
返回所有行¶
要同时返回所有行,请调用 collect 方法。此方法返回 ` Row _ 对象的数组。要从行中检索值,请调用 :samp:`get{Type} 方法(例如 getString、getInt 等)。
例如:
返回行的迭代器¶
如果要使用 Iterator 遍历结果中的 ` Row _ 对象,请调用 `toLocalIterator。如果结果中的数据量很大,该方法将按块加载行,以免同时将所有行加载到内存中。
例如:
返回前 n 行¶
要返回前 n 行,请调用 first 方法,并传入要返回的行数。
如 限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().first())。
例如:
打印 DataFrame 中的行¶
要将 DataFrame 中的前 10 行打印到控制台,请调用 show 方法。要打印出其他数量的行,请传入要打印的行数。
如 限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().show())。
例如:
更新、删除和合并表中的行¶
备注
此功能是在 Snowpark 0.7.0 中引入的。
当您调用 Session.table 为表创建 DataFrame 对象时,该方法将返回一个 Updatable 对象,此对象通过用于更新和删除表中数据的其他方法扩展了 DataFrame。(请参阅 ` Updatable `_。)
如果需要更新或删除表中的行,可以使用 Updatable 类的以下方法:
调用
update或updateColumn,以更新表中的现有行。请参阅 更新表中的行。调用
delete以删除表中的行。请参阅 删除表中的行。调用
merge,以根据另一个表或子查询中的数据在一个表中插入、更新和删除行。(这等效于 SQL 中的 MERGE 命令。)请参阅 将行合并到一个表中。
更新表中的行¶
要更新表中的行,请调用 update 或 updateColumn 方法,并传入 Map (它关联要更新的列和要分配给这些列的相应值):
要将列名称指定为
Map中的字符串,请调用updateColumn。要在
Map中指定Column对象,请调用update。
这两种方法都返回一个 UpdateResult 对象,其中包含已更新的行数。(请参阅 UpdateResult。)
备注
这两种方法都是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。
例如,要将名为 count 的列中的值替换为值 1,并且要使用将列名称 (String) 与相应值关联的 Map,请调用 updateColumn:
如果要在 Map 中使用 Column 对象来标识要更新的列,请调用 update:
如果仅在满足条件时才应进行更新,则可以将该条件指定为实参。例如,对于 category_id 列的值为 20 的行,要将名为 count 的列中的值替换为 2,请运行以下语句:
如果需要让条件基于与另一个 DataFrame 对象进行的联接,则可以将该 DataFrame 作为实参传入,并在条件中使用该 DataFrame。例如,对于 category_id 列与 DataFrame dfParts 中的 category_id 匹配的行,要将名为 count 的列中的值替换为 3,请运行以下语句:
删除表中的行¶
对于 delete 方法,可以指定一个条件来标识要删除的行,并且可以使该条件基于与另一个 DataFrame 进行的联接。delete 会返回一个 DeleteResult 对象,其中包含已删除的行数。(请参阅 DeleteResult。)
备注
delete 是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。
例如,要删除 category_id 列中的值为 1 的行,请运行以下语句:
如果条件引用另一个 DataFrame 中的列,则将该 DataFrame 作为第二个实参传入。例如,要删除 category_id 列与 DataFrame dfParts 中的 category_id 匹配的行,请将 dfParts 作为第二个实参传入:
将行合并到一个表中¶
要根据另一个表或子查询中的值在一个表中插入、更新和删除行(等效于 SQL 中的 MERGE 命令),请执行以下步骤:
在要将数据合并到的表的
Updatable对象中,调用merge方法,并传入另一个表的DataFrame对象和联接条件的列表达式。这将返回一个
MergeBuilder对象,您可以使用该对象指定要对匹配的行和不匹配的行执行的操作(例如插入、更新或删除)。(请参阅 MergeBuilder。)使用
MergeBuilder对象:要指定应对匹配的行执行的更新或删除,请调用
whenMatched方法。如果需要指定有关何时应更新或删除行的附加条件,则可以传入该条件的列表达式。
此方法返回一个
MatchedClauseBuilder对象,可用于指定要执行的操作。(请参阅 MatchedClauseBuilder。)调用
MatchedClauseBuilder对象中的update或delete方法,以指定应对匹配的行执行的更新或删除操作。这些方法返回一个MergeBuilder对象,可用于指定其他子句。要指定应在行不匹配时执行的插入,请调用
whenNotMatched方法。如果需要指定有关何时应插入行的附加条件,则可以传入该条件的列表达式。
此方法返回一个
NotMatchedClauseBuilder对象,可用于指定要执行的操作。(请参阅 NotMatchedClauseBuilder。)调用
NotMatchedClauseBuilder对象中的insert方法,以指定应在行不匹配时执行的插入操作。这些方法返回一个MergeBuilder对象,可用于指定其他子句。
指定了应执行的插入、更新和删除后,调用
MergeBuilder对象的collect方法,以对表执行指定的插入、更新和删除。collect会返回一个MergeResult对象,其中包含已插入、更新和删除的行数。(请参阅 ``_。)
以下示例将 source 表中带有 id 和 value 列的行插入到 target 表中(如果 target 表未包含具有匹配 ID 的行):
以下示例使用 source 表中具有同一 ID 的行的 value 列值更新 target 表中的行:
将数据保存到表中¶
可以将 DataFrame 的内容保存到新表或现有表中。为此,您必须具有以下权限:
对架构的 CREATE TABLE 权限(如果表不存在)。
对表的 INSERT 权限。
要将 DataFrame 的内容保存到表中,请执行以下操作:
调用 DataFrame 的 write 方法,以获取 DataFrameWriter 对象。
调用
DataFrameWriter对象的 ` mode `_ 方法,并传入一个 ` SaveMode `_ 对象,以指定有关写入表的首选项:要插入行,请传入
SaveMode.Append。要覆盖现有表,请传入
SaveMode.Overwrite。
此方法返回使用指定模式进行了配置的相同
DataFrameWriter对象。如果要在现有表中插入行 (
SaveMode.Append),并且 DataFrame 中的列名称与表中的列名称匹配,请调用 ` DataFrameWriter.option_,并传入 :code:"columnOrder"` 和"name"作为实参。备注
此方法是在 Snowpark 1.4.0 中引入的。
默认情况下,
columnOrder选项设置为"index",这意味着DataFrameWriter按照列的出现顺序插入值。例如,DataFrameWriter在表中第一列内插入 DataFrame 中第一列的值,在表中第二列内插入 DataFrame 中第二列的值,依此类推。此方法返回使用指定选项进行了配置的同一个
DataFrameWriter对象。调用
DataFrameWriter对象的 ` saveAsTable `_ 方法,以将 DataFrame 的内容保存到指定的表中。无需调用单独的方法(例如
collect)来执行将数据保存到表中的 SQL 语句。saveAsTable是执行 SQL 语句的 操作方法。
以下示例使用 DataFrame df 的内容覆盖现有表(由 tableName 变量标识):
以下示例将 DataFrame df 中的行插入到现有表(由 tableName 变量标识)。在此示例中,表和 DataFrame 都包含列 c1 和 c2。
该示例展示了这两种做法的区别:将 columnOrder 选项设置为 "name" (这会将值插入到与 DataFrame 列同名的表列中),以及使用默认的 columnOrder 选项(这会根据 DataFrame 中列的顺序将值插入到表列中)。
基于 DataFrame 创建视图¶
要基于 DataFrame 创建视图,请调用 createOrReplaceView 方法:
请注意,调用 createOrReplaceView 会立即创建新视图。更重要的是,它不会导致对 DataFrame 进行计算。(在您 :ref:` 执行操作 <label-snowpark_java_dataframe_action_method>` 之前,不会对 DataFrame 本身进行计算。)
通过调用 createOrReplaceView 创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图。
如果只需为会话创建临时视图,请改为调用 createOrReplaceTempView 方法:
缓存 DataFrame¶
在某些情况下,可能需要执行复杂的查询,并将结果保留下来,以供后续操作使用(而不必再次执行相同的查询)。在此类情况下,可以调用 cacheResult 方法来缓存 DataFrame 的内容。
此方法会:
运行查询。
在调用
cacheResult之前,无需 调用单独的操作方法来检索结果。cacheResult是执行查询的操作方法。将结果保存在临时表中
由于
cacheResult会创建临时表,因此,您必须对正在使用的架构具有 CREATE TABLE 权限。返回一个 ` HasCachedResult `_ 对象,该对象提供对临时表中结果的访问权。
由于
HasCachedResult扩展了DataFrame,因此,可以对此缓存数据执行一些同样可以对 DataFrame 执行的操作。
备注
由于 cacheResult 执行查询并将结果保存到表中,因此该方法可能会导致计算和存储成本增加。
例如:
请注意,调用该方法时,原始的 DataFrame 不受影响。例如,假设 dfTable 是 sample_product_data 表的 DataFrame :
调用 cacheResult 后, dfTable 仍指向 sample_product_data 表,而且您可以继续使用 dfTable 来查询和更新该表。
要使用临时表中的缓存数据,请使用 dfTempTable (即 cacheResult 返回的 HasCachedResult 对象)。
处理暂存区中的文件¶
Snowpark 库提供了一些类和方法,可让您通过使用暂存区中的文件 将数据加载到 Snowflake 中,以及 从 Snowflake 卸载数据。
备注
为了在暂存区上使用这些类和方法,您必须具有 使用暂存区所需的权限。
接下来的部分将说明如何使用这些类和方法:
在暂存区中上传和下载文件¶
要在暂存区中上传和下载文件,请使用 ` FileOperation _ 对象的 :code:`put 和 get 方法:
将文件上传到暂存区¶
要将文件上传到暂存区,请执行以下步骤:
验证您是否具有 将文件上传到暂存区的权限。
调用
FileOperation对象的 ` put `_ 方法将文件上传到暂存区。此方法执行 SQL PUT 命令。
要为 PUT 命令指定任何 可选参数,请创建参数和值的
Map,然后传入该Map作为options实参。例如:在
localFileName实参中,可以使用通配符(*和?)来标识要上传的一组文件。例如:
检查
put方法返回的 ` PutResult_ 对象的 :code:`Array,以确定文件是否上传成功。例如,要打印该文件的文件名和 PUT 操作的状态,请运行以下语句:
从暂存区下载文件¶
要从暂存区下载文件,请执行以下步骤:
使用输入流在暂存区中上传和下载数据¶
备注
此功能是在 Snowpark 1.4.0 中引入的。
要使用输入流将数据上传到暂存区上的文件,以及从暂存区上的文件下载数据,请使用 ` FileOperation _ 对象的 :code:`uploadStream 和 downloadStream 方法:
使用输入流将数据上传到暂存区上的文件¶
要将数据从 ` java.io.InputStream `_ 对象上传到暂存区上的文件,请执行以下步骤:
验证您是否具有 将文件上传到暂存区的权限。
调用
FileOperation对象的 ` uploadStream `_ 方法。传入应写入数据的暂存区上文件的完整路径和
InputStream对象。此外,使用compress实参,指定在上传数据之前是否应压缩数据。
例如:
使用输入流从暂存区上的文件下载数据¶
要将数据从暂存区上的文件下载到 ` java.io.InputStream `_ 对象,请执行以下操作:
验证您是否具有 从暂存区下载文件的权限。
调用
FileOperation对象的 ` downloadStream `_ 方法。传入包含要下载的数据的暂存区文件的完整路径。使用
decompress实参,指定是否要压缩文件中的数据。
例如:
为暂存区中的文件设置 DataFrame¶
本部分介绍如何为 Snowflake 暂存区中的文件设置 DataFrame。创建此 DataFrame 后,您可以使用 DataFrame 来执行以下操作:
要为 Snowflake 暂存区中的文件设置 DataFrame,请使用 DataFrameReader 类:
验证您是否具有以下权限:
以下其中一项:
架构的 CREATE TABLE 权限(如果您计划指定用于确定如何从暂存文件复制数据的 复制选项 )。
否则确保您具备架构的 CREATE FILE FORMAT 权限。
调用
Session类中的read方法,以访问DataFrameReader对象。如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:
创建一个 ` StructType `_ 对象,该对象包含描述文件中字段的 ` StructField `_ 对象的数组。
对于每个
StructField对象,请指定以下内容:字段的名称。
字段的数据类型(指定为
com.snowflake.snowpark_java.types包中的对象)。字段是否可为 null。
例如:
调用
DataFrameReader对象中的schema方法,传入StructType对象。例如:
schema方法会返回一个DataFrameReader对象,该对象配置为读取包含指定字段的文件。请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件,
DataFrameReader会将数据视为字段名称为$1、类型为 VARIANT 的单个字段处理。
如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用 ` DataFrameReader.option `_ 或 ` DataFrameReader.options `_ 方法。
传入要设置的选项的名称和值。可以设置以下类型的选项:
有关 CREATE FILE FORMAT 的文档。
COPY INTO TABLE 文档。
请注意,当您 将数据检索到 DataFrame 中时,设置复制选项可能造成费用更昂贵的执行策略。
下面的示例设置了
DataFrameReader对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。option方法返回使用指定选项进行了配置的DataFrameReader对象。若要设置多个选项,可以 链接多个调用 到
option方法(如上面的示例所示),或是调用 ` DataFrameReader.options`_ 方法,并传入选项名称和值的Map。调用与文件格式相对应的方法。您可以调用以下方法之一:
` DataFrameReader.avro `_
` DataFrameReader.csv `_
` DataFrameReader.json `_
` DataFrameReader.orc `_
` DataFrameReader.parquet `_
` DataFrameReader.xml `_
调用这些方法时,传入要读取的文件的暂存区位置。例如:
若要指定以相同前缀开头的多个文件,请在暂存区名称后指定前缀。例如,要从暂存区
@mystage加载具有前缀csv_的文件,请执行以下操作:与文件格式相对应的方法会返回文件的一个 ` CopyableDataFrame
_ 对象。:code:`CopyableDataFrame扩展了DataFrame,并提供用于处理暂存文件中的数据的其他方法。调用操作方法,以执行以下操作:
与表的 DataFrames 一样,在您调用 :ref:` 操作方法 <label-snowpark_java_dataframe_action_method>` 之前,不会将数据检索到 DataFrame 中。
将数据从文件加载到 DataFrame 中¶
在 为暂存区中的文件设置 DataFrame 之后,可以将文件中的数据加载到 DataFrame 中:
使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_java_dataframe_transform>` (例如,选择特定字段、筛选行等)。
例如,要从名为
mystage的暂存区中名为data.json的 JSON 文件中提取color元素,请运行以下语句:如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件),
DataFrameReader会将文件中的数据视为名为$1的单个 VARIANT 列。调用
DataFrame.collect方法以加载数据。例如:
将数据从文件复制到表中¶
为暂存区中的文件设置 DataFrame 之后,可以调用 copyInto 方法将数据复制到表中。此方法执行 COPY INTO <table> 命令。
备注
在调用 copyInto 之前,无需调用 collect 方法。在调用 copyInto 之前,文件中的数据不需要在 DataFrame 之中。
例如,以下代码将 myFileStage 指定的 CSV 文件中的数据加载到 mytable 表中。由于数据位于 CSV 文件中,代码还必须 描述文件中的字段。为此,该示例调用 DataFrameReader 对象的 ` schema _ 方法,并传入 ` StructType `_ 对象 (``schemaForDataFile`),其中包含了描述字段的 ` StructField `_ 对象的数组。
将 DataFrame 保存到暂存区上的文件¶
备注
此功能是在 Snowpark 1.5.0 中引入的。
如果需要将 DataFrame 保存到暂存区上的文件中,可以调用与文件格式对应的 ` DataFrameWriter _ 方法(例如 :code:`csv 方法用于写入 CSV 文件),并传入应将文件保存到的暂存区位置。这些 DataFrameWriter 方法执行 COPY INTO <location> 命令。
备注
在调用这些 DataFrameWriter 方法之前,无需调用 collect 方法。在调用这些方法之前,文件中的数据不需要在 DataFrame 之中。
要将 DataFrame 的内容保存到暂存区上的文件中,请执行以下操作:
调用 DataFrame 对象的 ` write
_ 方法,以获取 ` DataFrameWriter `_ 对象。例如,要获取代表名为 ``sample_product_data`的表的 DataFrame 的DataFrameWriter对象,请运行以下语句:如果要覆盖文件的内容(如果文件存在),请调用
DataFrameWriter对象的 ` mode_ 方法,并传入 :code:`SaveMode.Overwrite。否则,默认情况下,如果暂存区上的指定文件已存在,
DataFrameWriter会报告错误。mode方法返回使用指定模式进行了配置的相同DataFrameWriter对象。例如,要指定
DataFrameWriter应覆盖暂存区上的文件,请运行以下语句:如果需要指定有关应如何保存数据的其他信息(例如,应压缩数据,或者要使用分号来分隔 CSV 文件中的字段),请调用 ` DataFrameWriter.option `_ 方法或 ` DataFrameWriter.options `_ 方法。
传入要设置的选项的名称和值。可以设置以下类型的选项:
有关 COPY INTO <location> 的文档。
有关 COPY INTO <location> 的文档中介绍的 复制选项。
请注意,不能使用
option方法设置以下选项:TYPE 格式类型选项。
OVERWRITE 复制选项。若要设置此选项,请改为调用
mode方法(如上一步所述)。
下面的示例设置
DataFrameWriter对象,以使用分号(而非逗号)作为字段分隔符,以未压缩的形式将数据保存到 CSV 文件中。option方法返回使用指定选项进行了配置的DataFrameWriter对象。要设置多个选项,可以如上例所示 链接多个调用 (调用的是
option方法),或者调用 ` DataFrameWriter.options`_ 方法,并传入选项名称和值的Map。若要返回有关已保存的各文件的详细信息,请将
DETAILED_OUTPUT复制选项 设置为TRUE。默认情况下,
DETAILED_OUTPUT为FALSE,这代表该方法会返回一行输出,其中包含"rows_unloaded"、"input_bytes"和"output_bytes"字段。将
DETAILED_OUTPUT设置为TRUE时,该方法会为所保存的每个文件返回一行输出。每行都包含FILE_NAME、FILE_SIZE和ROW_COUNT字段。调用文件格式对应的方法,将数据保存到文件中。您可以调用以下方法之一:
调用这些方法时,传入应写入数据的文件的暂存区位置(如
@mystage)。默认情况下,该方法会将数据保存到名称带有前缀
data_的文件中(例如@mystage/data_0_0_0.csv)。如果希望使用不同的前缀命名文件,请在暂存区名称后指定前缀。例如:本示例将 DataFrame 的内容保存到名称以
saved_data为前缀(例如@mystage/saved_data_0_0_0.csv)的文件中。检查返回的 ` WriteFileResult `_ 对象,以获取有关写入文件的数据量的信息。
从
WriteFileResult对象,您可以访问 COPY INTO <location> 命令生成的输出:要以 Row 对象数组的形式访问输出行,请调用
getRows方法。要确定行中有哪些字段,请调用
getSchema方法,该方法将返回描述行中字段的 StructType。
例如,要在输出行中打印出字段名称和值,请运行以下语句:
以下示例使用 DataFrame 将名为 car_sales 的表的内容保存到 JSON 文件,这些文件位于 @mystage 暂存区中,名称带有 saved_data 前缀(例如 @mystage/saved_data_0_0_0.json)。示例代码:
覆盖文件(如果暂存区中已存在该文件)。
返回有关保存操作的详细输出。
保存未压缩的数据。
最后,示例代码在返回的输出行中打印出每个字段和值:
使用半结构化数据¶
使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。
备注
这些部分中的示例使用 示例中使用的示例数据 中的示例数据。
遍历半结构化数据¶
若要引用半结构化数据中的特定字段或元素,请使用 ` Column `_ 对象的以下方法:
使用 subField("<field_name>") 返回 OBJECT(或包含 OBJECT 的 VARIANT)中字段的
Column对象。使用 subField(<index>) 返回 ARRAY(或包含 ARRAY 的 VARIANT )中元素的
Column对象。
备注
如果路径中的字段名称或元素不规则,并且导致难以使用 Column.apply 方法,则可以改用 ` Functions.get _、 Functions.get_ignore_case `_ 或 ` Functions.get_path `_。
例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src 列的对象内,选择 dealership 字段:
该代码会打印以下输出:
备注
DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值。
还可以 链接多个方法调用,以遍历特定字段或元素的路径。
例如,以下代码选择 salesperson 对象中的 name 字段:
该代码会打印以下输出:
再举一个例子,下面的代码选择 vehicle 字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price 字段。
该代码会打印以下输出:
如果路径中的字段名称或元素不规则,并且导致难以使用 Column.subField 方法,则可以使用 ` Functions.get _、 Functions.get_ignore_case _ 或 ` Functions.get_path `_ 函数来代替 :code:`apply 方法。
例如,以下代码行都会打印对象中指定字段的值:
同样,以下代码行都会打印对象中指定路径处的字段值:
显式转换半结构化数据中的值¶
默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。
若要避免意外结果,请调用 :ref:` cast <label-snowpark_java_dataframe_cols_cast>` 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:
该代码会打印以下输出:
将对象数组展平为行¶
如果需要将半结构化数据“展平”为 DataFrame(例如,为数组中的每个对象生成一行),请调用 flatten 方法。此方法与 FLATTEN SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。
例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer 是一个包含有关客户的信息的对象数组。每个对象都包含 name 和 address 字段。
如果将此路径传递给 flatten 函数:
该方法返回一个 DataFrame:
从 DataFrame 中,您可以从 VALUE 字段中的每个对象选择 name 和 address 字段:
以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:
执行 SQL 语句¶
若要执行您指定的 SQL 语句,请调用 Session 类中的 sql 方法,然后传入要执行的语句。该方法返回一个 DataFrame。
请注意,在您 :ref:` 调用操作方法 <label-snowpark_java_dataframe_action_method>` 之前, SQL 语句不会执行。
如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_java_dataframe_transform>` (例如 filter、select 等),请注意,仅当基础 SQL 语句是 SELECT 语句时,这些方法才有效。其他类型的 SQL 语句不支持转换方法。