在 Snowpark Java 中使用 DataFrames

在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。

本主题内容:

要检索和操作数据,需要使用 ` DataFrame `_ 类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。

要将数据检索到 DataFrame 之中,请执行以下步骤:

  1. 构造 DataFrame,为数据集指定数据源

    例如,您可以创建一个 DataFrame,以保存来自表和外部 CSV 文件的数据或执行 SQL 语句时产生的数据。

  2. 指定 DataFrame 中的数据集应如何转换

    例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。

  3. 执行语句以将数据检索到 DataFrame 之中

    为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如 collect() 方法)。

接下来的部分将更详细地介绍这些步骤。

设置本部分的示例

本部分的一些示例使用 DataFrame 查询名为 sample_product_data 的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表:

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT, amount NUMBER(12, 2), quantity INT, product_date DATE);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10, 1.00, 15, TO_DATE('2021.01.01', 'YYYY.MM.DD')),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20, 2.00, 30, TO_DATE('2021.02.01', 'YYYY.MM.DD')),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30, 3.00, 45, TO_DATE('2021.03.01', 'YYYY.MM.DD')),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40, 4.00, 60, TO_DATE('2021.04.01', 'YYYY.MM.DD')),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50, 5.00, 75, TO_DATE('2021.05.01', 'YYYY.MM.DD')),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60, 6.00, 90, TO_DATE('2021.06.01', 'YYYY.MM.DD')),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70, 7.00, 105, TO_DATE('2021.07.01', 'YYYY.MM.DD')),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80, 7.25, 120, TO_DATE('2021.08.01', 'YYYY.MM.DD')),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90, 7.50, 135, TO_DATE('2021.09.01', 'YYYY.MM.DD')),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100, 7.75, 150, TO_DATE('2021.10.01', 'YYYY.MM.DD')),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100, 8.00, 165, TO_DATE('2021.11.01', 'YYYY.MM.DD')),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100, 8.50, 180, TO_DATE('2021.12.01', 'YYYY.MM.DD'));
Copy

要验证表是否已创建,请运行:

SELECT * FROM sample_product_data;
Copy

构造 DataFrame

要构造 DataFrame,可以使用 Session 类中的方法。以下每种方法都基于不同类型的数据源构造 DataFrame :

  • 要基于表、视图或流中的数据创建 DataFrame,请调用 table 方法:

    // Create a DataFrame from the data in the "sample_product_data" table.
    DataFrame dfTable = session.table("sample_product_data");
    
    // Print out the first 10 rows.
    dfTable.show();
    
    Copy

    备注

    table 方法返回一个 Updatable 对象。Updatable 扩展了 DataFrame,并提供了用于处理表中数据的其他方法(例如,用于更新和删除数据的方法)。请参阅 更新、删除和合并表中的行

  • 要基于指定值创建 DataFrame,请执行以下步骤:

    1. 构造一个由包含值的 Row 对象组成的数组。

    2. 构造一个 StructType 对象,以描述这些值的数据类型。

    3. 调用 createDataFrame 方法,并传入数组和 StructType 对象。

     // Import name from the types package, which contains StructType and StructField.
    import com.snowflake.snowpark_java.types.*;
    ...
    
     // Create a DataFrame containing specified values.
     Row[] data = {Row.create(1, "a"), Row.create(2, "b")};
     StructType schema =
       StructType.create(
         new StructField("num", DataTypes.IntegerType),
         new StructField("str", DataTypes.StringType));
     DataFrame df = session.createDataFrame(data, schema);
    
     // Print the contents of the DataFrame.
     df.show();
    
    Copy

    备注

    构造 DataFrame 时,Snowflake 的保留字不能用作有效的列名。有关保留字的列表,请参阅 保留和受限关键字

  • 要创建包含特定值范围的 DataFrame,请调用 range 方法:

    // Create a DataFrame from a range
    DataFrame dfRange = session.range(1, 10, 2);
    
    // Print the contents of the DataFrame.
    dfRange.show();
    
    Copy
  • 为暂存区中的文件创建 DataFrame,请调用 read 来获取 DataFrameReader 对象。在 DataFrameReader 对象中,调用与文件中的数据格式对应的方法:

    // Create a DataFrame from data in a stage.
    DataFrame dfJson = session.read().json("@mystage2/data1.json");
    
    // Print the contents of the DataFrame.
    dfJson.show();
    
    Copy
  • 要创建 DataFrame 来保存 SQL 查询的结果,请调用 sql 方法:

    // Create a DataFrame from a SQL query
    DataFrame dfSql = session.sql("SELECT name from sample_product_data");
    
    // Print the contents of the DataFrame.
    dfSql.show();
    
    Copy

    注意:虽然可以使用此方法执行 SELECT 语句,以从表和暂存文件中检索数据,但应改用 tableread 方法。在开发工具中, tableread 之类的方法可以提供更好的语法突出显示、错误突出显示和智能代码补全效果。

指定应该如何转换数据集

要指定应选择哪些列,以及应如何对结果进行筛选、排序、分组等,请调用能转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 Functions.col 静态方法或计算结果为列的表达式。(请参阅 指定列和表达式。)

例如:

  • 若要指定应返回的行,请调用 filter 方法:

    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    DataFrame df = session.table("sample_product_data").filter(
      Functions.col("id").equal_to(Functions.lit(1)));
    df.show();
    
    Copy
  • 若要指定应选择的列,请调用 select 方法:

    // Create a DataFrame that contains the id, name, and serial_number
    // columns in te "sample_product_data" table.
    DataFrame df = session.table("sample_product_data").select(
      Functions.col("id"), Functions.col("name"), Functions.col("serial_number"));
    df.show();
    
    Copy

每个方法都返回一个经过转换的新 DataFrame 对象。(该方法不会影响原始 DataFrame 对象。)这就意味着,如果要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用所返回的新 DataFrame 对象来调用每个后续转换方法。

请注意,这些转换方法不会从 Snowflake 数据库中检索数据。( 执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。)转换方法只是指定应如何构造 SQL 语句。

指定列和表达式

调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select 方法时,需要指定应选择的列。

要引用列,请通过调用 ` Functions.col `_ 静态方法来创建 ` Column `_ 对象。

DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
dfProductInfo.show();
Copy

备注

要为字面量创建 Column 对象,请参阅 将字面量用作列对象

指定筛选器、投影、联接条件等时,可以在表达式中使用 Column 对象。例如:

  • 可以将 Column 对象与 filter 方法一起使用,以指定筛选条件:

    // Specify the equivalent of "WHERE id = 12"
    // in an SQL SELECT statement.
    DataFrame df = session.table("sample_product_data");
    df.filter(Functions.col("id").equal_to(Functions.lit(12))).show();
    
    Copy
    // Specify the equivalent of "WHERE key + category_id < 10"
    // in an SQL SELECT statement.
    DataFrame df2 = session.table("sample_product_data");
    df2.filter(Functions.col("key").plus(Functions.col("category_id")).lt(Functions.lit(10))).show();
    
    Copy
  • 可以将 Column 对象与 select 方法一起使用,以定义别名:

    // Specify the equivalent of "SELECT key * 10 AS c"
    // in an SQL SELECT statement.
    DataFrame df3 = session.table("sample_product_data");
    df3.select(Functions.col("key").multiply(Functions.lit(10)).as("c")).show();
    
    Copy
  • 可以将 Column 对象与 join 方法一起使用,以定义联接条件:

    // Specify the equivalent of "sample_a JOIN sample_b on sample_a.id_a = sample_b.id_a"
    // in an SQL SELECT statement.
    DataFrame dfLhs = session.table("sample_a");
    DataFrame dfRhs = session.table("sample_b");
    DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
    dfJoined.show();
    
    Copy

引用不同 DataFrames 中的列

引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在每个 DataFrame 对象中使用 col 方法引用该对象中的列(例如 df1.col("name")df2.col("name"))。

下面的示例演示了如何使用 col 方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 value 的列。该示例使用 Column 对象的 as 方法来更改新创建的 DataFrame 中的列名称。

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"));
dfJoined.show();
Copy

将对象标识符(表名称、列名称等)放在双引号里

您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求。指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:

// The following calls are equivalent:
df.select(Functions.col("id123"));
df.select(Functions.col("ID123"));
Copy

如果名称不符合标识符要求,则必须将名称放在双引号里 (")。对于 Scala 字符串字面量中的双引号字符,请使用反斜杠 (\) 进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:

DataFrame df = session.table("\"10tablename\"");
Copy

请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:

// The following calls are equivalent:
df.select(Functions.col("3rdID"));
df.select(Functions.col("\"3rdID\""));

// The following calls are equivalent:
df.select(Functions.col("id with space"));
df.select(Functions.col("\"id with space\""));
Copy

如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。

在某些情况下,列名称可能包含双引号字符:

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...
Copy

标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes""""column_name_quoted"""):

DataFrame dfTable = session.table("quoted");
dfTable.select("\"name_with_\"\"air\"\"_quotes\"");
dfTable.select("\"\"\"column_name_quoted\"\"\"");
Copy

请注意,如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(Functions.col("id with space"));
df.select(Functions.col("ID WITH SPACE"));
Copy

将字面量用作列对象

要在传入 Column 对象的方法中使用字面量,请将字面量传递给 Functions 类中的 lit 静态方法,从而为字面量创建 Column 对象。例如:

// Show the first 10 rows in which category_id is greater than 5.
// Use `Functions.lit(5)` to create a Column object for the literal 5.
DataFrame df = session.table("sample_product_data");
df.filter(Functions.col("category_id").gt(Functions.lit(5))).show();
Copy

如果字面量是 Java 中的浮点或双精度值(例如 0.05 默认情况下被视为双精度值),则 Snowpark 库会生成 SQL,以将该值隐式转换为相应的 Snowpark 数据类型(例如 0.05::DOUBLE)。这可能会产生与指定的确切数字不同的近似值。

例如,以下代码不显示匹配的行,即使筛选器(匹配大于或等于 0.05 的值)应匹配 DataFrame 中的行:

// Create a DataFrame that contains the value 0.05.
DataFrame df = session.sql("select 0.05 :: Numeric(5, 2) as a");

// Applying this filter results in no matching rows in the DataFrame.
df.filter(Functions.col("a").leq(Functions.lit(0.06).minus(Functions.lit(0.01)))).show();
Copy

问题在于 Functions.lit(0.06)Functions.lit(0.01)0.060.01 产生近似值而不是确切值。

要避免此问题,请 将字面量转换为要使用的 Snowpark 类型。例如,要使用精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:

import com.snowflake.snowpark_java.types.*;
...

df.filter(Functions.col("a").leq(Functions.lit(0.06).cast(DataTypes.createDecimalType(5, 2)).minus(Functions.lit(0.01).cast(DataTypes.createDecimalType(5, 2))))).show();
Copy

将列对象的类型转换为特定类型

要将 Column 对象的类型转换为特定类型,请调用 ` cast _ 方法,然后从 ` com.snowflake.snowpark_java.types `_ 传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 :ref:`label-data_type_number,请运行以下语句:

// Import for the DecimalType class..
import com.snowflake.snowpark_java.types.*;

Column decimalValue = Functions.lit(0.05).cast(DataTypes.createDecimalType(5,2));
Copy

链接多个方法调用

由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。

下面的示例返回为如下目的而配置的 DataFrame :

  • 查询 sample_product_data 表。

  • 返回 id = 1 的行。

  • 选择 nameserial_number 列。

DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
Copy

在此示例中:

  • session.table("sample_product_data") 返回 sample_product_data 表的 DataFrame。

    尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。

  • filter(Functions.col("id").equal_to(Functions.lit(1))) 返回 sample_product_data 表的 DataFrame(设置为返回 id = 1 的行)。

    请再次注意, DataFrame 尚未包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_java_dataframe_action_method>` 之前,不会检索匹配的行。

  • select(Functions.col("name"), Functions.col("serial_number")) 返回一个 DataFrame,包含 sample_product_data 表中 id = 1 的行的 nameserial_number 列。

在链接多个方法调用时,请注意调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。

例如,在下面的代码中,select 方法返回仅包含两列的 DataFrame:nameserial_number。对此 DataFrame 的 filter 方法调用会失败,因为它使用的 id 列不在已转换的 DataFrame 中。

// This fails with the error "invalid identifier 'ID'."
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("name"), Functions.col("serial_number")).filter(Functions.col("id").equal_to(Functions.lit(1)));
dfProductInfo.show();
Copy

相反,以下代码会成功执行,因为对包含 sample_product_data 表中所有列(包括 id 列)的 DataFrame 调用了 filter() 方法:

// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
Copy

请注意,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 selectfilter 方法调用。

限制 DataFrame 中的行数

要限制 DataFrame 中的行数,可以使用 ` limit `_ 转换方法。

Snowpark API 还提供了以下操作方法,可检索和打印出有限数量的行:

  • ` first _ 操作方法(用于执行查询并返回前 :samp:`{n} 行)

  • ` show _ 操作方法(用于执行查询并打印前 :samp:`{n} 行)

这些方法能有效地将 LIMIT 子句添加到执行的 SQL 语句中。

如 :ref:` LIMIT 使用说明 <label-limit_cmd_usage_notes>` 中所述,除非将排序顺序 (ORDER BY) 与 LIMIT 一起指定,否则结果是不确定的。

要使 ORDER BY 子句与 LIMIT 子句一起出现(从而避免 ORDER BY 出现在另一个子查询等地方),必须调用 limit 方法,以限制 sort 方法返回的 DataFrame 中的结果数。

例如,如果 链接多个方法调用

DataFrame df = session.table("sample_product_data");

// Limit the number of rows to 5, sorted by parent_id.
DataFrame dfSubset = df.sort(Functions.col("parent_id")).limit(5);

// Return the first 5 rows, sorted by parent_id.
Row[] arrayOfRows = df.sort(Functions.col("parent_id")).first(5);

// Print the first 5 rows, sorted by parent_id.
df.sort(Functions.col("parent_id")).show(5);
Copy

检索列定义

要在 DataFrame 的数据集中检索列的定义,请调用 schema 方法。此方法会返回一个 StructType 对象,其中包含 StructField 对象的 Array。每个 StructField 对象都包含一列的定义。

import com.snowflake.snowpark_java.types.*;
...

// Get the StructType object that describes the columns in the
// underlying rowset.
StructType tableSchema = session.table("sample_product_data").schema();
System.out.println("Schema for sample_product_data: " + tableSchema);
Copy

在返回的 StructType 对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。

下面的示例创建一个 DataFrame,其中包含名为 ID3rd 的列。对于名为 3rd 的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"),因为 该名称不符合标识符要求

该示例调用 schema 方法,随后对返回的 StructType 对象调用 names 方法,以获取列名称的数组。这些名称使用 schema 方法返回的 StructType 进行规范化。

import java.util.Arrays;
...

// Create a DataFrame containing the "id" and "3rd" columns.
DataFrame dfSelectedColumns = session.table("sample_product_data").select(Functions.col("id"), Functions.col("3rd"));
// Print out the names of the columns in the schema.
System.out.println(Arrays.toString(dfSelectedColumns.schema().names()));
Copy

联接 DataFrames

若要联接 DataFrame 对象,请调用 ` join `_ 方法。

以下部分说明如何使用 DataFrames 执行联接:

设置联接的示例数据

后面部分中的示例使用示例数据,您可以通过执行以下 SQL 语句来设置这些数据:

CREATE OR REPLACE TABLE sample_a (
  id_a INTEGER,
  name_a VARCHAR,
  value INTEGER
);
INSERT INTO sample_a (id_a, name_a, value) VALUES
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
CREATE OR REPLACE TABLE sample_b (
  id_b INTEGER,
  name_b VARCHAR,
  id_a INTEGER,
  value INTEGER
);
INSERT INTO sample_b (id_b, name_b, id_a, value) VALUES
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', NULL, 200)
;
CREATE OR REPLACE TABLE sample_c (
  id_c INTEGER,
  name_c VARCHAR,
  id_a INTEGER,
  id_b INTEGER
);
INSERT INTO sample_c (id_c, name_c, id_a, id_b) VALUES
  (1012, 'C1', 10, NULL),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;
Copy

指定联接的列

利用 DataFrame.join 方法,可以通过以下方式之一指定要使用的列:

  • 指定描述联接条件的列表达式。

  • 指定应用作联接中的公共列的一个或多个列。

以下示例对名为 id_a 的列执行内部联接:

// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
dfJoined.show();
Copy

请注意,该示例使用 DataFrame.col 方法来指定要用于联接的条件。有关此方法的更多信息,请参阅 指定列和表达式

这将打印以下输出:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |
|40      |A2        |10       |4000    |B1        |40      |10       |
|80      |A3        |15       |9000    |B3        |80      |15       |
----------------------------------------------------------------------
Copy
联接结果中重复的相同列名称

在联接产生的 DataFrame 中,Snowpark 库使用在已联接的表中找到的列名称,即使这些列名称在表之间相同也是如此。发生这种情况时,这些列名称将在联接产生的 DataFrame 中重复。要按名称访问重复的列,请对表示列的原始表的 DataFrame 调用 col 方法。(有关指定列的更多信息,请参阅 引用不同 DataFrames 中的列。)

以下示例中的代码联接两个 DataFrames,然后对已联接的 DataFrame 调用 select 方法。此代码通过从表示相应 DataFrame 对象( dfRhsdfLhs)的变量调用 col 方法来指定要选择的列。它使用 as 方法为 select 方法创建的 DataFrame 中的列提供新名称。

DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
DataFrame dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"));
dfSelected.show();
Copy

这将打印以下输出:

------------------------------
|"LEFTVALUE"  |"RIGHTVALUE"  |
------------------------------
|5            |5             |
|10           |10            |
|15           |15            |
------------------------------
Copy
在保存或缓存之前删除重复列

请注意,当联接产生的 DataFrame 包含重复的列名称时,必须通过删除重复列或重命名列来移除 DataFrame 中的重复项,然后才能将结果保存到表中或者缓存 DataFrame。对于保存到表或缓存的 DataFrame 中的重复列名称,Snowpark 库会将重复的列名称替换为别名,让它们不再重复。

对于缓存的 DataFrame,下例说明了其输出在以下情况下将如何显示:列名称 ID_AVALUE 在两个表的联接结果中重复,之后,在缓存结果之前未删除重复列或未重命名列。

--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A"  |"NAME_A"  |"l_ZSz7_VALUE"  |"ID_B"  |"NAME_B"  |"r_heec_ID_A"  |"r_heec_VALUE"  |
--------------------------------------------------------------------------------------------------
|10             |A1        |5               |4001    |B2        |10             |5               |
|40             |A2        |10              |4000    |B1        |40             |10              |
|80             |A3        |15              |9000    |B3        |80             |15              |
--------------------------------------------------------------------------------------------------
Copy

执行自然联接

要执行 :ref:` 自然联接 <label-querying_join_natural>` (在具有相同名称的列上联接 DataFrames),请调用 ` naturalJoin `_ 方法。

以下示例在 sample_asample_b 的公共列( id_a 列)上联接这两个表的 DataFrames :

DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.naturalJoin(dfRhs);
dfJoined.show();
Copy

这将打印以下输出:

---------------------------------------------------
|"ID_A"  |"VALUE"  |"NAME_A"  |"ID_B"  |"NAME_B"  |
---------------------------------------------------
|10      |5        |A1        |4001    |B2        |
|40      |10       |A2        |4000    |B1        |
|80      |15       |A3        |9000    |B3        |
---------------------------------------------------
Copy

指定联接类型

默认情况下, DataFrame.join 方法创建内部联接。要指定其他类型的联接,请将 joinType 实参设置为以下值之一:

联接类型

joinType

内部联接

:code:`inner`(默认)

交叉联接

cross

完整外部联接

full

左外部联接

left

左反联接

leftanti

左半联接

leftsemi

右外部联接

right

例如:

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")), "left");
dfLeftOuterJoin.show();
Copy

这将打印以下输出:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|40      |A2        |10       |4000    |B1        |40      |10       |
|10      |A1        |5        |4001    |B2        |10      |5        |
|80      |A3        |15       |9000    |B3        |80      |15       |
|90      |A4        |20       |NULL    |NULL      |NULL    |NULL     |
----------------------------------------------------------------------
Copy

联接多个表

要联接多个表,请执行以下步骤:

  1. 为每个表创建一个 DataFrame。

  2. 对第一个 DataFrame 调用 DataFrame.join 方法,并传入第二个 DataFrame。

  3. 使用 join 方法返回的 DataFrame 来调用 join 方法,并传入第三个 DataFrame。

可以 链接 join 调用,如下所示:

DataFrame dfFirst = session.table("sample_a");
DataFrame dfSecond  = session.table("sample_b");
DataFrame dfThird = session.table("sample_c");
DataFrame dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a").equal_to(dfSecond.col("id_a"))).join(dfThird, dfFirst.col("id_a").equal_to(dfThird.col("id_a")));
dfJoinThreeTables.show();
Copy

这将打印以下输出:

------------------------------------------------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |"ID_C"  |"NAME_C"  |"ID_A"  |"ID_B"  |
------------------------------------------------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |1012    |C1        |10      |NULL    |
|40      |A2        |10       |4000    |B1        |40      |10       |1040    |C2        |40      |4000    |
|40      |A2        |10       |4000    |B1        |40      |10       |1041    |C3        |40      |4001    |
------------------------------------------------------------------------------------------------------------
Copy

执行自联接

如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下使用单个 DataFrame 执行自联接的示例最终失败,因为联接的左侧和右侧都存在 "id" 的列表达式:

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, Functions.col("id").equal_to(Functions.col("parent_id")));
Copy
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, df.col("id").equal_to(df.col("parent_id")));
Copy

上面两个示例都失败了,并引发以下异常:

Exception in thread "main" com.snowflake.snowpark_java.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.
Copy

请改用 ` clone `_ 方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:

// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
DataFrame dfLhs = session.table("sample_product_data");
// Clone the DataFrame object to use as the right-hand side of the join.
DataFrame dfRhs = dfLhs.clone();

// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id").equal_to(dfRhs.col("parent_id")));
dfJoined.show();
Copy

如果要基于同一列执行自联接,请调用 join 方法,为 USING 子句传入列的名称(或列名称的数组):

// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, "key");
Copy

执行操作以计算 DataFrame

如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。

以下部分介绍如何同步和异步地对 DataFrame 执行操作:

同步执行操作

要同步执行操作,请调用以下操作方法之一:

同步执行操作的方法

描述

DataFrame.collect()

计算 DataFrame,并将生成的数据集作为 ` Row _ 对象的 :code:`Array 返回。请参阅 返回所有行

DataFrame.toLocalIterator()

计算 DataFrame,并返回 ` Row _ 对象的 :code:`Iterator。如果结果集很大,请使用此方法,以避免将所有结果同时加载到内存中。请参阅 返回行的迭代器

DataFrame.count()

计算 DataFrame 并返回行数。

DataFrame.show()

计算 DataFrame,并将行打印到控制台。请注意,此方法将行数限制为 10 行(默认值)。请参阅 打印 DataFrame 中的行

DataFrame.cacheResult()

执行查询,创建临时表,并将结果放入表中。该方法返回一个 HasCachedResult 对象,您可以使用该对象访问此临时表中的数据。请参阅 缓存 DataFrame

DataFrame.write().saveAsTable()

将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中

DataFrame.read().fileformat().copyInto('tableName')

将 DataFrame 中的数据复制到指定表中。请参阅 将数据从文件复制到表中

Session.table('tableName').delete()

删除指定表中的行。请参阅 更新、删除和合并表中的行

Session.table('tableName').update()Session.table('tableName').updateColumn()

更新指定表中的行。请参阅 更新、删除和合并表中的行

Session.table('tableName').merge().methods.collect()

将行合并到指定的表中。请参阅 更新、删除和合并表中的行

例如,要执行查询并返回结果数,请调用 count 方法:

// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");

// Send the query to the server for execution and
// print the count of rows in the table.
System.out.println("Rows returned: " + dfProducts.count());
Copy

还可以调用操作方法来:

注意:如果调用 schema 方法来获取 DataFrame 中列的定义,则无需调用操作方法。

异步执行操作

备注

此功能是在 Snowpark 0.11.0 中引入的。

要异步执行操作,请调用 async 方法以返回“异步执行者”对象(例如 DataFrameAsyncActor),然后在该对象中调用异步操作方法。

异步执行者对象的这些操作方法会返回一个 TypedAsyncJob 对象,您可以使用该对象检查异步操作的状态和检索操作的结果。

接下来的部分将介绍如何异步执行操作和检查结果。

了解异步操作的基本流程

可以使用以下方法异步执行操作:

异步执行操作的方法

描述

DataFrame.async().collect()

异步计算 DataFrame,以 ` Row _ 对象的 :code:`Array 的形式检索生成的数据集。请参阅 返回所有行

DataFrame.async.toLocalIterator

异步计算 DataFrame,以检索 ` Row _ 对象的 :code:`Iterator。如果结果集很大,请使用此方法,以避免将所有结果同时加载到内存中。请参阅 返回行的迭代器

DataFrame.async().count()

异步计算 DataFrame 以检索行数。

DataFrame.write().async().saveAsTable()

将 DataFrame 中的数据异步保存到指定表中。请参阅 将数据保存到表中

DataFrame.read().fileformat().async().copyInto('tableName')

将 DataFrame 中的数据异步复制到指定的表中。请参阅 将数据从文件复制到表中

Session.table('tableName').async().delete()

异步删除指定表中的行。请参阅 更新、删除和合并表中的行

Session.table('tableName').async().update()Session.table('tableName').async().updateColumn()

异步更新指定表中的行。请参阅 更新、删除和合并表中的行

从返回的 ` TypedAsyncJob `_ 对象中,您可以执行以下操作:

  • 要确定操作是否已完成,请调用 isDone 方法。

  • 要获取与操作对应的查询 ID,请调用 getQueryId 方法。

  • 要返回操作的结果(例如,对于 collect 方法为 Row 对象的 Array ;对于 count 方法为行数),请调用 getResult 方法。

    请注意, getResult 是阻塞调用。

  • 要取消操作,请调用 cancel 方法。

例如,要异步执行查询,并以 Row 对象的 Array 的形式检索结果,请调用 async().collect()

import java.util.Arrays;

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
DataFrame df = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));

// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Row[]> asyncJob = df.async().collect();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
Row[] results = asyncJob.getResult();
System.out.println(Arrays.toString(results));
Copy

要异步执行查询并检索结果数,请调用 async().count()

// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");

// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Long> asyncJob = dfProducts.async().count();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Print the count of rows in the table.
// Note that getResult is a blocking call.
System.out.println("Rows returned: " + asyncJob.getResult());
Copy

指定等待的最大秒数

调用 getResult 方法时,可以使用 maxWaitTimeInSeconds 实参指定在尝试检索结果之前等待查询完成的最大秒数。例如:

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
Row[] results = asyncJob.getResult(10);
Copy

如果省略此实参,该方法将等待 :ref:` snowpark_request_timeout_in_seconds <label-snowpark_java_request_timeout_in_seconds>` 配置属性指定的最大秒数。( :ref:` 创建 Session 对象 <label-snowpark_java_creating_session>` 时可以设置此属性。)

通过 ID 访问异步查询

如果有之前提交的异步查询的查询 ID,则可以调用 Session.createAsyncJob 方法创建一个 ` AsyncJob `_ 对象,该对象可用于检查查询状态、检索查询结果或取消查询。

请注意,与 TypedAsyncJob 不同的是, AsyncJob 没有提供用于检索结果的 getResult 方法。如果需要检索结果,请改为调用 getRowsgetIterator 方法。

例如:

import java.util.Arrays;
...

AsyncJob asyncJob = session.createAsyncJob(myQueryId);
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
Row[] rows = asyncJob.getRows();
System.out.println(Arrays.toString(rows));
Copy

将行检索到 DataFrame 之中

指定 DataFrame 应如何转换 来执行查询并返回结果。可以返回 Array 中的所有行,也可以返回一个 Iterator,以便逐行遍历结果。在后一种情况下,如果数据量较大,则行将按块加载到内存中,以免将大量数据加载到内存中。

返回所有行

要同时返回所有行,请调用 collect 方法。此方法返回 ` Row _ 对象的数组。要从行中检索值,请调用 :samp:`get{Type} 方法(例如 getStringgetInt 等)。

例如:

Row[] rows = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).collect();
for (Row row : rows) {
  System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
Copy

返回行的迭代器

如果要使用 Iterator 遍历结果中的 ` Row _ 对象,请调用 `toLocalIterator。如果结果中的数据量很大,该方法将按块加载行,以免同时将所有行加载到内存中。

例如:

import java.util.Iterator;

Iterator<Row> rowIterator = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).toLocalIterator();
while (rowIterator.hasNext()) {
  Row row = rowIterator.next();
  System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
Copy

返回前 n

要返回前 n 行,请调用 first 方法,并传入要返回的行数。

限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().first())。

例如:

import java.util.Arrays;
...

DataFrame df = session.table("sample_product_data");
Row[] rows = df.sort(Functions.col("name")).first(5);
System.out.println(Arrays.toString(rows));
Copy

打印 DataFrame 中的行

要将 DataFrame 中的前 10 行打印到控制台,请调用 show 方法。要打印出其他数量的行,请传入要打印的行数。

限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().show())。

例如:

DataFrame df = session.table("sample_product_data");
df.sort(Functions.col("name")).show();
Copy

更新、删除和合并表中的行

备注

此功能是在 Snowpark 0.7.0 中引入的。

当您调用 Session.table 为表创建 DataFrame 对象时,该方法将返回一个 Updatable 对象,此对象通过用于更新和删除表中数据的其他方法扩展了 DataFrame。(请参阅 ` Updatable `_。)

如果需要更新或删除表中的行,可以使用 Updatable 类的以下方法:

  • 调用 updateupdateColumn,以更新表中的现有行。请参阅 更新表中的行

  • 调用 delete 以删除表中的行。请参阅 删除表中的行

  • 调用 merge,以根据另一个表或子查询中的数据在一个表中插入、更新和删除行。(这等效于 SQL 中的 MERGE 命令。)请参阅 将行合并到一个表中

更新表中的行

要更新表中的行,请调用 updateupdateColumn 方法,并传入 Map (它关联要更新的列和要分配给这些列的相应值):

  • 要将列名称指定为 Map 中的字符串,请调用 updateColumn

  • 要在 Map 中指定 Column 对象,请调用 update

这两种方法都返回一个 UpdateResult 对象,其中包含已更新的行数。(请参阅 UpdateResult。)

备注

这两种方法都是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。

例如,要将名为 count 的列中的值替换为值 1,并且要使用将列名称 (String) 与相应值关联的 Map,请调用 updateColumn

import java.util.HashMap;
import java.util.Map;
...

Map<String, Column> assignments = new HashMap<>();
assignments.put("3rd", Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.updateColumn(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy

如果要在 Map 中使用 Column 对象来标识要更新的列,请调用 update

import java.util.HashMap;
import java.util.Map;
...

Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy

如果仅在满足条件时才应进行更新,则可以将该条件指定为实参。例如,对于 category_id 列的值为 20 的行,要将名为 count 的列中的值替换为 2,请运行以下语句:

import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(2));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments, Functions.col("category_id").equal_to(Functions.lit(20)));
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy

如果需要让条件基于与另一个 DataFrame 对象进行的联接,则可以将该 DataFrame 作为实参传入,并在条件中使用该 DataFrame。例如,对于 category_id 列与 DataFrame dfParts 中的 category_id 匹配的行,要将名为 count 的列中的值替换为 3,请运行以下语句:

import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(3));
Updatable updatableDf = session.table("sample_product_data");
DataFrame dfParts = session.table("parts");
UpdateResult updateResult = updatableDf.update(assignments, updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy

删除表中的行

对于 delete 方法,可以指定一个条件来标识要删除的行,并且可以使该条件基于与另一个 DataFrame 进行的联接。delete 会返回一个 DeleteResult 对象,其中包含已删除的行数。(请参阅 DeleteResult。)

备注

delete操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。

例如,要删除 category_id 列中的值为 1 的行,请运行以下语句:

Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(Functions.lit(1)));
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Copy

如果条件引用另一个 DataFrame 中的列,则将该 DataFrame 作为第二个实参传入。例如,要删除 category_id 列与 DataFrame dfParts 中的 category_id 匹配的行,请将 dfParts 作为第二个实参传入:

Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Copy

将行合并到一个表中

要根据另一个表或子查询中的值在一个表中插入、更新和删除行(等效于 SQL 中的 MERGE 命令),请执行以下步骤:

  1. 在要将数据合并到的表的 Updatable 对象中,调用 merge 方法,并传入另一个表的 DataFrame 对象和联接条件的列表达式。

    这将返回一个 MergeBuilder 对象,您可以使用该对象指定要对匹配的行和不匹配的行执行的操作(例如插入、更新或删除)。(请参阅 MergeBuilder。)

  2. 使用 MergeBuilder 对象:

    • 要指定应对匹配的行执行的更新或删除,请调用 whenMatched 方法。

      如果需要指定有关何时应更新或删除行的附加条件,则可以传入该条件的列表达式。

      此方法返回一个 MatchedClauseBuilder 对象,可用于指定要执行的操作。(请参阅 MatchedClauseBuilder。)

      调用 MatchedClauseBuilder 对象中的 updatedelete 方法,以指定应对匹配的行执行的更新或删除操作。这些方法返回一个 MergeBuilder 对象,可用于指定其他子句。

    • 要指定应在行不匹配时执行的插入,请调用 whenNotMatched 方法。

      如果需要指定有关何时应插入行的附加条件,则可以传入该条件的列表达式。

      此方法返回一个 NotMatchedClauseBuilder 对象,可用于指定要执行的操作。(请参阅 NotMatchedClauseBuilder。)

      调用 NotMatchedClauseBuilder 对象中的 insert 方法,以指定应在行不匹配时执行的插入操作。这些方法返回一个 MergeBuilder 对象,可用于指定其他子句。

  3. 指定了应执行的插入、更新和删除后,调用 MergeBuilder 对象的 collect 方法,以对表执行指定的插入、更新和删除。

    collect 会返回一个 MergeResult 对象,其中包含已插入、更新和删除的行数。(请参阅 ``_。)

以下示例将 source 表中带有 idvalue 列的行插入到 target 表中(如果 target 表未包含具有匹配 ID 的行):

MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
                    .whenNotMatched().insert([source.col("id"), source.col("value")])
                    .collect();
Copy

以下示例使用 source 表中具有同一 ID 的行的 value 列值更新 target 表中的行:

import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("value", source.col("value"));
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
                    .whenMatched().update(assignments)
                    .collect();
Copy

将数据保存到表中

可以将 DataFrame 的内容保存到新表或现有表中。为此,您必须具有以下权限:

  • 对架构的 CREATE TABLE 权限(如果表不存在)。

  • 对表的 INSERT 权限。

要将 DataFrame 的内容保存到表中,请执行以下操作:

  1. 调用 DataFrame 的 write 方法,以获取 DataFrameWriter 对象。

  2. 调用 DataFrameWriter 对象的 ` mode `_ 方法,并传入一个 ` SaveMode `_ 对象,以指定有关写入表的首选项:

    • 要插入行,请传入 SaveMode.Append

    • 要覆盖现有表,请传入 SaveMode.Overwrite

    此方法返回使用指定模式进行了配置的相同 DataFrameWriter 对象。

  3. 如果要在现有表中插入行 (SaveMode.Append),并且 DataFrame 中的列名称与表中的列名称匹配,请调用 ` DataFrameWriter.option _,并传入 :code:"columnOrder"` 和 "name" 作为实参。

    备注

    此方法是在 Snowpark 1.4.0 中引入的。

    默认情况下, columnOrder 选项设置为 "index",这意味着 DataFrameWriter 按照列的出现顺序插入值。例如, DataFrameWriter 在表中第一列内插入 DataFrame 中第一列的值,在表中第二列内插入 DataFrame 中第二列的值,依此类推。

    此方法返回使用指定选项进行了配置的同一个 DataFrameWriter 对象。

  4. 调用 DataFrameWriter 对象的 ` saveAsTable `_ 方法,以将 DataFrame 的内容保存到指定的表中。

    无需调用单独的方法(例如 collect)来执行将数据保存到表中的 SQL 语句。saveAsTable 是执行 SQL 语句的 操作方法

以下示例使用 DataFrame df 的内容覆盖现有表(由 tableName 变量标识):

df.write().mode(SaveMode.Overwrite).saveAsTable(tableName);
Copy

以下示例将 DataFrame df 中的行插入到现有表(由 tableName 变量标识)。在此示例中,表和 DataFrame 都包含列 c1c2

该示例展示了这两种做法的区别:将 columnOrder 选项设置为 "name" (这会将值插入到与 DataFrame 列同名的表列中),以及使用默认的 columnOrder 选项(这会根据 DataFrame 中列的顺序将值插入到表列中)。

DataFrame df = session.sql("SELECT 1 AS c2, 2 as c1");
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write().mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName);
// With the default value of the columnOrder option ("index"), the DataFrameWriter uses the column positions
// and inserts a row with the values (1, 2).
df.write().mode(SaveMode.Append).saveAsTable(tableName);
Copy

基于 DataFrame 创建视图

要基于 DataFrame 创建视图,请调用 createOrReplaceView 方法:

df.createOrReplaceView("db.schema.viewName");
Copy

请注意,调用 createOrReplaceView 会立即创建新视图。更重要的是,它不会导致对 DataFrame 进行计算。(在您 :ref:` 执行操作 <label-snowpark_java_dataframe_action_method>` 之前,不会对 DataFrame 本身进行计算。)

通过调用 createOrReplaceView 创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图

如果只需为会话创建临时视图,请改为调用 createOrReplaceTempView 方法:

df.createOrReplaceTempView("db.schema.viewName");
Copy

缓存 DataFrame

在某些情况下,可能需要执行复杂的查询,并将结果保留下来,以供后续操作使用(而不必再次执行相同的查询)。在此类情况下,可以调用 cacheResult 方法来缓存 DataFrame 的内容。

此方法会:

  • 运行查询。

    在调用 cacheResult 之前,无需 调用单独的操作方法来检索结果cacheResult 是执行查询的操作方法。

  • 将结果保存在临时表中

    由于 cacheResult 会创建临时表,因此,您必须对正在使用的架构具有 CREATE TABLE 权限。

  • 返回一个 ` HasCachedResult `_ 对象,该对象提供对临时表中结果的访问权。

    由于 HasCachedResult 扩展了 DataFrame,因此,可以对此缓存数据执行一些同样可以对 DataFrame 执行的操作。

备注

由于 cacheResult 执行查询并将结果保存到表中,因此该方法可能会导致计算和存储成本增加。

例如:

// Set up a DataFrame to query a table.
DataFrame df = session.table("sample_product_data").filter(Functions.col("category_id").gt(Functions.lit(10)));
// Retrieve the results and cache the data.
HasCachedResult cachedDf = df.cacheResult();
// Create a DataFrame containing a subset of the cached data.
DataFrame dfSubset = cachedDf.filter(Functions.col("category_id").equal_to(Functions.lit(20))).select(Functions.col("name"), Functions.col("category_id"));
dfSubset.show();
Copy

请注意,调用该方法时,原始的 DataFrame 不受影响。例如,假设 dfTablesample_product_data 表的 DataFrame :

HasCachedResult dfTempTable = dfTable.cacheResult();
Copy

调用 cacheResult 后, dfTable 仍指向 sample_product_data 表,而且您可以继续使用 dfTable 来查询和更新该表。

要使用临时表中的缓存数据,请使用 dfTempTable (即 cacheResult 返回的 HasCachedResult 对象)。

处理暂存区中的文件

Snowpark 库提供了一些类和方法,可让您通过使用暂存区中的文件 将数据加载到 Snowflake 中,以及 从 Snowflake 卸载数据

备注

为了在暂存区上使用这些类和方法,您必须具有 使用暂存区所需的权限

接下来的部分将说明如何使用这些类和方法:

在暂存区中上传和下载文件

要在暂存区中上传和下载文件,请使用 ` FileOperation _ 对象的 :code:`putget 方法:

将文件上传到暂存区

要将文件上传到暂存区,请执行以下步骤:

  1. 验证您是否具有 将文件上传到暂存区的权限

  2. 使用 Session 对象的 ` file `_ 方法访问会话的 ` FileOperation `_ 对象。

  3. 调用 FileOperation 对象的 ` put `_ 方法将文件上传到暂存区。

    此方法执行 SQL PUT 命令。

    • 要为 PUT 命令指定任何 可选参数,请创建参数和值的 Map,然后传入该 Map 作为 options 实参。例如:

      import java.util.HashMap;
      import java.util.Map;
      ...
      // Upload a file to a stage without compressing the file.
      Map<String, String> putOptions = new HashMap<>();
      putOptions.put("AUTO_COMPRESS", "FALSE");
      PutResult[] putResults = session.file().put("file:///tmp/myfile.csv", "@myStage", putOptions);
      
      Copy
    • localFileName 实参中,可以使用通配符(*?)来标识要上传的一组文件。例如:

      // Upload the CSV files in /tmp with names that start with "file".
      // You can use the wildcard characters "*" and "?" to match multiple files.
      PutResult[] putResults = session.file().put("file:///tmp/file*.csv", "@myStage/prefix2")
      
      Copy
  4. 检查 put 方法返回的 ` PutResult _ 对象的 :code:`Array,以确定文件是否上传成功。例如,要打印该文件的文件名和 PUT 操作的状态,请运行以下语句:

    // Print the filename and the status of the PUT operation.
    for (PutResult result : putResults) {
      System.out.println(result.getSourceFileName() + ": " + result.getStatus());
    }
    
    Copy

从暂存区下载文件

要从暂存区下载文件,请执行以下步骤:

  1. 验证您是否具有 从暂存区下载文件的权限

  2. 使用 Session 对象的 ` file `_ 方法访问会话的 ` FileOperation `_ 对象。

  3. 调用 FileOperation 对象的 ` get `_ 方法,以从暂存区下载文件。

    此方法执行 SQL GET 命令。

    要为 GET 命令指定任何 可选参数,请创建参数和值的 Map,然后传入该 Map 作为 options 实参。例如:

    import java.util.HashMap;
    import java.util.Map;
    ...
    // Upload a file to a stage without compressing the file.
    // Download files with names that match a regular expression pattern.
    Map<String, String> getOptions = new HashMap<>();
    getOptions.put("PATTERN", "'.*file_.*.csv.gz'");
    GetResult[] getResults = session.file().get("@myStage", "file:///tmp", getOptions);
    
    Copy
  4. 检查 get 方法返回的 ` GetResult _ 对象的 :code:`Array,以确定文件是否下载成功。例如,要打印该文件的文件名和 GET 操作的状态,请运行以下语句:

    // Print the filename and the status of the GET operation.
    for (GetResult result : getResults) {
      System.out.println(result.getFileName() + ": " + result.getStatus());
    }
    
    Copy

使用输入流在暂存区中上传和下载数据

备注

此功能是在 Snowpark 1.4.0 中引入的。

要使用输入流将数据上传到暂存区上的文件,以及从暂存区上的文件下载数据,请使用 ` FileOperation _ 对象的 :code:`uploadStreamdownloadStream 方法:

使用输入流将数据上传到暂存区上的文件

要将数据从 ` java.io.InputStream `_ 对象上传到暂存区上的文件,请执行以下步骤:

  1. 验证您是否具有 将文件上传到暂存区的权限

  2. 使用 Session 对象的 ` file `_ 方法访问会话的 ` FileOperation `_ 对象。

  3. 调用 FileOperation 对象的 ` uploadStream `_ 方法。

    传入应写入数据的暂存区上文件的完整路径和 InputStream 对象。此外,使用 compress 实参,指定在上传数据之前是否应压缩数据。

例如:

import java.io.InputStream;
...
boolean compressData = true;
String pathToFileOnStage = "@myStage/path/file";
session.file().uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData);
Copy

使用输入流从暂存区上的文件下载数据

要将数据从暂存区上的文件下载到 ` java.io.InputStream `_ 对象,请执行以下操作:

  1. 验证您是否具有 从暂存区下载文件的权限

  2. 使用 Session 对象的 ` file `_ 方法访问会话的 ` FileOperation `_ 对象。

  3. 调用 FileOperation 对象的 ` downloadStream `_ 方法。

    传入包含要下载的数据的暂存区文件的完整路径。使用 decompress 实参,指定是否要压缩文件中的数据。

例如:

import java.io.InputStream;
...
boolean isDataCompressed = true;
String pathToFileOnStage = "@myStage/path/file";
InputStream is = session.file().downloadStream(pathToFileOnStage, isDataCompressed);
Copy

为暂存区中的文件设置 DataFrame

本部分介绍如何为 Snowflake 暂存区中的文件设置 DataFrame。创建此 DataFrame 后,您可以使用 DataFrame 来执行以下操作:

要为 Snowflake 暂存区中的文件设置 DataFrame,请使用 DataFrameReader 类:

  1. 验证您是否具有以下权限:

    • 访问暂存区中文件的权限

    • 以下其中一项:

      • 架构的 CREATE TABLE 权限(如果您计划指定用于确定如何从暂存文件复制数据的 复制选项 )。

      • 否则确保您具备架构的 CREATE FILE FORMAT 权限。

  2. 调用 Session 类中的 read 方法,以访问 DataFrameReader 对象。

  3. 如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:

    1. 创建一个 ` StructType `_ 对象,该对象包含描述文件中字段的 ` StructField `_ 对象的数组。

    2. 对于每个 StructField 对象,请指定以下内容:

      • 字段的名称。

      • 字段的数据类型(指定为 com.snowflake.snowpark_java.types 包中的对象)。

      • 字段是否可为 null。

      例如:

      import com.snowflake.snowpark_java.types.*;
      ...
      
      StructType schemaForDataFile = StructType.create(
        new StructField("id", DataTypes.StringType, true),
        new StructField("name", DataTypes.StringType, true));
      
      Copy
    3. 调用 DataFrameReader 对象中的 schema 方法,传入 StructType 对象。

      例如:

      DataFrameReader dfReader = session.read().schema(schemaForDataFile);
      
      Copy

      schema 方法会返回一个 DataFrameReader 对象,该对象配置为读取包含指定字段的文件。

      请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件, DataFrameReader 会将数据视为字段名称为 $1、类型为 VARIANT 的单个字段处理。

  4. 如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用 ` DataFrameReader.option `_ 或 ` DataFrameReader.options `_ 方法。

    传入要设置的选项的名称和值。可以设置以下类型的选项:

    • 有关 CREATE FILE FORMAT 的文档

    • COPY INTO TABLE 文档

      请注意,当您 将数据检索到 DataFrame 中时,设置复制选项可能造成费用更昂贵的执行策略。

    下面的示例设置了 DataFrameReader 对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE");
    
    Copy

    option 方法返回使用指定选项进行了配置的 DataFrameReader 对象。

    若要设置多个选项,可以 链接多个调用option 方法(如上面的示例所示),或是调用 ` DataFrameReader.options`_ 方法,并传入选项名称和值的 Map

  5. 调用与文件格式相对应的方法。您可以调用以下方法之一:

    • ` DataFrameReader.avro `_

    • ` DataFrameReader.csv `_

    • ` DataFrameReader.json `_

    • ` DataFrameReader.orc `_

    • ` DataFrameReader.parquet `_

    • ` DataFrameReader.xml `_

    调用这些方法时,传入要读取的文件的暂存区位置。例如:

    DataFrame df = dfReader.csv("@mystage/myfile.csv");
    
    Copy

    若要指定以相同前缀开头的多个文件,请在暂存区名称后指定前缀。例如,要从暂存区 @mystage 加载具有前缀 csv_ 的文件,请执行以下操作:

    DataFrame df = dfReader.csv("@mystage/csv_");
    
    Copy

    与文件格式相对应的方法会返回文件的一个 ` CopyableDataFrame _ 对象。:code:`CopyableDataFrame 扩展了 DataFrame,并提供用于处理暂存文件中的数据的其他方法。

  6. 调用操作方法,以执行以下操作:

    与表的 DataFrames 一样,在您调用 :ref:` 操作方法 <label-snowpark_java_dataframe_action_method>` 之前,不会将数据检索到 DataFrame 中。

将数据从文件加载到 DataFrame 中

为暂存区中的文件设置 DataFrame 之后,可以将文件中的数据加载到 DataFrame 中:

  1. 使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_java_dataframe_transform>` (例如,选择特定字段、筛选行等)。

    例如,要从名为 mystage 的暂存区中名为 data.json 的 JSON 文件中提取 color 元素,请运行以下语句:

    DataFrame df = session.read().json("@mystage/data.json").select(Functions.col("$1").subField("color"));
    
    Copy

    如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件), DataFrameReader 会将文件中的数据视为名为 $1 的单个 VARIANT 列。

  2. 调用 DataFrame.collect 方法以加载数据。例如:

    Row[] results = df.collect();
    
    Copy

将数据从文件复制到表中

为暂存区中的文件设置 DataFrame 之后,可以调用 copyInto 方法将数据复制到表中。此方法执行 COPY INTO <table> 命令。

备注

在调用 copyInto 之前,无需调用 collect 方法。在调用 copyInto 之前,文件中的数据不需要在 DataFrame 之中。

例如,以下代码将 myFileStage 指定的 CSV 文件中的数据加载到 mytable 表中。由于数据位于 CSV 文件中,代码还必须 描述文件中的字段。为此,该示例调用 DataFrameReader 对象的 ` schema _ 方法,并传入 ` StructType `_ 对象 (``schemaForDataFile`),其中包含了描述字段的 ` StructField `_ 对象的数组。

CopyableDataFrame copyableDf = session.read().schema(schemaForDataFile).csv("@mystage/myfile.csv");
copyableDf.copyInto("mytable");
Copy

将 DataFrame 保存到暂存区上的文件

备注

此功能是在 Snowpark 1.5.0 中引入的。

如果需要将 DataFrame 保存到暂存区上的文件中,可以调用与文件格式对应的 ` DataFrameWriter _ 方法(例如 :code:`csv 方法用于写入 CSV 文件),并传入应将文件保存到的暂存区位置。这些 DataFrameWriter 方法执行 COPY INTO <location> 命令。

备注

在调用这些 DataFrameWriter 方法之前,无需调用 collect 方法。在调用这些方法之前,文件中的数据不需要在 DataFrame 之中。

要将 DataFrame 的内容保存到暂存区上的文件中,请执行以下操作:

  1. 调用 DataFrame 对象的 ` write _ 方法,以获取 ` DataFrameWriter `_ 对象。例如,要获取代表名为 ``sample_product_data` 的表的 DataFrame 的 DataFrameWriter 对象,请运行以下语句:

    DataFrameWriter dfWriter = session.table("sample_product_data").write();
    
    Copy
  2. 如果要覆盖文件的内容(如果文件存在),请调用 DataFrameWriter 对象的 ` mode _ 方法,并传入 :code:`SaveMode.Overwrite

    否则,默认情况下,如果暂存区上的指定文件已存在, DataFrameWriter 会报告错误。

    mode 方法返回使用指定模式进行了配置的相同 DataFrameWriter 对象。

    例如,要指定 DataFrameWriter 应覆盖暂存区上的文件,请运行以下语句:

    dfWriter = dfWriter.mode(SaveMode.Overwrite);
    
    Copy
  3. 如果需要指定有关应如何保存数据的其他信息(例如,应压缩数据,或者要使用分号来分隔 CSV 文件中的字段),请调用 ` DataFrameWriter.option `_ 方法或 ` DataFrameWriter.options `_ 方法。

    传入要设置的选项的名称和值。可以设置以下类型的选项:

    请注意,不能使用 option 方法设置以下选项:

    • TYPE 格式类型选项。

    • OVERWRITE 复制选项。若要设置此选项,请改为调用 mode 方法(如上一步所述)。

    下面的示例设置 DataFrameWriter 对象,以使用分号(而非逗号)作为字段分隔符,以未压缩的形式将数据保存到 CSV 文件中。

    dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE");
    
    Copy

    option 方法返回使用指定选项进行了配置的 DataFrameWriter 对象。

    要设置多个选项,可以如上例所示 链接多个调用 (调用的是 option 方法),或者调用 ` DataFrameWriter.options`_ 方法,并传入选项名称和值的 Map

  4. 若要返回有关已保存的各文件的详细信息,请将 DETAILED_OUTPUT 复制选项 设置为 TRUE

    默认情况下,DETAILED_OUTPUTFALSE,这代表该方法会返回一行输出,其中包含 "rows_unloaded""input_bytes""output_bytes" 字段。

    DETAILED_OUTPUT 设置为 TRUE 时,该方法会为所保存的每个文件返回一行输出。每行都包含 FILE_NAMEFILE_SIZEROW_COUNT 字段。

  5. 调用文件格式对应的方法,将数据保存到文件中。您可以调用以下方法之一:

    • ` DataFrameWriter.csv `_

    • ` DataFrameWriter.json `_

    • ` DataFrameWriter.parquet `_

    调用这些方法时,传入应写入数据的文件的暂存区位置(如 @mystage)。

    默认情况下,该方法会将数据保存到名称带有前缀 data_ 的文件中(例如 @mystage/data_0_0_0.csv)。如果希望使用不同的前缀命名文件,请在暂存区名称后指定前缀。例如:

    WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data");
    
    Copy

    本示例将 DataFrame 的内容保存到名称以 saved_data 为前缀(例如 @mystage/saved_data_0_0_0.csv)的文件中。

  6. 检查返回的 ` WriteFileResult `_ 对象,以获取有关写入文件的数据量的信息。

    WriteFileResult 对象,您可以访问 COPY INTO <location> 命令生成的输出:

    • 要以 Row 对象数组的形式访问输出行,请调用 getRows 方法。

    • 要确定行中有哪些字段,请调用 getSchema 方法,该方法将返回描述行中字段的 StructType

    例如,要在输出行中打印出字段名称和值,请运行以下语句:

    WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data");
    Row[] rows = writeFileResult.getRows();
    StructType schema = writeFileResult.getSchema();
    for (int i = 0 ; i < rows.length ; i++) {
      System.out.println("Row:" + i);
      Row row = rows[i];
      for (int j = 0; j < schema.size(); j++) {
        System.out.println(schema.get(j).name() + ": " + row.get(j));
      }
    }
    
    Copy

以下示例使用 DataFrame 将名为 car_sales 的表的内容保存到 JSON 文件,这些文件位于 @mystage 暂存区中,名称带有 saved_data 前缀(例如 @mystage/saved_data_0_0_0.json)。示例代码:

  • 覆盖文件(如果暂存区中已存在该文件)。

  • 返回有关保存操作的详细输出。

  • 保存未压缩的数据。

最后,示例代码在返回的输出行中打印出每个字段和值:

DataFrame df = session.table("car_sales");
WriteFileResult writeFileResult = df.write().mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data");
Row[] rows = writeFileResult.getRows();
StructType schema = writeFileResult.getSchema();
for (int i = 0 ; i < rows.length ; i++) {
  System.out.println("Row:" + i);
  Row row = rows[i];
  for (int j = 0; j < schema.size(); j++) {
    System.out.println(schema.get(j).name() + ": " + row.get(j));
  }
}
Copy

使用半结构化数据

使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。

备注

这些部分中的示例使用 示例中使用的示例数据 中的示例数据。

遍历半结构化数据

若要引用半结构化数据中的特定字段或元素,请使用 ` Column `_ 对象的以下方法:

  • 使用 subField("<field_name>") 返回 OBJECT(或包含 OBJECT 的 VARIANT)中字段的 Column 对象。

  • 使用 subField(<index>) 返回 ARRAY(或包含 ARRAY 的 VARIANT )中元素的 Column 对象。

备注

如果路径中的字段名称或元素不规则,并且导致难以使用 Column.apply 方法,则可以改用 ` Functions.get _、 Functions.get_ignore_case `_ 或 ` Functions.get_path `_。

例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src 列的对象内,选择 dealership 字段:

DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("dealership")).show();
Copy

该代码会打印以下输出:

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
Copy

备注

DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值

还可以 链接多个方法调用,以遍历特定字段或元素的路径。

例如,以下代码选择 salesperson 对象中的 name 字段:

DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("name")).show();
Copy

该代码会打印以下输出:

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
Copy

再举一个例子,下面的代码选择 vehicle 字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price 字段。

DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("vehicle").subField(0)).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("price")).show();
Copy

该代码会打印以下输出:

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
Copy

如果路径中的字段名称或元素不规则,并且导致难以使用 Column.subField 方法,则可以使用 ` Functions.get _、 Functions.get_ignore_case _ ` Functions.get_path `_ 函数来代替 :code:`apply 方法。

例如,以下代码行都会打印对象中指定字段的值:

df.select(Functions.get(Functions.col("src"), Functions.lit("dealership"))).show();
df.select(Functions.col("src").subField("dealership")).show();
Copy

同样,以下代码行都会打印对象中指定路径处的字段值:

df.select(Functions.get_path(Functions.col("src"), Functions.lit("vehicle[0].make"))).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("make")).show();
Copy

显式转换半结构化数据中的值

默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。

若要避免意外结果,请调用 :ref:` cast <label-snowpark_java_dataframe_cols_cast>` 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:

// Import the objects for the data types, including StringType.
import com.snowflake.snowpark_java.types.*;
...
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("id")).show();
df.select(Functions.col("src").subField("salesperson").subField("id").cast(DataTypes.StringType)).show();
Copy

该代码会打印以下输出:

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
Copy

将对象数组展平为行

如果需要将半结构化数据“展平”为 DataFrame(例如,为数组中的每个对象生成一行),请调用 flatten 方法。此方法与 FLATTEN SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。

例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer 是一个包含有关客户的信息的对象数组。每个对象都包含 nameaddress 字段。

如果将此路径传递给 flatten 函数:

DataFrame df = session.table("car_sales");
df.flatten(Functions.col("src").subField("customer")).show();
Copy

该方法返回一个 DataFrame:

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

从 DataFrame 中,您可以从 VALUE 字段中的每个对象选择 nameaddress 字段:

df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name"), Functions.col("value").subField("address")).show();
Copy
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
Copy

以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:

df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name").cast(DataTypes.StringType).as("Customer Name"), Functions.col("value").subField("address").cast(DataTypes.StringType).as("Customer Address")).show();
Copy
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
Copy

执行 SQL 语句

若要执行您指定的 SQL 语句,请调用 Session 类中的 sql 方法,然后传入要执行的语句。该方法返回一个 DataFrame。

请注意,在您 :ref:` 调用操作方法 <label-snowpark_java_dataframe_action_method>` 之前, SQL 语句不会执行。

import java.util.Arrays;

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
DataFrame dfStageFiles  = session.sql("ls @myStage");
Row[] files = dfStageFiles.collect();
System.out.println(Arrays.toString(files));

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect();

DataFrame tableDf = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Get the count of rows from the table.
long numRows = tableDf.count();
System.out.println("Count: " + numRows);
Copy

如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_java_dataframe_transform>` (例如 filter、select 等),请注意,仅当基础 SQL 语句是 SELECT 语句时,这些方法才有效。其他类型的 SQL 语句不支持转换方法。

import java.util.Arrays;

DataFrame df = session.sql("select id, category_id, name from sample_product_data where id > 10");
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
Row[] results = df.filter(Functions.col("category_id").lt(Functions.lit(10))).select(Functions.col("id")).collect();
System.out.println(Arrays.toString(results));

// In this example, the underlying SQL statement is not a SELECT statement.
DataFrame dfStageFiles = session.sql("ls @myStage");
// Calling the filter method results in an error.
dfStageFiles.filter(...);
Copy
语言: 中文