在 Snowpark Java 中使用 DataFrames¶
在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。
本主题内容:
要检索和操作数据,需要使用 ` DataFrame `_ 类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。
要将数据检索到 DataFrame 之中,请执行以下步骤:
-
例如,您可以创建一个 DataFrame,以保存来自表和外部 CSV 文件的数据或执行 SQL 语句时产生的数据。
-
例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。
-
为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如
collect()
方法)。
接下来的部分将更详细地介绍这些步骤。
设置本部分的示例¶
本部分的一些示例使用 DataFrame 查询名为 sample_product_data
的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表:
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT, amount NUMBER(12, 2), quantity INT, product_date DATE);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10, 1.00, 15, TO_DATE('2021.01.01', 'YYYY.MM.DD')),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20, 2.00, 30, TO_DATE('2021.02.01', 'YYYY.MM.DD')),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30, 3.00, 45, TO_DATE('2021.03.01', 'YYYY.MM.DD')),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40, 4.00, 60, TO_DATE('2021.04.01', 'YYYY.MM.DD')),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50, 5.00, 75, TO_DATE('2021.05.01', 'YYYY.MM.DD')),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60, 6.00, 90, TO_DATE('2021.06.01', 'YYYY.MM.DD')),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70, 7.00, 105, TO_DATE('2021.07.01', 'YYYY.MM.DD')),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80, 7.25, 120, TO_DATE('2021.08.01', 'YYYY.MM.DD')),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90, 7.50, 135, TO_DATE('2021.09.01', 'YYYY.MM.DD')),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100, 7.75, 150, TO_DATE('2021.10.01', 'YYYY.MM.DD')),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100, 8.00, 165, TO_DATE('2021.11.01', 'YYYY.MM.DD')),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100, 8.50, 180, TO_DATE('2021.12.01', 'YYYY.MM.DD'));
要验证表是否已创建,请运行:
SELECT * FROM sample_product_data;
构造 DataFrame¶
要构造 DataFrame,可以使用 Session
类中的方法。以下每种方法都基于不同类型的数据源构造 DataFrame :
要基于表、视图或流中的数据创建 DataFrame,请调用
table
方法:// Create a DataFrame from the data in the "sample_product_data" table. DataFrame dfTable = session.table("sample_product_data"); // Print out the first 10 rows. dfTable.show();
备注
table
方法返回一个Updatable
对象。Updatable
扩展了DataFrame
,并提供了用于处理表中数据的其他方法(例如,用于更新和删除数据的方法)。请参阅 更新、删除和合并表中的行。要基于指定值创建 DataFrame,请执行以下步骤:
构造一个由包含值的
Row
对象组成的数组。构造一个
StructType
对象,以描述这些值的数据类型。调用
createDataFrame
方法,并传入数组和StructType
对象。
// Import name from the types package, which contains StructType and StructField. import com.snowflake.snowpark_java.types.*; ... // Create a DataFrame containing specified values. Row[] data = {Row.create(1, "a"), Row.create(2, "b")}; StructType schema = StructType.create( new StructField("num", DataTypes.IntegerType), new StructField("str", DataTypes.StringType)); DataFrame df = session.createDataFrame(data, schema); // Print the contents of the DataFrame. df.show();
备注
构造 DataFrame 时,Snowflake 的保留字不能用作有效的列名。有关保留字的列表,请参阅 保留和受限关键字。
要创建包含特定值范围的 DataFrame,请调用
range
方法:// Create a DataFrame from a range DataFrame dfRange = session.range(1, 10, 2); // Print the contents of the DataFrame. dfRange.show();
要 为暂存区中的文件创建 DataFrame,请调用
read
来获取DataFrameReader
对象。在DataFrameReader
对象中,调用与文件中的数据格式对应的方法:// Create a DataFrame from data in a stage. DataFrame dfJson = session.read().json("@mystage2/data1.json"); // Print the contents of the DataFrame. dfJson.show();
要创建 DataFrame 来保存 SQL 查询的结果,请调用
sql
方法:// Create a DataFrame from a SQL query DataFrame dfSql = session.sql("SELECT name from sample_product_data"); // Print the contents of the DataFrame. dfSql.show();
注意:虽然可以使用此方法执行 SELECT 语句,以从表和暂存文件中检索数据,但应改用
table
和read
方法。在开发工具中,table
和read
之类的方法可以提供更好的语法突出显示、错误突出显示和智能代码补全效果。
指定应该如何转换数据集¶
要指定应选择哪些列,以及应如何对结果进行筛选、排序、分组等,请调用能转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 Functions.col
静态方法或计算结果为列的表达式。(请参阅 指定列和表达式。)
例如:
若要指定应返回的行,请调用
filter
方法:// Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. DataFrame df = session.table("sample_product_data").filter( Functions.col("id").equal_to(Functions.lit(1))); df.show();
若要指定应选择的列,请调用
select
方法:// Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. DataFrame df = session.table("sample_product_data").select( Functions.col("id"), Functions.col("name"), Functions.col("serial_number")); df.show();
每个方法都返回一个经过转换的新 DataFrame 对象。(该方法不会影响原始 DataFrame 对象。)这就意味着,如果要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用所返回的新 DataFrame 对象来调用每个后续转换方法。
请注意,这些转换方法不会从 Snowflake 数据库中检索数据。( 执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。)转换方法只是指定应如何构造 SQL 语句。
指定列和表达式¶
调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select
方法时,需要指定应选择的列。
要引用列,请通过调用 ` Functions.col `_ 静态方法来创建 ` Column `_ 对象。
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
dfProductInfo.show();
备注
要为字面量创建 Column
对象,请参阅 将字面量用作列对象。
指定筛选器、投影、联接条件等时,可以在表达式中使用 Column
对象。例如:
可以将
Column
对象与filter
方法一起使用,以指定筛选条件:// Specify the equivalent of "WHERE id = 12" // in an SQL SELECT statement. DataFrame df = session.table("sample_product_data"); df.filter(Functions.col("id").equal_to(Functions.lit(12))).show();
// Specify the equivalent of "WHERE key + category_id < 10" // in an SQL SELECT statement. DataFrame df2 = session.table("sample_product_data"); df2.filter(Functions.col("key").plus(Functions.col("category_id")).lt(Functions.lit(10))).show();
可以将
Column
对象与select
方法一起使用,以定义别名:// Specify the equivalent of "SELECT key * 10 AS c" // in an SQL SELECT statement. DataFrame df3 = session.table("sample_product_data"); df3.select(Functions.col("key").multiply(Functions.lit(10)).as("c")).show();
可以将
Column
对象与join
方法一起使用,以定义联接条件:// Specify the equivalent of "sample_a JOIN sample_b on sample_a.id_a = sample_b.id_a" // in an SQL SELECT statement. DataFrame dfLhs = session.table("sample_a"); DataFrame dfRhs = session.table("sample_b"); DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))); dfJoined.show();
引用不同 DataFrames 中的列¶
引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在每个 DataFrame 对象中使用 col
方法引用该对象中的列(例如 df1.col("name")
和 df2.col("name")
)。
下面的示例演示了如何使用 col
方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 value
的列。该示例使用 Column
对象的 as
方法来更改新创建的 DataFrame 中的列名称。
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"));
dfJoined.show();
将对象标识符(表名称、列名称等)放在双引号里¶
您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求。指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:
// The following calls are equivalent:
df.select(Functions.col("id123"));
df.select(Functions.col("ID123"));
如果名称不符合标识符要求,则必须将名称放在双引号里 ("
)。对于 Scala 字符串字面量中的双引号字符,请使用反斜杠 (\
) 进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:
DataFrame df = session.table("\"10tablename\"");
请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:
// The following calls are equivalent:
df.select(Functions.col("3rdID"));
df.select(Functions.col("\"3rdID\""));
// The following calls are equivalent:
df.select(Functions.col("id with space"));
df.select(Functions.col("\"id with space\""));
如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。
在某些情况下,列名称可能包含双引号字符:
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
如 标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes"
和 """column_name_quoted"""
):
DataFrame dfTable = session.table("quoted");
dfTable.select("\"name_with_\"\"air\"\"_quotes\"");
dfTable.select("\"\"\"column_name_quoted\"\"\"");
请注意,如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写:
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(Functions.col("id with space"));
df.select(Functions.col("ID WITH SPACE"));
将字面量用作列对象¶
要在传入 Column
对象的方法中使用字面量,请将字面量传递给 Functions
类中的 lit
静态方法,从而为字面量创建 Column
对象。例如:
// Show the first 10 rows in which category_id is greater than 5.
// Use `Functions.lit(5)` to create a Column object for the literal 5.
DataFrame df = session.table("sample_product_data");
df.filter(Functions.col("category_id").gt(Functions.lit(5))).show();
如果字面量是 Java 中的浮点或双精度值(例如 0.05
默认情况下被视为双精度值),则 Snowpark 库会生成 SQL,以将该值隐式转换为相应的 Snowpark 数据类型(例如 0.05::DOUBLE
)。这可能会产生与指定的确切数字不同的近似值。
例如,以下代码不显示匹配的行,即使筛选器(匹配大于或等于 0.05
的值)应匹配 DataFrame 中的行:
// Create a DataFrame that contains the value 0.05.
DataFrame df = session.sql("select 0.05 :: Numeric(5, 2) as a");
// Applying this filter results in no matching rows in the DataFrame.
df.filter(Functions.col("a").leq(Functions.lit(0.06).minus(Functions.lit(0.01)))).show();
问题在于 Functions.lit(0.06)
和 Functions.lit(0.01)
为 0.06
和 0.01
产生近似值而不是确切值。
要避免此问题,请 将字面量转换为要使用的 Snowpark 类型。例如,要使用精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:
import com.snowflake.snowpark_java.types.*;
...
df.filter(Functions.col("a").leq(Functions.lit(0.06).cast(DataTypes.createDecimalType(5, 2)).minus(Functions.lit(0.01).cast(DataTypes.createDecimalType(5, 2))))).show();
将列对象的类型转换为特定类型¶
要将 Column
对象的类型转换为特定类型,请调用 ` cast _ 方法,然后从 ` com.snowflake.snowpark_java.types 包 `_ 传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 的 :ref:`label-data_type_number,请运行以下语句:
// Import for the DecimalType class..
import com.snowflake.snowpark_java.types.*;
Column decimalValue = Functions.lit(0.05).cast(DataTypes.createDecimalType(5,2));
链接多个方法调用¶
由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。
下面的示例返回为如下目的而配置的 DataFrame :
查询
sample_product_data
表。返回
id = 1
的行。选择
name
和serial_number
列。
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
在此示例中:
session.table("sample_product_data")
返回sample_product_data
表的 DataFrame。尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。
filter(Functions.col("id").equal_to(Functions.lit(1)))
返回sample_product_data
表的 DataFrame(设置为返回id = 1
的行)。请再次注意, DataFrame 尚未包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_java_dataframe_action_method>` 之前,不会检索匹配的行。
select(Functions.col("name"), Functions.col("serial_number"))
返回一个 DataFrame,包含sample_product_data
表中id = 1
的行的name
和serial_number
列。
在链接多个方法调用时,请注意调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。
例如,在下面的代码中,select
方法返回仅包含两列的 DataFrame:name
和 serial_number
。对此 DataFrame 的 filter
方法调用会失败,因为它使用的 id
列不在已转换的 DataFrame 中。
// This fails with the error "invalid identifier 'ID'."
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("name"), Functions.col("serial_number")).filter(Functions.col("id").equal_to(Functions.lit(1)));
dfProductInfo.show();
相反,以下代码会成功执行,因为对包含 sample_product_data
表中所有列(包括 id
列)的 DataFrame 调用了 filter()
方法:
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
请注意,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 select
和 filter
方法调用。
限制 DataFrame 中的行数¶
要限制 DataFrame 中的行数,可以使用 ` limit `_ 转换方法。
Snowpark API 还提供了以下操作方法,可检索和打印出有限数量的行:
` first _ 操作方法(用于执行查询并返回前 :samp:`{n} 行)
` show _ 操作方法(用于执行查询并打印前 :samp:`{n} 行)
这些方法能有效地将 LIMIT 子句添加到执行的 SQL 语句中。
如 :ref:` LIMIT 使用说明 <label-limit_cmd_usage_notes>` 中所述,除非将排序顺序 (ORDER BY) 与 LIMIT 一起指定,否则结果是不确定的。
要使 ORDER BY 子句与 LIMIT 子句一起出现(从而避免 ORDER BY 出现在另一个子查询等地方),必须调用 limit 方法,以限制 sort
方法返回的 DataFrame 中的结果数。
例如,如果 链接多个方法调用:
DataFrame df = session.table("sample_product_data");
// Limit the number of rows to 5, sorted by parent_id.
DataFrame dfSubset = df.sort(Functions.col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
Row[] arrayOfRows = df.sort(Functions.col("parent_id")).first(5);
// Print the first 5 rows, sorted by parent_id.
df.sort(Functions.col("parent_id")).show(5);
检索列定义¶
要在 DataFrame 的数据集中检索列的定义,请调用 schema
方法。此方法会返回一个 StructType
对象,其中包含 StructField
对象的 Array
。每个 StructField
对象都包含一列的定义。
import com.snowflake.snowpark_java.types.*;
...
// Get the StructType object that describes the columns in the
// underlying rowset.
StructType tableSchema = session.table("sample_product_data").schema();
System.out.println("Schema for sample_product_data: " + tableSchema);
在返回的 StructType
对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。
下面的示例创建一个 DataFrame,其中包含名为 ID
和 3rd
的列。对于名为 3rd
的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"
),因为 该名称不符合标识符要求。
该示例调用 schema
方法,随后对返回的 StructType
对象调用 names
方法,以获取列名称的数组。这些名称使用 schema
方法返回的 StructType
进行规范化。
import java.util.Arrays;
...
// Create a DataFrame containing the "id" and "3rd" columns.
DataFrame dfSelectedColumns = session.table("sample_product_data").select(Functions.col("id"), Functions.col("3rd"));
// Print out the names of the columns in the schema.
System.out.println(Arrays.toString(dfSelectedColumns.schema().names()));
联接 DataFrames¶
若要联接 DataFrame 对象,请调用 ` join `_ 方法。
以下部分说明如何使用 DataFrames 执行联接:
设置联接的示例数据¶
后面部分中的示例使用示例数据,您可以通过执行以下 SQL 语句来设置这些数据:
CREATE OR REPLACE TABLE sample_a (
id_a INTEGER,
name_a VARCHAR,
value INTEGER
);
INSERT INTO sample_a (id_a, name_a, value) VALUES
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
CREATE OR REPLACE TABLE sample_b (
id_b INTEGER,
name_b VARCHAR,
id_a INTEGER,
value INTEGER
);
INSERT INTO sample_b (id_b, name_b, id_a, value) VALUES
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', NULL, 200)
;
CREATE OR REPLACE TABLE sample_c (
id_c INTEGER,
name_c VARCHAR,
id_a INTEGER,
id_b INTEGER
);
INSERT INTO sample_c (id_c, name_c, id_a, id_b) VALUES
(1012, 'C1', 10, NULL),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
指定联接的列¶
利用 DataFrame.join
方法,可以通过以下方式之一指定要使用的列:
指定描述联接条件的列表达式。
指定应用作联接中的公共列的一个或多个列。
以下示例对名为 id_a
的列执行内部联接:
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
dfJoined.show();
请注意,该示例使用 DataFrame.col
方法来指定要用于联接的条件。有关此方法的更多信息,请参阅 指定列和表达式。
这将打印以下输出:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
联接结果中重复的相同列名称¶
在联接产生的 DataFrame 中,Snowpark 库使用在已联接的表中找到的列名称,即使这些列名称在表之间相同也是如此。发生这种情况时,这些列名称将在联接产生的 DataFrame 中重复。要按名称访问重复的列,请对表示列的原始表的 DataFrame 调用 col
方法。(有关指定列的更多信息,请参阅 引用不同 DataFrames 中的列。)
以下示例中的代码联接两个 DataFrames,然后对已联接的 DataFrame 调用 select
方法。此代码通过从表示相应 DataFrame 对象( dfRhs
和 dfLhs
)的变量调用 col
方法来指定要选择的列。它使用 as
方法为 select
方法创建的 DataFrame 中的列提供新名称。
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
DataFrame dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"));
dfSelected.show();
这将打印以下输出:
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
在保存或缓存之前删除重复列¶
请注意,当联接产生的 DataFrame 包含重复的列名称时,必须通过删除重复列或重命名列来移除 DataFrame 中的重复项,然后才能将结果保存到表中或者缓存 DataFrame。对于保存到表或缓存的 DataFrame 中的重复列名称,Snowpark 库会将重复的列名称替换为别名,让它们不再重复。
对于缓存的 DataFrame,下例说明了其输出在以下情况下将如何显示:列名称 ID_A
和 VALUE
在两个表的联接结果中重复,之后,在缓存结果之前未删除重复列或未重命名列。
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
执行自然联接¶
要执行 :ref:` 自然联接 <label-querying_join_natural>` (在具有相同名称的列上联接 DataFrames),请调用 ` naturalJoin `_ 方法。
以下示例在 sample_a
和 sample_b
的公共列( id_a
列)上联接这两个表的 DataFrames :
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.naturalJoin(dfRhs);
dfJoined.show();
这将打印以下输出:
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
指定联接类型¶
默认情况下, DataFrame.join
方法创建内部联接。要指定其他类型的联接,请将 joinType
实参设置为以下值之一:
联接类型 |
|
---|---|
内部联接 |
|
左外部联接 |
|
右外部联接 |
|
完整外部联接 |
|
交叉联接 |
|
例如:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")), "left");
dfLeftOuterJoin.show();
这将打印以下输出:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
联接多个表¶
要联接多个表,请执行以下步骤:
为每个表创建一个 DataFrame。
对第一个 DataFrame 调用
DataFrame.join
方法,并传入第二个 DataFrame。使用
join
方法返回的 DataFrame 来调用join
方法,并传入第三个 DataFrame。
可以 链接 join
调用,如下所示:
DataFrame dfFirst = session.table("sample_a");
DataFrame dfSecond = session.table("sample_b");
DataFrame dfThird = session.table("sample_c");
DataFrame dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a").equal_to(dfSecond.col("id_a"))).join(dfThird, dfFirst.col("id_a").equal_to(dfThird.col("id_a")));
dfJoinThreeTables.show();
这将打印以下输出:
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
执行自联接¶
如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下使用单个 DataFrame 执行自联接的示例最终失败,因为联接的左侧和右侧都存在 "id"
的列表达式:
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, Functions.col("id").equal_to(Functions.col("parent_id")));
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, df.col("id").equal_to(df.col("parent_id")));
上面两个示例都失败了,并引发以下异常:
Exception in thread "main" com.snowflake.snowpark_java.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
请改用 ` clone `_ 方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
DataFrame dfLhs = session.table("sample_product_data");
// Clone the DataFrame object to use as the right-hand side of the join.
DataFrame dfRhs = dfLhs.clone();
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id").equal_to(dfRhs.col("parent_id")));
dfJoined.show();
如果要基于同一列执行自联接,请调用 join
方法,为 USING
子句传入列的名称(或列名称的数组):
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, "key");
执行操作以计算 DataFrame¶
如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。
以下部分介绍如何同步和异步地对 DataFrame 执行操作:
同步执行操作¶
要同步执行操作,请调用以下操作方法之一:
同步执行操作的方法 |
描述 |
---|---|
|
|
|
计算 DataFrame,并返回 ` Row _ 对象的 :code:`Iterator。如果结果集很大,请使用此方法,以避免将所有结果同时加载到内存中。请参阅 返回行的迭代器。 |
|
计算 DataFrame 并返回行数。 |
|
计算 DataFrame,并将行打印到控制台。请注意,此方法将行数限制为 10 行(默认值)。请参阅 打印 DataFrame 中的行。 |
|
执行查询,创建临时表,并将结果放入表中。该方法返回一个 |
|
将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中。 |
|
将 DataFrame 中的数据复制到指定表中。请参阅 将数据从文件复制到表中。 |
|
删除指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
更新指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
将行合并到指定的表中。请参阅 更新、删除和合并表中的行。 |
例如,要执行查询并返回结果数,请调用 count
方法:
// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");
// Send the query to the server for execution and
// print the count of rows in the table.
System.out.println("Rows returned: " + dfProducts.count());
还可以调用操作方法来:
注意:如果调用 schema
方法来获取 DataFrame 中列的定义,则无需调用操作方法。
异步执行操作¶
备注
此功能是在 Snowpark 0.11.0 中引入的。
要异步执行操作,请调用 async
方法以返回“异步执行者”对象(例如 DataFrameAsyncActor
),然后在该对象中调用异步操作方法。
异步执行者对象的这些操作方法会返回一个 TypedAsyncJob
对象,您可以使用该对象检查异步操作的状态和检索操作的结果。
接下来的部分将介绍如何异步执行操作和检查结果。
了解异步操作的基本流程¶
可以使用以下方法异步执行操作:
异步执行操作的方法 |
描述 |
---|---|
|
异步计算 DataFrame,以 ` Row _ 对象的 :code:`Array 的形式检索生成的数据集。请参阅 返回所有行。 |
|
异步计算 DataFrame,以检索 ` Row _ 对象的 :code:`Iterator。如果结果集很大,请使用此方法,以避免将所有结果同时加载到内存中。请参阅 返回行的迭代器。 |
|
异步计算 DataFrame 以检索行数。 |
|
将 DataFrame 中的数据异步保存到指定表中。请参阅 将数据保存到表中。 |
|
将 DataFrame 中的数据异步复制到指定的表中。请参阅 将数据从文件复制到表中。 |
|
异步删除指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
异步更新指定表中的行。请参阅 更新、删除和合并表中的行。 |
从返回的 TypedAsyncJob 对象中,您可以执行以下操作:
要确定操作是否已完成,请调用
isDone
方法。要获取与操作对应的查询 ID,请调用
getQueryId
方法。要返回操作的结果(例如,对于
collect
方法为Row
对象的Array
;对于count
方法为行数),请调用getResult
方法。请注意,
getResult
是阻塞调用。要取消操作,请调用
cancel
方法。
例如,要异步执行查询,并以 Row
对象的 Array
的形式检索结果,请调用 async().collect()
:
import java.util.Arrays;
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
DataFrame df = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Row[]> asyncJob = df.async().collect();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
Row[] results = asyncJob.getResult();
System.out.println(Arrays.toString(results));
要异步执行查询并检索结果数,请调用 async().count()
:
// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");
// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Long> asyncJob = dfProducts.async().count();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Print the count of rows in the table.
// Note that getResult is a blocking call.
System.out.println("Rows returned: " + asyncJob.getResult());
指定等待的最大秒数¶
调用 getResult
方法时,可以使用 maxWaitTimeInSeconds
实参指定在尝试检索结果之前等待查询完成的最大秒数。例如:
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
Row[] results = asyncJob.getResult(10);
如果省略此实参,该方法将等待 snowpark_request_timeout_in_seconds 配置属性指定的最大秒数。( :ref:` 创建 Session 对象 <label-snowpark_java_creating_session>` 时可以设置此属性。)
通过 ID 访问异步查询¶
如果有之前提交的异步查询的查询 ID,则可以调用 Session.createAsyncJob
方法创建一个 AsyncJob 对象,该对象可用于检查查询状态、检索查询结果或取消查询。
请注意,与 TypedAsyncJob
不同的是, AsyncJob
没有提供用于检索结果的 getResult
方法。如果需要检索结果,请改为调用 getRows
或 getIterator
方法。
例如:
import java.util.Arrays;
...
AsyncJob asyncJob = session.createAsyncJob(myQueryId);
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
Row[] rows = asyncJob.getRows();
System.out.println(Arrays.toString(rows));
将行检索到 DataFrame 之中¶
指定 DataFrame 应如何转换 来执行查询并返回结果。可以返回 Array
中的所有行,也可以返回一个 Iterator
,以便逐行遍历结果。在后一种情况下,如果数据量较大,则行将按块加载到内存中,以免将大量数据加载到内存中。
返回所有行¶
要同时返回所有行,请调用 collect 方法。此方法返回 ` Row _ 对象的数组。要从行中检索值,请调用 :samp:`get{Type} 方法(例如 getString
、getInt
等)。
例如:
Row[] rows = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).collect();
for (Row row : rows) {
System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
返回行的迭代器¶
如果要使用 Iterator
遍历结果中的 ` Row _ 对象,请调用 `toLocalIterator。如果结果中的数据量很大,该方法将按块加载行,以免同时将所有行加载到内存中。
例如:
import java.util.Iterator;
Iterator<Row> rowIterator = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).toLocalIterator();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
返回前 n
行¶
要返回前 n
行,请调用 first 方法,并传入要返回的行数。
如 限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().first()
)。
例如:
import java.util.Arrays;
...
DataFrame df = session.table("sample_product_data");
Row[] rows = df.sort(Functions.col("name")).first(5);
System.out.println(Arrays.toString(rows));
打印 DataFrame 中的行¶
要将 DataFrame 中的前 10 行打印到控制台,请调用 show 方法。要打印出其他数量的行,请传入要打印的行数。
如 限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().show()
)。
例如:
DataFrame df = session.table("sample_product_data");
df.sort(Functions.col("name")).show();
更新、删除和合并表中的行¶
备注
此功能是在 Snowpark 0.7.0 中引入的。
当您调用 Session.table
为表创建 DataFrame
对象时,该方法将返回一个 Updatable
对象,此对象通过用于更新和删除表中数据的其他方法扩展了 DataFrame
。(请参阅 Updatable。)
如果需要更新或删除表中的行,可以使用 Updatable
类的以下方法:
调用
update
或updateColumn
,以更新表中的现有行。请参阅 更新表中的行。调用
delete
以删除表中的行。请参阅 删除表中的行。调用
merge
,以根据另一个表或子查询中的数据在一个表中插入、更新和删除行。(这等效于 SQL 中的 MERGE 命令。)请参阅 将行合并到一个表中。
更新表中的行¶
要更新表中的行,请调用 update
或 updateColumn
方法,并传入 Map
(它关联要更新的列和要分配给这些列的相应值):
要将列名称指定为
Map
中的字符串,请调用updateColumn
。要在
Map
中指定Column
对象,请调用update
。
这两种方法都返回一个 UpdateResult
对象,其中包含已更新的行数。(请参阅 UpdateResult。)
备注
这两种方法都是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。
例如,要将名为 count
的列中的值替换为值 1
,并且要使用将列名称 (String
) 与相应值关联的 Map
,请调用 updateColumn
:
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("3rd", Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.updateColumn(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
如果要在 Map
中使用 Column
对象来标识要更新的列,请调用 update
:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
如果仅在满足条件时才应进行更新,则可以将该条件指定为实参。例如,对于 category_id
列的值为 20
的行,要将名为 count
的列中的值替换为 2
,请运行以下语句:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(2));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments, Functions.col("category_id").equal_to(Functions.lit(20)));
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
如果需要让条件基于与另一个 DataFrame
对象进行的联接,则可以将该 DataFrame
作为实参传入,并在条件中使用该 DataFrame
。例如,对于 category_id
列与 DataFrame
dfParts
中的 category_id
匹配的行,要将名为 count
的列中的值替换为 3
,请运行以下语句:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(3));
Updatable updatableDf = session.table("sample_product_data");
DataFrame dfParts = session.table("parts");
UpdateResult updateResult = updatableDf.update(assignments, updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
删除表中的行¶
对于 delete
方法,可以指定一个条件来标识要删除的行,并且可以使该条件基于与另一个 DataFrame 进行的联接。delete
会返回一个 DeleteResult
对象,其中包含已删除的行数。(请参阅 DeleteResult。)
备注
delete
是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。
例如,要删除 category_id
列中的值为 1
的行,请运行以下语句:
Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(Functions.lit(1)));
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
如果条件引用另一个 DataFrame 中的列,则将该 DataFrame 作为第二个实参传入。例如,要删除 category_id
列与 DataFrame
dfParts
中的 category_id
匹配的行,请将 dfParts
作为第二个实参传入:
Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
将行合并到一个表中¶
要根据另一个表或子查询中的值在一个表中插入、更新和删除行(等效于 SQL 中的 MERGE 命令),请执行以下步骤:
在要将数据合并到的表的
Updatable
对象中,调用merge
方法,并传入另一个表的DataFrame
对象和联接条件的列表达式。这将返回一个
MergeBuilder
对象,您可以使用该对象指定要对匹配的行和不匹配的行执行的操作(例如插入、更新或删除)。(请参阅 MergeBuilder。)使用
MergeBuilder
对象:要指定应对匹配的行执行的更新或删除,请调用
whenMatched
方法。如果需要指定有关何时应更新或删除行的附加条件,则可以传入该条件的列表达式。
此方法返回一个
MatchedClauseBuilder
对象,可用于指定要执行的操作。(请参阅 MatchedClauseBuilder。)调用
MatchedClauseBuilder
对象中的update
或delete
方法,以指定应对匹配的行执行的更新或删除操作。这些方法返回一个MergeBuilder
对象,可用于指定其他子句。要指定应在行不匹配时执行的插入,请调用
whenNotMatched
方法。如果需要指定有关何时应插入行的附加条件,则可以传入该条件的列表达式。
此方法返回一个
NotMatchedClauseBuilder
对象,可用于指定要执行的操作。(请参阅 NotMatchedClauseBuilder。)调用
NotMatchedClauseBuilder
对象中的insert
方法,以指定应在行不匹配时执行的插入操作。这些方法返回一个MergeBuilder
对象,可用于指定其他子句。
指定了应执行的插入、更新和删除后,调用
MergeBuilder
对象的collect
方法,以对表执行指定的插入、更新和删除。collect
会返回一个MergeResult
对象,其中包含已插入、更新和删除的行数。(请参阅 MergeResult。)
以下示例将 source
表中带有 id
和 value
列的行插入到 target
表中(如果 target
表未包含具有匹配 ID 的行):
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
.whenNotMatched().insert([source.col("id"), source.col("value")])
.collect();
以下示例使用 source
表中具有同一 ID 的行的 value
列值更新 target
表中的行:
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("value", source.col("value"));
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
.whenMatched().update(assignments)
.collect();
将数据保存到表中¶
可以将 DataFrame 的内容保存到新表或现有表中。为此,您必须具有以下权限:
对架构的 CREATE TABLE 权限(如果表不存在)。
对表的 INSERT 权限。
要将 DataFrame 的内容保存到表中,请执行以下操作:
调用 DataFrame 的 write 方法,以获取 DataFrameWriter 对象。
调用
DataFrameWriter
对象的 ` mode `_ 方法,并传入一个 ` SaveMode `_ 对象,以指定有关写入表的首选项:要插入行,请传入
SaveMode.Append
。要覆盖现有表,请传入
SaveMode.Overwrite
。
此方法返回使用指定模式进行了配置的相同
DataFrameWriter
对象。如果要在现有表中插入行 (
SaveMode.Append
),并且 DataFrame 中的列名称与表中的列名称匹配,请调用 ` DataFrameWriter.option _,并传入 :code:"columnOrder"` 和"name"
作为实参。备注
此方法是在 Snowpark 1.4.0 中引入的。
默认情况下,
columnOrder
选项设置为"index"
,这意味着DataFrameWriter
按照列的出现顺序插入值。例如,DataFrameWriter
在表中第一列内插入 DataFrame 中第一列的值,在表中第二列内插入 DataFrame 中第二列的值,依此类推。此方法返回使用指定选项进行了配置的同一个
DataFrameWriter
对象。调用
DataFrameWriter
对象的 ` saveAsTable `_ 方法,以将 DataFrame 的内容保存到指定的表中。无需调用单独的方法(例如
collect
)来执行将数据保存到表中的 SQL 语句。saveAsTable
是执行 SQL 语句的 操作方法。
以下示例使用 DataFrame df
的内容覆盖现有表(由 tableName
变量标识):
df.write().mode(SaveMode.Overwrite).saveAsTable(tableName);
以下示例将 DataFrame df
中的行插入到现有表(由 tableName
变量标识)。在此示例中,表和 DataFrame 都包含列 c1
和 c2
。
该示例展示了这两种做法的区别:将 columnOrder
选项设置为 "name"
(这会将值插入到与 DataFrame 列同名的表列中),以及使用默认的 columnOrder
选项(这会根据 DataFrame 中列的顺序将值插入到表列中)。
DataFrame df = session.sql("SELECT 1 AS c2, 2 as c1");
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write().mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName);
// With the default value of the columnOrder option ("index"), the DataFrameWriter uses the column positions
// and inserts a row with the values (1, 2).
df.write().mode(SaveMode.Append).saveAsTable(tableName);
基于 DataFrame 创建视图¶
要基于 DataFrame 创建视图,请调用 createOrReplaceView 方法:
df.createOrReplaceView("db.schema.viewName");
请注意,调用 createOrReplaceView
会立即创建新视图。更重要的是,它不会导致对 DataFrame 进行计算。(在您 :ref:` 执行操作 <label-snowpark_java_dataframe_action_method>` 之前,不会对 DataFrame 本身进行计算。)
通过调用 createOrReplaceView
创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图。
如果只需为会话创建临时视图,请改为调用 createOrReplaceTempView 方法:
df.createOrReplaceTempView("db.schema.viewName");
缓存 DataFrame¶
在某些情况下,可能需要执行复杂的查询,并将结果保留下来,以供后续操作使用(而不必再次执行相同的查询)。在此类情况下,可以调用 cacheResult 方法来缓存 DataFrame 的内容。
此方法会:
运行查询。
在调用
cacheResult
之前,无需 调用单独的操作方法来检索结果。cacheResult
是执行查询的操作方法。将结果保存在临时表中
由于
cacheResult
会创建临时表,因此,您必须对正在使用的架构具有 CREATE TABLE 权限。返回一个 HasCachedResult 对象,该对象提供对临时表中结果的访问权。
由于
HasCachedResult
扩展了DataFrame
,因此,可以对此缓存数据执行一些同样可以对 DataFrame 执行的操作。
备注
由于 cacheResult
执行查询并将结果保存到表中,因此该方法可能会导致计算和存储成本增加。
例如:
// Set up a DataFrame to query a table.
DataFrame df = session.table("sample_product_data").filter(Functions.col("category_id").gt(Functions.lit(10)));
// Retrieve the results and cache the data.
HasCachedResult cachedDf = df.cacheResult();
// Create a DataFrame containing a subset of the cached data.
DataFrame dfSubset = cachedDf.filter(Functions.col("category_id").equal_to(Functions.lit(20))).select(Functions.col("name"), Functions.col("category_id"));
dfSubset.show();
请注意,调用该方法时,原始的 DataFrame 不受影响。例如,假设 dfTable
是 sample_product_data
表的 DataFrame :
HasCachedResult dfTempTable = dfTable.cacheResult();
调用 cacheResult
后, dfTable
仍指向 sample_product_data
表,而且您可以继续使用 dfTable
来查询和更新该表。
要使用临时表中的缓存数据,请使用 dfTempTable
(即 cacheResult
返回的 HasCachedResult
对象)。
处理暂存区中的文件¶
Snowpark 库提供了一些类和方法,可让您通过使用暂存区中的文件 将数据加载到 Snowflake 中,以及 从 Snowflake 卸载数据。
备注
为了在暂存区上使用这些类和方法,您必须具有 使用暂存区所需的权限。
接下来的部分将说明如何使用这些类和方法:
在暂存区中上传和下载文件¶
要在暂存区中上传和下载文件,请使用 ` FileOperation _ 对象的 :code:`put 和 get
方法:
将文件上传到暂存区¶
要将文件上传到暂存区,请执行以下步骤:
验证您是否具有 将文件上传到暂存区的权限。
调用
FileOperation
对象的 ` put `_ 方法将文件上传到暂存区。此方法执行 SQL PUT 命令。
要为 PUT 命令指定任何 可选参数,请创建参数和值的
Map
,然后传入该Map
作为options
实参。例如:import java.util.HashMap; import java.util.Map; ... // Upload a file to a stage without compressing the file. Map<String, String> putOptions = new HashMap<>(); putOptions.put("AUTO_COMPRESS", "FALSE"); PutResult[] putResults = session.file().put("file:///tmp/myfile.csv", "@myStage", putOptions);
在
localFileName
实参中,可以使用通配符(*
和?
)来标识要上传的一组文件。例如:// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. PutResult[] putResults = session.file().put("file:///tmp/file*.csv", "@myStage/prefix2")
检查
put
方法返回的 ` PutResult _ 对象的 :code:`Array,以确定文件是否上传成功。例如,要打印该文件的文件名和 PUT 操作的状态,请运行以下语句:// Print the filename and the status of the PUT operation. for (PutResult result : putResults) { System.out.println(result.getSourceFileName() + ": " + result.getStatus()); }
从暂存区下载文件¶
要从暂存区下载文件,请执行以下步骤:
验证您是否具有 从暂存区下载文件的权限。
调用
FileOperation
对象的 ` get `_ 方法,以从暂存区下载文件。此方法执行 SQL GET 命令。
要为 GET 命令指定任何 可选参数,请创建参数和值的
Map
,然后传入该Map
作为options
实参。例如:import java.util.HashMap; import java.util.Map; ... // Upload a file to a stage without compressing the file. // Download files with names that match a regular expression pattern. Map<String, String> getOptions = new HashMap<>(); getOptions.put("PATTERN", "'.*file_.*.csv.gz'"); GetResult[] getResults = session.file().get("@myStage", "file:///tmp", getOptions);
检查
get
方法返回的 ` GetResult _ 对象的 :code:`Array,以确定文件是否下载成功。例如,要打印该文件的文件名和 GET 操作的状态,请运行以下语句:// Print the filename and the status of the GET operation. for (GetResult result : getResults) { System.out.println(result.getFileName() + ": " + result.getStatus()); }
使用输入流在暂存区中上传和下载数据¶
备注
此功能是在 Snowpark 1.4.0 中引入的。
要使用输入流将数据上传到暂存区上的文件,以及从暂存区上的文件下载数据,请使用 ` FileOperation _ 对象的 :code:`uploadStream 和 downloadStream
方法:
使用输入流将数据上传到暂存区上的文件¶
要将数据从 ` java.io.InputStream `_ 对象上传到暂存区上的文件,请执行以下步骤:
验证您是否具有 将文件上传到暂存区的权限。
调用
FileOperation
对象的 ` uploadStream `_ 方法。传入应写入数据的暂存区上文件的完整路径和
InputStream
对象。此外,使用compress
实参,指定在上传数据之前是否应压缩数据。
例如:
import java.io.InputStream;
...
boolean compressData = true;
String pathToFileOnStage = "@myStage/path/file";
session.file().uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData);
使用输入流从暂存区上的文件下载数据¶
要将数据从暂存区上的文件下载到 ` java.io.InputStream `_ 对象,请执行以下操作:
验证您是否具有 从暂存区下载文件的权限。
调用
FileOperation
对象的 ` downloadStream `_ 方法。传入包含要下载的数据的暂存区文件的完整路径。使用
decompress
实参,指定是否要压缩文件中的数据。
例如:
import java.io.InputStream;
...
boolean isDataCompressed = true;
String pathToFileOnStage = "@myStage/path/file";
InputStream is = session.file().downloadStream(pathToFileOnStage, isDataCompressed);
为暂存区中的文件设置 DataFrame¶
本部分介绍如何为 Snowflake 暂存区中的文件设置 DataFrame。创建此 DataFrame 后,您可以使用 DataFrame 来执行以下操作:
要为 Snowflake 暂存区中的文件设置 DataFrame,请使用 DataFrameReader
类:
验证您是否具有以下权限:
以下其中一项:
架构的 CREATE TABLE 权限(如果您计划指定用于确定如何从暂存文件复制数据的 复制选项 )。
否则确保您具备架构的 CREATE FILE FORMAT 权限。
调用
Session
类中的read
方法,以访问DataFrameReader
对象。如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:
创建一个 ` StructType `_ 对象,该对象包含描述文件中字段的 ` StructField `_ 对象的数组。
对于每个
StructField
对象,请指定以下内容:字段的名称。
字段的数据类型(指定为
com.snowflake.snowpark_java.types
包中的对象)。字段是否可为 null。
例如:
import com.snowflake.snowpark_java.types.*; ... StructType schemaForDataFile = StructType.create( new StructField("id", DataTypes.StringType, true), new StructField("name", DataTypes.StringType, true));
调用
DataFrameReader
对象中的schema
方法,传入StructType
对象。例如:
DataFrameReader dfReader = session.read().schema(schemaForDataFile);
schema
方法会返回一个DataFrameReader
对象,该对象配置为读取包含指定字段的文件。请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件,
DataFrameReader
会将数据视为字段名称为$1
、类型为 VARIANT 的单个字段处理。
如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用 ` DataFrameReader.option `_ 或 ` DataFrameReader.options `_ 方法。
传入要设置的选项的名称和值。可以设置以下类型的选项:
有关 CREATE FILE FORMAT 的文档。
COPY INTO TABLE 文档。
请注意,当您 :ref:` 将数据检索到 DataFrame <label-snowpark_java_dataframe_action_method>` 中时,设置复制选项可能造成费用更昂贵的执行策略。
下面的示例设置了
DataFrameReader
对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE");
option
方法返回使用指定选项进行了配置的DataFrameReader
对象。若要设置多个选项,可以 链接多个调用 到
option
方法(如上面的示例所示),或是调用 ` DataFrameReader.options _ 方法,并传入选项名称和值的 :code:`Map。调用与文件格式相对应的方法。您可以调用以下方法之一:
` DataFrameReader.avro `_
` DataFrameReader.csv `_
` DataFrameReader.json `_
` DataFrameReader.orc `_
` DataFrameReader.parquet `_
` DataFrameReader.xml `_
调用这些方法时,传入要读取的文件的暂存区位置。例如:
DataFrame df = dfReader.csv("@mystage/myfile.csv");
若要指定以相同前缀开头的多个文件,请在暂存区名称后指定前缀。例如,要从暂存区
@mystage
加载具有前缀csv_
的文件,请执行以下操作:DataFrame df = dfReader.csv("@mystage/csv_");
与文件格式相对应的方法会返回文件的一个 CopyableDataFrame 对象。
CopyableDataFrame
扩展了DataFrame
,并提供用于处理暂存文件中的数据的其他方法。调用操作方法,以执行以下操作:
与表的 DataFrames 一样,在您调用 :ref:` 操作方法 <label-snowpark_java_dataframe_action_method>` 之前,不会将数据检索到 DataFrame 中。
将数据从文件加载到 DataFrame 中¶
在 为暂存区中的文件设置 DataFrame 之后,可以将文件中的数据加载到 DataFrame 中:
使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_java_dataframe_transform>` (例如,选择特定字段、筛选行等)。
例如,要从名为
mystage
的暂存区中名为data.json
的 JSON 文件中提取color
元素,请运行以下语句:DataFrame df = session.read().json("@mystage/data.json").select(Functions.col("$1").subField("color"));
如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件),
DataFrameReader
会将文件中的数据视为名为$1
的单个 VARIANT 列。调用
DataFrame.collect
方法以加载数据。例如:Row[] results = df.collect();
将数据从文件复制到表中¶
为暂存区中的文件设置 DataFrame 之后,可以调用 copyInto 方法将数据复制到表中。此方法执行 COPY INTO <table> 命令。
备注
在调用 copyInto
之前,无需调用 collect
方法。在调用 copyInto
之前,文件中的数据不需要在 DataFrame 之中。
例如,以下代码将 myFileStage
指定的 CSV 文件中的数据加载到 mytable
表中。由于数据位于 CSV 文件中,代码还必须 描述文件中的字段。为此,该示例调用 DataFrameReader
对象的 ` schema _ 方法,并传入 ` StructType `_ 对象 (``schemaForDataFile`),其中包含了描述字段的 ` StructField `_ 对象的数组。
CopyableDataFrame copyableDf = session.read().schema(schemaForDataFile).csv("@mystage/myfile.csv");
copyableDf.copyInto("mytable");
将 DataFrame 保存到暂存区上的文件¶
备注
此功能是在 Snowpark 1.5.0 中引入的。
如果需要将 DataFrame 保存到暂存区上的文件中,可以调用与文件格式对应的 DataFrameWriter 方法(例如 csv
方法用于写入 CSV 文件),并传入应将文件保存到的暂存区位置。这些 DataFrameWriter
方法执行 COPY INTO <location> 命令。
备注
在调用这些 DataFrameWriter
方法之前,无需调用 collect
方法。在调用这些方法之前,文件中的数据不需要在 DataFrame 之中。
要将 DataFrame 的内容保存到暂存区上的文件中,请执行以下操作:
调用 DataFrame 对象的 ` write _ 方法,以获取 ` DataFrameWriter `_ 对象。例如,要获取代表名为 ``sample_product_data` 的表的 DataFrame 的
DataFrameWriter
对象,请运行以下语句:DataFrameWriter dfWriter = session.table("sample_product_data").write();
如果要覆盖文件的内容(如果文件存在),请调用
DataFrameWriter
对象的 ` mode _ 方法,并传入 :code:`SaveMode.Overwrite。否则,默认情况下,如果暂存区上的指定文件已存在,
DataFrameWriter
会报告错误。mode
方法返回使用指定模式进行了配置的相同DataFrameWriter
对象。例如,要指定
DataFrameWriter
应覆盖暂存区上的文件,请运行以下语句:dfWriter = dfWriter.mode(SaveMode.Overwrite);
如果需要指定有关应如何保存数据的其他信息(例如,应压缩数据,或者要使用分号来分隔 CSV 文件中的字段),请调用 ` DataFrameWriter.option `_ 方法或 ` DataFrameWriter.options `_ 方法。
传入要设置的选项的名称和值。可以设置以下类型的选项:
有关 COPY INTO <location> 的文档。
有关 COPY INTO <location> 的文档中介绍的 复制选项。
请注意,不能使用
option
方法设置以下选项:TYPE 格式类型选项。
OVERWRITE 复制选项。若要设置此选项,请改为调用
mode
方法(如上一步所述)。
下面的示例设置
DataFrameWriter
对象,以使用分号(而非逗号)作为字段分隔符,以未压缩的形式将数据保存到 CSV 文件中。dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE");
option
方法返回使用指定选项进行了配置的DataFrameWriter
对象。要设置多个选项,可以如上例所示 :ref:` 链接多个调用 <label-snowpark_java_dataframe_chaining_method_calls>` (调用的是
option
方法),或者调用 ` DataFrameWriter.options _ 方法,并传入选项名称和值的 :code:`Map。若要返回有关已保存的各文件的详细信息,请将
DETAILED_OUTPUT
复制选项 设置为TRUE
。默认情况下,
DETAILED_OUTPUT
为FALSE
,这代表该方法会返回一行输出,其中包含"rows_unloaded"
、"input_bytes"
和"output_bytes"
字段。将
DETAILED_OUTPUT
设置为TRUE
时,该方法会为所保存的每个文件返回一行输出。每行都包含FILE_NAME
、FILE_SIZE
和ROW_COUNT
字段。调用文件格式对应的方法,将数据保存到文件中。您可以调用以下方法之一:
调用这些方法时,传入应写入数据的文件的暂存区位置(如
@mystage
)。默认情况下,该方法会将数据保存到名称带有前缀
data_
的文件中(例如@mystage/data_0_0_0.csv
)。如果希望使用不同的前缀命名文件,请在暂存区名称后指定前缀。例如:WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data");
本示例将 DataFrame 的内容保存到名称以
saved_data
为前缀(例如@mystage/saved_data_0_0_0.csv
)的文件中。检查返回的 WriteFileResult 对象,以获取有关写入文件的数据量的信息。
从
WriteFileResult
对象,您可以访问 COPY INTO <location> 命令生成的输出:要以 Row 对象数组的形式访问输出行,请调用
getRows
方法。要确定行中有哪些字段,请调用
getSchema
方法,该方法将返回描述行中字段的 StructType。
例如,要在输出行中打印出字段名称和值,请运行以下语句:
WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data"); Row[] rows = writeFileResult.getRows(); StructType schema = writeFileResult.getSchema(); for (int i = 0 ; i < rows.length ; i++) { System.out.println("Row:" + i); Row row = rows[i]; for (int j = 0; j < schema.size(); j++) { System.out.println(schema.get(j).name() + ": " + row.get(j)); } }
以下示例使用 DataFrame 将名为 car_sales
的表的内容保存到 JSON 文件,这些文件位于 @mystage
暂存区中,名称带有 saved_data
前缀(例如 @mystage/saved_data_0_0_0.json
)。示例代码:
覆盖文件(如果暂存区中已存在该文件)。
返回有关保存操作的详细输出。
保存未压缩的数据。
最后,示例代码在返回的输出行中打印出每个字段和值:
DataFrame df = session.table("car_sales");
WriteFileResult writeFileResult = df.write().mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data");
Row[] rows = writeFileResult.getRows();
StructType schema = writeFileResult.getSchema();
for (int i = 0 ; i < rows.length ; i++) {
System.out.println("Row:" + i);
Row row = rows[i];
for (int j = 0; j < schema.size(); j++) {
System.out.println(schema.get(j).name() + ": " + row.get(j));
}
}
使用半结构化数据¶
使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。
备注
这些部分中的示例使用 示例中使用的示例数据 中的示例数据。
遍历半结构化数据¶
若要引用半结构化数据中的特定字段或元素,请使用 ` Column `_ 对象的以下方法:
使用 subField("<field_name>") 返回 OBJECT(或包含 OBJECT 的 VARIANT)中字段的
Column
对象。使用 subField(<index>) 返回 ARRAY(或包含 ARRAY 的 VARIANT )中元素的
Column
对象。
备注
如果路径中的字段名称或元素不规则,并且导致难以使用 Column.apply
方法,则可以改用 ` Functions.get _、 Functions.get_ignore_case `_ 或 ` Functions.get_path `_。
例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src
列的对象内,选择 dealership
字段:
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("dealership")).show();
该代码会打印以下输出:
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
备注
DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值。
还可以 链接多个方法调用,以遍历特定字段或元素的路径。
例如,以下代码选择 salesperson
对象中的 name
字段:
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("name")).show();
该代码会打印以下输出:
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
再举一个例子,下面的代码选择 vehicle
字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price
字段。
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("vehicle").subField(0)).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("price")).show();
该代码会打印以下输出:
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
如果路径中的字段名称或元素不规则,并且导致难以使用 Column.subField
方法,则可以使用 ` Functions.get _、 Functions.get_ignore_case _ 或 ` Functions.get_path `_ 函数来代替 :code:`apply 方法。
例如,以下代码行都会打印对象中指定字段的值:
df.select(Functions.get(Functions.col("src"), Functions.lit("dealership"))).show();
df.select(Functions.col("src").subField("dealership")).show();
同样,以下代码行都会打印对象中指定路径处的字段值:
df.select(Functions.get_path(Functions.col("src"), Functions.lit("vehicle[0].make"))).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("make")).show();
显式转换半结构化数据中的值¶
默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。
若要避免意外结果,请调用 cast 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark_java.types.*;
...
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("id")).show();
df.select(Functions.col("src").subField("salesperson").subField("id").cast(DataTypes.StringType)).show();
该代码会打印以下输出:
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
将对象数组展平为行¶
如果需要将半结构化数据“展平”为 DataFrame(例如,为数组中的每个对象生成一行),请调用 flatten 方法。此方法与 FLATTEN SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。
例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer
是一个包含有关客户的信息的对象数组。每个对象都包含 name
和 address
字段。
如果将此路径传递给 flatten
函数:
DataFrame df = session.table("car_sales");
df.flatten(Functions.col("src").subField("customer")).show();
该方法返回一个 DataFrame:
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
从 DataFrame 中,您可以从 VALUE
字段中的每个对象选择 name
和 address
字段:
df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name"), Functions.col("value").subField("address")).show();
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:
df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name").cast(DataTypes.StringType).as("Customer Name"), Functions.col("value").subField("address").cast(DataTypes.StringType).as("Customer Address")).show();
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
执行 SQL 语句¶
若要执行您指定的 SQL 语句,请调用 Session
类中的 sql
方法,然后传入要执行的语句。该方法返回一个 DataFrame。
请注意,在您 :ref:` 调用操作方法 <label-snowpark_java_dataframe_action_method>` 之前, SQL 语句不会执行。
import java.util.Arrays;
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
DataFrame dfStageFiles = session.sql("ls @myStage");
Row[] files = dfStageFiles.collect();
System.out.println(Arrays.toString(files));
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect();
DataFrame tableDf = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Get the count of rows from the table.
long numRows = tableDf.count();
System.out.println("Count: " + numRows);
如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_java_dataframe_transform>` (例如 filter、select 等),请注意,仅当基础 SQL 语句是 SELECT 语句时,这些方法才有效。其他类型的 SQL 语句不支持转换方法。
import java.util.Arrays;
DataFrame df = session.sql("select id, category_id, name from sample_product_data where id > 10");
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
Row[] results = df.filter(Functions.col("category_id").lt(Functions.lit(10))).select(Functions.col("id")).collect();
System.out.println(Arrays.toString(results));
// In this example, the underlying SQL statement is not a SELECT statement.
DataFrame dfStageFiles = session.sql("ls @myStage");
// Calling the filter method results in an error.
dfStageFiles.filter(...);