在 Snowpark Scala 中使用 DataFrames

在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。

本主题内容:

要检索和操作数据,需要使用 ` DataFrame `_ 类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。

要将数据检索到 DataFrame 之中,请执行以下步骤:

  1. 构造 DataFrame,为数据集指定数据源

    例如,您可以创建一个 DataFrame,以保存来自表和外部 CSV 文件的数据或执行 SQL 语句时产生的数据。

  2. 指定 DataFrame 中的数据集应如何转换

    例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。

  3. 执行语句以将数据检索到 DataFrame 之中

    为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如 collect() 方法)。

接下来的部分将更详细地介绍这些步骤。

设置本部分的示例

本部分的一些示例使用 DataFrame 查询名为 sample_product_data 的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表:

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
Copy

要验证表是否已创建,请运行:

SELECT * FROM sample_product_data;
Copy

构造 DataFrame

要构造 DataFrame,可以使用 Session 类中的方法。以下每种方法都基于不同类型的数据源构造 DataFrame :

  • 要基于表、视图或流中的数据创建 DataFrame,请调用 table 方法:

    // Create a DataFrame from the data in the "sample_product_data" table.
    val dfTable = session.table("sample_product_data")
    
    // To print out the first 10 rows, call:
    //   dfTable.show()
    
    Copy

    备注

    session.table 方法返回一个 Updatable 对象。Updatable 扩展了 DataFrame,并提供了用于处理表中数据的其他方法(例如,用于更新和删除数据的方法)。请参阅 更新、删除和合并表中的行

  • 要基于一系列值创建 DataFrame,请调用 createDataFrame 方法:

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
    Copy

    备注

    构造 DataFrame 时,Snowflake 的保留字不能用作有效的列名。有关保留字的列表,请参阅 保留和受限关键字

  • 要创建包含特定值范围的 DataFrame,请调用 range 方法:

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
    Copy
  • 为暂存区中的文件创建 DataFrame,请调用 read 来获取 DataFrameReader 对象。在 DataFrameReader 对象中,调用与文件中的数据格式对应的方法:

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
    Copy
  • 要创建 DataFrame 来保存 SQL 查询的结果,请调用 sql 方法:

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    
    Copy

    注意:虽然可以使用此方法执行 SELECT 语句,以从表和暂存文件中检索数据,但应改用 tableread 方法。在开发工具中, tableread 之类的方法可以提供更好的语法突出显示、错误突出显示和智能代码补全效果。

指定应该如何转换数据集

要指定应选择哪些列,以及应如何对结果进行筛选、排序、分组等,请调用能转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 col 函数或计算结果为列的表达式。(请参阅 指定列和表达式。)

例如:

  • 若要指定应返回的行,请调用 filter 方法:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val df = session.table("sample_product_data").filter(col("id") === 1)
    df.show()
    
    Copy
  • 若要指定应选择的列,请调用 select 方法:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns in te "sample_product_data" table.
    val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    df.show()
    
    Copy

每个方法都返回一个经过转换的新 DataFrame 对象。(该方法不会影响原始 DataFrame 对象。)这就意味着,如果要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用所返回的新 DataFrame 对象来调用每个后续转换方法。

请注意,这些转换方法不会从 Snowflake 数据库中检索数据。( 执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。)转换方法只是指定应如何构造 SQL 语句。

指定列和表达式

调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select 方法时,需要指定应选择的列。

要引用列,请通过调用 com.snowflake.snowpark.functions 对象中的 ` col `_ 函数来创建 ` Column `_ 对象。

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
Copy

备注

要为字面量创建 Column 对象,请参阅 将字面量用作列对象

指定筛选器、投影、联接条件等时,可以在表达式中使用 Column 对象。例如:

  • 可以将 Column 对象与 filter 方法一起使用,以指定筛选条件:

    // Specify the equivalent of "WHERE id = 20"
    // in an SQL SELECT statement.
    df.filter(col("id") === 20)
    
    Copy
    // Specify the equivalent of "WHERE a + b < 10"
    // in an SQL SELECT statement.
    df.filter((col("a") + col("b")) < 10)
    
    Copy
  • 可以将 Column 对象与 select 方法一起使用,以定义别名:

    // Specify the equivalent of "SELECT b * 10 AS c"
    // in an SQL SELECT statement.
    df.select((col("b") * 10) as "c")
    
    Copy
  • 可以将 Column 对象与 join 方法一起使用,以定义联接条件:

    // Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y"
    // in an SQL SELECT statement.
    dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
    
    Copy

引用不同 DataFrames 中的列

引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在一个 DataFrame 对象中使用 DataFrame.col 方法引用该对象中的列(例如 df1.col("name")df2.col("name"))。

下面的示例演示了如何使用 DataFrame.col 方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 key 的列。该示例使用 Column.as 方法来更改新创建 DataFrame 中的列名称。

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
Copy

使用 apply 方法引用列

作为 DataFrame.col 方法的替代方案,可以使用 DataFrame.apply 方法引用特定 DataFrame 中的列。与 DataFrame.col 方法一样, DataFrame.apply 方法接受列名作为输入并返回一个 Column 对象。

请注意,当对象在 Scala 中具有 apply 方法时,可通过像调用函数一样调用对象,来调用 apply 方法。例如,要调用 df.apply("column_name"),只需写成 df("column_name") 即可。以下调用是等效的:

  • df.col("<column_name>")

  • df.apply("<column_name>")

  • df("<column_name>")

以下示例与前面的示例相同,但使用 DataFrame.apply 方法引用联接操作中的列:

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
Copy

使用列对象的简写

作为使用 col 函数的替代方法,您可以通过以下方式之一引用列:

  • 在带引号的列名称前面使用美元符号 ($"column_name")。

  • 在不带引号的列名称前面使用撇号(单引号)('column_name)。

为此,请在创建 Session 对象后从 implicits 对象导入名称:

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
Copy

将对象标识符(表名称、列名称等)放在双引号里

您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求。指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
Copy

如果名称不符合标识符要求,则必须将名称放在双引号里 (")。对于 Scala 字符串字面量中的双引号字符,请使用反斜杠 (\) 进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:

val df = session.table("\"10tablename\"")
Copy

请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
Copy

如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。

在某些情况下,列名称可能包含双引号字符:

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...
Copy

标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes""""column_name_quoted"""):

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
Copy

请注意,如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
Copy

将字面量用作列对象

要在传入 Column 对象的方法中使用字面量,请将字面量传递给 com.snowflake.snowpark.functions 对象中的 lit 函数,从而为字面量创建 Column 对象。例如:

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
Copy

如果字面量是 Scala 中的浮点或双精度值(例如 0.05 ` 默认情况下被视为双精度值 <https://docs.scala-lang.org/overviews/scala-book/built-in-types.html (https://docs.scala-lang.org/overviews/scala-book/built-in-types.html)>`_),则 Snowpark 库会生成 SQL,从而隐式将该值的类型转换为相应的 Snowpark 数据类型(例如 0.05::DOUBLE)。这可能会产生与指定的确切数字不同的近似值。

例如,以下代码不显示匹配的行,即使筛选器(匹配大于或等于 0.05 的值)应匹配 DataFrame 中的行:

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
Copy

问题在于 lit(0.06)lit(0.01)0.060.01 产生近似值而不是确切值。

若要避免此问题,可使用下列方法之一:

  • 选项 1:将字面量的类型转换为要使用的 Snowpark 类型。例如,要使用精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
    Copy
  • 选项 2:将值的类型转换为要使用的类型,然后再将值传递给 lit 函数。例如,如果要使用 BigDecimal 类型 (https://docs.scala-lang.org/overviews/scala-book/built-in-types.html#bigint-and-bigdecimal),请执行如下操作:

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
    
    Copy

将列对象的类型转换为特定类型

要将 Column 对象的类型转换为特定类型,请调用 ` Column.cast _ 方法,然后从 `com.snowflake.snowpark.types package `_ 传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 的 :ref:`label-data_type_number,请运行以下语句:

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))
Copy

链接多个方法调用

由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。

下面的示例返回为如下目的而配置的 DataFrame :

  • 查询 sample_product_data 表。

  • 返回 id = 1 的行。

  • 选择 nameserial_number 列。

val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

在此示例中:

  • session.table("sample_product_data") 返回 sample_product_data 表的 DataFrame。

    尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。

  • filter(col("id") === 1) 返回 sample_product_data 表的 DataFrame(设置为返回 id = 1 的行)。

    请再次注意, DataFrame 尚未包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_dataframe_action_method>` 之前,不会检索匹配的行。

  • select(col("name"), col("serial_number")) 返回一个 DataFrame,包含 sample_product_data 表中 id = 1 的行的 nameserial_number 列。

在链接多个方法调用时,请注意调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。

例如,在下面的代码中,select 方法返回仅包含两列的 DataFrame:nameserial_number。对此 DataFrame 的 filter 方法调用会失败,因为它使用的 id 列不在已转换的 DataFrame 中。

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
Copy

相反,以下代码会成功执行,因为对包含 sample_product_data 表中所有列(包括 id 列)的 DataFrame 调用了 filter() 方法:

// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

请注意,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 selectfilter 方法调用。

限制 DataFrame 中的行数

要限制 DataFrame 中的行数,可以使用 ` DataFrame.limit `_ 转换方法。

Snowpark API 还提供了以下操作方法,可检索和打印出有限数量的行:

  • ` DataFrame.first _ 操作方法(用于执行查询并返回前 :samp:`{n} 行)

  • ` DataFrame.show _ 操作方法(用于执行查询并打印前 :samp:`{n} 行)

这些方法能有效地将 LIMIT 子句添加到执行的 SQL 语句中。

如 :ref:` LIMIT 使用说明 <label-limit_cmd_usage_notes>` 中所述,除非将排序顺序 (ORDER BY) 与 LIMIT 一起指定,否则结果是不确定的。

要使 ORDER BY 子句与 LIMIT 子句一起出现(从而避免 ORDER BY 出现在另一个子查询等地方),必须调用 limit 方法,以限制 sort 方法返回的 DataFrame 中的结果数。

例如,如果 链接多个方法调用

// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);

// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)

// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
Copy

检索列定义

要在 DataFrame 的数据集中检索列的定义,请调用 schema 方法。此方法会返回一个 StructType 对象,其中包含 StructField 对象的 Array。每个 StructField 对象都包含一列的定义。

// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
Copy

在返回的 StructType 对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。

下面的示例创建一个 DataFrame,其中包含名为 ID3rd 的列。对于名为 3rd 的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"),因为 该名称不符合标识符要求

该示例调用 schema 方法,随后对返回的 StructType 对象调用 names 方法,以获取列名称的 ArraySeq。这些名称使用 schema 方法返回的 StructType 进行规范化。

// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
//   ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
Copy

联接 DataFrames

若要联接 DataFrame 对象,请调用 ` DataFrame.join `_ 方法。

以下部分说明如何使用 DataFrames 执行联接:

设置联接的示例数据

后面部分中的示例使用示例数据,您可以通过执行以下 SQL 语句来设置这些数据:

create or replace table sample_a (
  id_a integer,
  name_a varchar,
  value integer
);
insert into sample_a (id_a, name_a, value) values
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
create or replace table sample_b (
  id_b integer,
  name_b varchar,
  id_a integer,
  value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', null, 200)
;
create or replace table sample_c (
  id_c integer,
  name_c varchar,
  id_a integer,
  id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
  (1012, 'C1', 10, null),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;
Copy

指定联接的列

利用 DataFrame.join 方法,可以通过以下方式之一指定要使用的列:

  • 指定描述联接条件的列表达式。

  • 指定应用作联接中的公共列的一个或多个列。

以下示例对名为 id_a 的列执行内部联接:

// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy

请注意,该示例使用 DataFrame.col 方法来指定要用于联接的条件。有关此方法的更多信息,请参阅 指定列和表达式

这将打印以下输出:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |
|40      |A2        |10       |4000    |B1        |40      |10       |
|80      |A3        |15       |9000    |B3        |80      |15       |
----------------------------------------------------------------------
Copy
联接结果中重复的相同列名称

在联接产生的 DataFrame 中,Snowpark 库使用在已联接的表中找到的列名称,即使这些列名称在表之间相同也是如此。发生这种情况时,这些列名称将在联接产生的 DataFrame 中重复。要按名称访问重复的列,请对表示列的原始表的 DataFrame 调用 col 方法。(有关指定列的更多信息,请参阅 引用不同 DataFrames 中的列。)

以下示例中的代码联接两个 DataFrames,然后对已联接的 DataFrame 调用 select 方法。此代码通过从表示相应 DataFrame 对象( dfRhsdfLhs)的变量调用 col 方法来指定要选择的列。它使用 as 方法为 select 方法创建的 DataFrame 中的列提供新名称。

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
Copy

这将打印以下输出:

------------------------------
|"LEFTVALUE"  |"RIGHTVALUE"  |
------------------------------
|5            |5             |
|10           |10            |
|15           |15            |
------------------------------
Copy
在保存或缓存之前删除重复列

请注意,当联接产生的 DataFrame 包含重复的列名称时,必须通过删除重复列或重命名列来移除 DataFrame 中的重复项,然后才能将结果保存到表中或者缓存 DataFrame。对于保存到表或缓存的 DataFrame 中的重复列名称,Snowpark 库会将重复的列名称替换为别名,让它们不再重复。

对于缓存的 DataFrame,下例说明了其输出在以下情况下将如何显示:列名称 ID_AVALUE 在两个表的联接结果中重复,之后,在缓存结果之前未删除重复列或未重命名列。

--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A"  |"NAME_A"  |"l_ZSz7_VALUE"  |"ID_B"  |"NAME_B"  |"r_heec_ID_A"  |"r_heec_VALUE"  |
--------------------------------------------------------------------------------------------------
|10             |A1        |5               |4001    |B2        |10             |5               |
|40             |A2        |10              |4000    |B1        |40             |10              |
|80             |A3        |15              |9000    |B3        |80             |15              |
--------------------------------------------------------------------------------------------------
Copy

执行自然联接

要执行 :ref:` 自然联接 <label-querying_join_natural>` (在具有相同名称的列上联接 DataFrames),请调用 ` DataFrame.naturalJoin `_ 方法。

以下示例在 sample_asample_b 的公共列( id_a 列)上联接这两个表的 DataFrames :

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

这将打印以下输出:

---------------------------------------------------
|"ID_A"  |"VALUE"  |"NAME_A"  |"ID_B"  |"NAME_B"  |
---------------------------------------------------
|10      |5        |A1        |4001    |B2        |
|40      |10       |A2        |4000    |B1        |
|80      |15       |A3        |9000    |B3        |
---------------------------------------------------
Copy

指定联接类型

默认情况下, DataFrame.join 方法创建内部联接。要指定其他类型的联接,请将 joinType 实参设置为以下值之一:

联接类型

joinType

内部联接

inner (默认)

左外部联接

left

右外部联接

right

完整外部联接

full

交叉联接

cross

例如:

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
Copy

这将打印以下输出:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|40      |A2        |10       |4000    |B1        |40      |10       |
|10      |A1        |5        |4001    |B2        |10      |5        |
|80      |A3        |15       |9000    |B3        |80      |15       |
|90      |A4        |20       |NULL    |NULL      |NULL    |NULL     |
----------------------------------------------------------------------
Copy

联接多个表

要联接多个表,请执行以下步骤:

  1. 为每个表创建一个 DataFrame。

  2. 对第一个 DataFrame 调用 DataFrame.join 方法,并传入第二个 DataFrame。

  3. 使用 join 方法返回的 DataFrame 来调用 join 方法,并传入第三个 DataFrame。

可以 链接 join 调用,如下所示:

val dfFirst = session.table("sample_a")
val dfSecond  = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
Copy

这将打印以下输出:

------------------------------------------------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |"ID_C"  |"NAME_C"  |"ID_A"  |"ID_B"  |
------------------------------------------------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |1012    |C1        |10      |NULL    |
|40      |A2        |10       |4000    |B1        |40      |10       |1040    |C2        |40      |4000    |
|40      |A2        |10       |4000    |B1        |40      |10       |1041    |C3        |40      |4001    |
------------------------------------------------------------------------------------------------------------
Copy

执行自联接

如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下使用单个 DataFrame 执行自联接的示例最终失败,因为联接的左侧和右侧都存在 "id" 的列表达式:

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
Copy
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
Copy

上面两个示例都失败了,并引发以下异常:

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.
Copy

请改用 ` DataFrame.clone `_ 方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:

// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
Copy

如果要对同一列执行自联接,请调用 join 方法,为 USING 子句传入列表达式的 Seq

// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
Copy

执行操作以计算 DataFrame

如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。

以下部分介绍如何同步和异步地对 DataFrame 执行操作:

同步执行操作

要同步执行操作,请调用以下操作方法之一:

同步执行操作的方法

描述

DataFrame.collect

计算 DataFrame,并将生成的数据集作为 ` Row _ 对象的 :code:`Array 返回。请参阅 返回所有行

DataFrame.toLocalIterator

计算 DataFrame,并返回 ` Row _ 对象的 ` Iterator `_。如果结果集很大,请使用此方法,以避免将所有结果同时加载到内存中。请参阅 :ref:`label-snowpark_dataframe_return_iterator

DataFrame.count

计算 DataFrame 并返回行数。

DataFrame.show

计算 DataFrame,并将行打印到控制台。请注意,此方法将行数限制为 10 行(默认值)。请参阅 打印 DataFrame 中的行

DataFrame.cacheResult

执行查询,创建临时表,并将结果放入表中。该方法返回一个 HasCachedResult 对象,您可以使用该对象访问此临时表中的数据。请参阅 缓存 DataFrame

DataFrame.write.saveAsTable

将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中

DataFrame.write.(csv|json|parquet)

将 DataFrame 保存到暂存区中的指定文件。请参阅 将 DataFrame 保存到暂存区上的文件

DataFrame.read.fileformat.copyInto('tableName')

将 DataFrame 中的数据复制到指定表中。请参阅 将数据从文件复制到表中

Session.table('tableName').delete

删除指定表中的行。请参阅 更新、删除和合并表中的行

Session.table('tableName').update

更新指定表中的行。请参阅 更新、删除和合并表中的行

Session.table('tableName').merge.methods.collect

将行合并到指定的表中。请参阅 更新、删除和合并表中的行

若要执行查询并返回结果数量,请调用 count 方法:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
Copy

还可以调用操作方法来:

注意:如果调用 schema 方法来获取 DataFrame 中列的定义,则无需调用操作方法。

异步执行操作

备注

此功能是在 Snowpark 0.11.0 中引入的。

要异步执行操作,请调用 async 方法以返回“异步执行者”对象(例如 DataFrameAsyncActor),然后在该对象中调用异步操作方法。

异步执行者对象的这些操作方法会返回一个 TypedAsyncJob 对象,您可以使用该对象检查异步操作的状态和检索操作的结果。

接下来的部分将介绍如何异步执行操作和检查结果。

了解异步操作的基本流程

可以使用以下方法异步执行操作:

异步执行操作的方法

描述

DataFrame.async.collect

异步计算 DataFrame,以 ` Row _ 对象的 :code:`Array 的形式检索生成的数据集。请参阅 返回所有行

DataFrame.async.toLocalIterator

异步计算 DataFrame,以检索 ` Row _ 对象的 ` Iterator `_。如果结果集很大,请使用此方法,以避免将所有结果同时加载到内存中。请参阅 :ref:`label-snowpark_dataframe_return_iterator

DataFrame.async.count

异步计算 DataFrame 以检索行数。

DataFrame.write.async.saveAsTable

将 DataFrame 中的数据异步保存到指定表中。请参阅 将数据保存到表中

DataFrame.write.async.(csv|json|parquet)

将 DataFrame 保存到暂存区中的指定文件。请参阅 将 DataFrame 保存到暂存区上的文件

DataFrame.read.fileformat.async.copyInto('tableName')

将 DataFrame 中的数据异步复制到指定的表中。请参阅 将数据从文件复制到表中

Session.table('tableName').async.delete

异步删除指定表中的行。请参阅 更新、删除和合并表中的行

Session.table('tableName').async.update

异步更新指定表中的行。请参阅 更新、删除和合并表中的行

Session.table('tableName').merge.methods.async.collect

将行异步合并到指定表中。1.3.0 或更高版本中支持此方法。请参阅 更新、删除和合并表中的行

从返回的 ` TypedAsyncJob `_ 对象中,您可以执行以下操作:

  • 要确定操作是否已完成,请调用 isDone 方法。

  • 要获取与操作对应的查询 ID,请调用 getQueryId 方法。

  • 要返回操作的结果(例如,对于 collect 方法为 Row 对象的 Array ;对于 count 方法为行数),请调用 getResult 方法。

    请注意, getResult 是阻塞调用。

  • 要取消操作,请调用 cancel 方法。

例如,要异步执行查询,并以 Row 对象的 Array 的形式检索结果,请调用 DataFrame.async.collect

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
Copy

要异步执行查询并检索结果数,请调用 DataFrame.async.count

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
Copy

指定等待的最大秒数

调用 getResult 方法时,可以使用 maxWaitTimeInSeconds 实参指定在尝试检索结果之前等待查询完成的最大秒数。例如:

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
Copy

如果省略此实参,该方法将等待 :ref:` snowpark_request_timeout_in_seconds <label-snowpark_request_timeout_in_seconds>` 配置属性指定的最大秒数。( :ref:` 创建 Session 对象 <label-snowpark_creating_session>` 时可以设置此属性。)

通过 ID 访问异步查询

如果有之前提交的异步查询的查询 ID,则可以调用 Session.createAsyncJob 方法创建一个 ` AsyncJob `_ 对象,该对象可用于检查查询状态、检索查询结果或取消查询。

请注意,与 TypedAsyncJob 不同的是, AsyncJob 没有提供用于检索结果的 getResult 方法。如果需要检索结果,请改为调用 getRowsgetIterator 方法。

例如:

val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
Copy

将行检索到 DataFrame 之中

指定 DataFrame 应如何转换 来执行查询并返回结果。可以返回 Array 中的所有行,也可以返回一个 ` Iterator `_,以便逐行遍历访问结果。在后一种情况下,如果数据量较大,则行将按块加载到内存中,以免将大量数据加载到内存中。

返回所有行

要同时返回所有行,请调用 ` DataFrame.collect _ 方法。此方法返回 ` Row `_ 对象的数组。要从行中检索值,请调用 :samp:`get{Type} 方法(例如 getStringgetInt 等)。

例如:

import com.snowflake.snowpark.functions_

val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

返回行的迭代器

如果要使用 ` Iterator `_ 遍历结果中的 ` Row `_ 对象,请调用 ` DataFrame.toLocalIterator `_。如果结果中的数据量很大,该方法将按块加载行,以免同时将所有行加载到内存中。

例如:

import com.snowflake.snowpark.functions_

while (rowIterator.hasNext) {
  val row = rowIterator.next()
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

返回前 n

要返回前 n 行,请调用 ` DataFrame.first `_ 方法,并传入要返回的行数。

限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().first())。

例如:

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
Copy

打印 DataFrame 中的行

要将 DataFrame 中的前 10 行打印到控制台,请调用 ` DataFrame.show `_ 方法。要打印出其他数量的行,请传入要打印的行数。

限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().show())。

例如:

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
df.sort(col("name")).show()
Copy

更新、删除和合并表中的行

备注

此功能是在 Snowpark 0.7.0 中引入的。

当您调用 Session.table 为表创建 DataFrame 对象时,该方法将返回一个 Updatable 对象,此对象通过用于更新和删除表中数据的其他方法扩展了 DataFrame。(请参阅 ` Updatable `_。)

如果需要更新或删除表中的行,可以使用 Updatable 类的以下方法:

  • 调用 update,以更新表中的现有行。请参阅 更新表中的行

  • 调用 delete 以删除表中的行。请参阅 删除表中的行

  • 调用 merge,以根据另一个表或子查询中的数据在一个表中插入、更新和删除行。(这等效于 SQL 中的 MERGE 命令。)请参阅 将行合并到一个表中

更新表中的行

对于 update 方法,传入一个 Map,它将要更新的列与要分配给这些列的相应值关联起来。update 会返回一个 UpdateResult 对象,其中包含已更新的行数。(请参阅 ` UpdateResult `_。)

备注

update操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。

例如,要将名为 count 的列中的值替换为值 1,请运行以下语句:

val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
Copy

上面的示例使用列名称来标识列。您也可以使用列表达式:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
Copy

如果仅在满足条件时才应进行更新,则可以将该条件指定为实参。例如,对于 category_id 列的值为 20 的行,要替换名为 count 的列中的值,请运行以下语句:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
Copy

如果需要让条件基于与另一个 DataFrame 对象进行的联接,则可以将该 DataFrame 作为实参传入,并在条件中使用该 DataFrame。例如,对于 category_id 列与 DataFrame dfParts 中的 category_id 匹配的行,要替换名为 count 的列中的值,请运行以下语句:

val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
Copy

删除表中的行

对于 delete 方法,可以指定一个条件来标识要删除的行,并且可以使该条件基于与另一个 DataFrame 进行的联接。delete 会返回一个 DeleteResult 对象,其中包含已删除的行数。(请参阅 ` DeleteResult `_。)

备注

delete操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。

例如,要删除 category_id 列中的值为 1 的行,请运行以下语句:

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

如果条件引用另一个 DataFrame 中的列,则将该 DataFrame 作为第二个实参传入。例如,要删除 category_id 列与 DataFrame dfParts 中的 category_id 匹配的行,请将 dfParts 作为第二个实参传入:

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

将行合并到一个表中

要根据另一个表或子查询中的值在一个表中插入、更新和删除行(等效于 SQL 中的 MERGE 命令),请执行以下步骤:

  1. 在要将数据合并到的表的 Updatable 对象中,调用 merge 方法,并传入另一个表的 DataFrame 对象和联接条件的列表达式。

    这将返回一个 MergeBuilder 对象,您可以使用该对象指定要对匹配的行和不匹配的行执行的操作(例如插入、更新或删除)。(请参阅 ` MergeBuilder `_。)

  2. 使用 MergeBuilder 对象:

    • 要指定应对匹配的行执行的更新或删除,请调用 whenMatched 方法。

      如果需要指定有关何时应更新或删除行的附加条件,则可以传入该条件的列表达式。

      此方法返回一个 MatchedClauseBuilder 对象,可用于指定要执行的操作。(请参阅 ` MatchedClauseBuilder `_。)

      调用 MatchedClauseBuilder 对象中的 updatedelete 方法,以指定应对匹配的行执行的更新或删除操作。这些方法返回一个 MergeBuilder 对象,可用于指定其他子句。

    • 要指定应在行不匹配时执行的插入,请调用 whenNotMatched 方法。

      如果需要指定有关何时应插入行的附加条件,则可以传入该条件的列表达式。

      此方法返回一个 NotMatchedClauseBuilder 对象,可用于指定要执行的操作。(请参阅 ` NotMatchedClauseBuilder `_。)

      调用 NotMatchedClauseBuilder 对象中的 insert 方法,以指定应在行不匹配时执行的插入操作。这些方法返回一个 MergeBuilder 对象,可用于指定其他子句。

  3. 指定了应执行的插入、更新和删除后,调用 MergeBuilder 对象的 collect 方法,以对表执行指定的插入、更新和删除。

    collect 会返回一个 MergeResult 对象,其中包含已插入、更新和删除的行数。(请参阅 ` MergeResult `_。)

以下示例将 source 表中带有 idvalue 列的行插入到 target 表中(如果 target 表未包含具有匹配 ID 的行):

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()
Copy

以下示例使用 source 表中具有同一 ID 的行的 value 列值更新 target 表中的行:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()
Copy

将数据保存到表中

可以将 DataFrame 的内容保存到新表或现有表中。为此,您必须具有以下权限:

  • 对架构的 CREATE TABLE 权限(如果表不存在)。

  • 对表的 INSERT 权限。

要将 DataFrame 的内容保存到表中,请执行以下操作:

  1. 调用 ` DataFrame.write `_ 方法,以获取 ` DataFrameWriter `_ 对象。

  2. 调用 ` DataFrameWriter.mode `_ 方法,传入一个 ` SaveMode `_ 对象,该对象指定有关写入表的首选项:

    • 要插入行,请传入 SaveMode.Append

    • 要覆盖现有表,请传入 SaveMode.Overwrite

    此方法返回使用指定模式进行了配置的相同 DataFrameWriter 对象。

  3. 如果要在现有表中插入行 (SaveMode.Append),并且 DataFrame 中的列名称与表中的列名称匹配,请调用 ` DataFrameWriter.option _ 方法,并传入 :code:"columnOrder"` 和 "name" 作为实参。

    备注

    此方法是在 Snowpark 1.4.0 中引入的。

    默认情况下, columnOrder 选项设置为 "index",这意味着 DataFrameWriter 按照列的出现顺序插入值。例如, DataFrameWriter 在表中第一列内插入 DataFrame 中第一列的值,在表中第二列内插入 DataFrame 中第二列的值,依此类推。

    此方法返回使用指定选项进行了配置的同一个 DataFrameWriter 对象。

  4. 调用 ` DataFrameWriter.`_ saveAsTable 将 DataFrame 的内容保存到指定的表中。

    无需调用单独的方法(例如 collect)来执行将数据保存到表中的 SQL 语句。saveAsTable 是执行 SQL 语句的 操作方法

以下示例使用 DataFrame df 的内容覆盖现有表(由 tableName 变量标识):

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
Copy

以下示例将 DataFrame df 中的行插入到现有表(由 tableName 变量标识)。在此示例中,表和 DataFrame 都包含列 c1c2

该示例展示了这两种做法的区别:将 columnOrder 选项设置为 "name" (这会将值插入到与 DataFrame 列同名的表列中),以及使用默认的 columnOrder 选项(这会根据 DataFrame 中列的顺序将值插入到表列中)。

val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
Copy

基于 DataFrame 创建视图

要基于 DataFrame 创建视图,请调用 ` DataFrame.createOrReplaceView `_ 方法:

df.createOrReplaceView("db.schema.viewName")
Copy

请注意,调用 createOrReplaceView 会立即创建新视图。更重要的是,它不会导致对 DataFrame 进行计算。(在您 :ref:` 执行操作 <label-snowpark_dataframe_action_method>` 之前,不会对 DataFrame 本身进行计算。)

通过调用 createOrReplaceView 创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图

如果仅需要为会话创建临时视图,请改为调用 ` DataFrame.createOrReplaceTempView `_ 方法:

df.createOrReplaceTempView("db.schema.viewName")
Copy

缓存 DataFrame

在某些情况下,可能需要执行复杂的查询,并将结果保留下来,以供后续操作使用(而不必再次执行相同的查询)。在此类情况下,可以调用 ` DataFrame.cacheResult `_ 方法来缓存 DataFrame 的内容。

此方法会:

  • 运行查询。

    在调用 cacheResult 之前,无需 调用单独的操作方法来检索结果cacheResult 是执行查询的操作方法。

  • 将结果保存在临时表中

    由于 cacheResult 会创建临时表,因此,您必须对正在使用的架构具有 CREATE TABLE 权限。

  • 返回一个 ` HasCachedResult `_ 对象,该对象提供对临时表中结果的访问权。

    由于 HasCachedResult 扩展了 DataFrame,因此,可以对此缓存数据执行一些同样可以对 DataFrame 执行的操作。

备注

由于 cacheResult 执行查询并将结果保存到表中,因此该方法可能会导致计算和存储成本增加。

例如:

import com.snowflake.snowpark.functions_

// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
Copy

请注意,调用该方法时,原始的 DataFrame 不受影响。例如,假设 dfTablesample_product_data 表的 DataFrame :

val dfTempTable = dfTable.cacheResult()
Copy

调用 cacheResult 后, dfTable 仍指向 sample_product_data 表,而且您可以继续使用 dfTable 来查询和更新该表。

要使用临时表中的缓存数据,请使用 dfTempTable (即 cacheResult 返回的 HasCachedResult 对象)。

处理暂存区中的文件

Snowpark 库提供了一些类和方法,可让您通过使用暂存区中的文件 将数据加载到 Snowflake 中,以及 从 Snowflake 卸载数据

备注

为了在暂存区上使用这些类和方法,您必须具有 使用暂存区所需的权限

接下来的部分将说明如何使用这些类和方法:

在暂存区中上传和下载文件

要在暂存区中上传和下载文件,请使用 ` FileOperation `_ 对象:

将文件上传到暂存区

要将文件上传到暂存区,请执行以下步骤:

  1. 验证您是否具有 将文件上传到暂存区的权限

  2. 使用 ` Session.file `_ 访问会话的 ` FileOperation `_ 对象。

  3. 调用 ` FileOperation.put `_ 方法将文件上传到暂存区。

    此方法执行 SQL PUT 命令。

    • 要为 PUT 命令指定任何 可选参数,请创建参数和值的 Map,然后传入该 Map 作为 options 实参。例如:

      // Upload a file to a stage without compressing the file.
      val putOptions = Map("AUTO_COMPRESS" -> "FALSE")
      val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
      
      Copy
    • localFilePath 实参中,可以使用通配符(*?)来标识要上传的一组文件。例如:

      // Upload the CSV files in /tmp with names that start with "file".
      // You can use the wildcard characters "*" and "?" to match multiple files.
      val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
      
      Copy
  4. 检查 put 方法返回的 ` PutResult _ 对象的 :code:`Array,以确定文件是否上传成功。例如,要打印该文件的文件名和 PUT 操作的状态,请运行以下语句:

    // Print the filename and the status of the PUT operation.
    putResults.foreach(r => println(s"  ${r.sourceFileName}: ${r.status}"))
    
    Copy

从暂存区下载文件

要从暂存区下载文件,请执行以下步骤:

  1. 验证您是否具有 从暂存区下载文件的权限

  2. 使用 ` Session.file `_ 访问会话的 ` FileOperation `_ 对象。

  3. 调用 ` FileOperation.get `_ 方法,从暂存区下载文件。

    此方法执行 SQL GET 命令。

    要为 GET 命令指定任何 可选参数,请创建参数和值的 Map,然后传入该 Map 作为 options 实参。例如:

    // Download files with names that match a regular expression pattern.
    val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'")
    val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
    
    Copy
  4. 检查 get 方法返回的 ` GetResult _ 对象的 :code:`Array,以确定文件是否下载成功。例如,要打印该文件的文件名和 GET 操作的状态,请运行以下语句:

    // Print the filename and the status of the GET operation.
    getResults.foreach(r => println(s"  ${r.fileName}: ${r.status}"))
    
    Copy

使用输入流在暂存区中上传和下载数据

备注

此功能是在 Snowpark 1.4.0 中引入的。

要使用输入流将数据上传到暂存区上的文件,以及从暂存区上的文件下载数据,请使用 ` FileOperation _ 对象的 :code:`uploadStreamdownloadStream 方法:

使用输入流将数据上传到暂存区上的文件

要将数据从 ` java.io.InputStream `_ 对象上传到暂存区上的文件,请执行以下步骤:

  1. 验证您是否具有 将文件上传到暂存区的权限

  2. 使用 ` Session.file `_ 访问会话的 ` FileOperation `_ 对象。

  3. 调用 ` FileOperation.uploadStream `_ 方法。

    传入应写入数据的暂存区上文件的完整路径和 InputStream 对象。此外,使用 compress 实参,指定在上传数据之前是否应压缩数据。

例如:

import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
Copy

使用输入流从暂存区上的文件下载数据

要将数据从暂存区上的文件下载到 ` java.io.InputStream `_ 对象,请执行以下操作:

  1. 验证您是否具有 从暂存区下载文件的权限

  2. 使用 ` Session.file `_ 访问会话的 ` FileOperation `_ 对象。

  3. 调用 ` FileOperation.downloadStream `_ 方法。

    传入包含要下载的数据的暂存区文件的完整路径。使用 decompress 实参,指定是否要压缩文件中的数据。

例如:

import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
Copy

为暂存区中的文件设置 DataFrame

本部分介绍如何为 Snowflake 暂存区中的文件设置 DataFrame。创建此 DataFrame 后,您可以使用 DataFrame 来执行以下操作:

要为 Snowflake 暂存区中的文件设置 DataFrame,请使用 DataFrameReader 类:

  1. 验证您是否具有以下权限:

    • 访问暂存区中文件的权限

    • 以下其中一项:

      • 架构的 CREATE TABLE 权限(如果您计划指定用于确定如何从暂存文件复制数据的 复制选项 )。

      • 否则确保您具备架构的 CREATE FILE FORMAT 权限。

  2. 调用 Session 类中的 read 方法,以访问 DataFrameReader 对象。

  3. 如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:

    1. 创建一个 ` StructType `_ 对象,该对象包含描述文件中的字段的一系列 ` StructField `_ 对象。

    2. 对于每个 StructField 对象,请指定以下内容:

      • 字段的名称。

      • 字段的数据类型(指定为 com.snowflake.snowpark.types 包中的对象)。

      • 字段是否可为 null。

      例如:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
      Copy
    3. 调用 DataFrameReader 对象中的 schema 方法,传入 StructType 对象。

      例如:

      var dfReader = session.read.schema(schemaForDataFile)
      
      Copy

      schema 方法会返回一个 DataFrameReader 对象,该对象配置为读取包含指定字段的文件。

      请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件, DataFrameReader 会将数据视为字段名称为 $1、类型为 VARIANT 的单个字段处理。

  4. 如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用 ` DataFrameReader.option `_ 或 ` DataFrameReader.options `_ 方法。

    传入要设置的选项的名称和值。可以设置以下类型的选项:

    • 有关 CREATE FILE FORMAT 的文档

    • COPY INTO TABLE 文档

      请注意,当您 将数据检索到 DataFrame 中时,设置复制选项可能造成费用更昂贵的执行策略。

    下面的示例设置了 DataFrameReader 对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    option 方法返回使用指定选项进行了配置的 DataFrameReader 对象。

    若要设置多个选项,可以 链接多个调用option 方法(如上面的示例所示),或是调用 ` DataFrameReader.options`_ 方法,并传入选项名称和值的 Map

  5. 调用与文件格式相对应的方法。您可以调用以下方法之一:

    • ` DataFrameReader.avro `_

    • ` DataFrameReader.csv `_

    • ` DataFrameReader.json `_

    • ` DataFrameReader.orc `_

    • ` DataFrameReader.parquet `_

    • ` DataFrameReader.xml `_

    调用这些方法时,传入要读取的文件的暂存区位置。例如:

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    
    Copy

    若要指定以相同前缀开头的多个文件,请在暂存区名称后指定前缀。例如,要从暂存区 @mystage 加载具有前缀 csv_ 的文件,请执行以下操作:

    val df = dfReader.csv("@mystage/csv_")
    
    Copy

    与文件格式相对应的方法会返回文件的一个 ` CopyableDataFrame _ 对象。:code:`CopyableDataFrame 扩展了 DataFrame,并提供用于处理暂存文件中的数据的其他方法。

  6. 调用操作方法,以执行以下操作:

    与表的 DataFrames 一样,在您调用 :ref:` 操作方法 <label-snowpark_dataframe_action_method>` 之前,不会将数据检索到 DataFrame 中。

将数据从文件加载到 DataFrame 中

为暂存区中的文件设置 DataFrame 之后,可以将文件中的数据加载到 DataFrame 中:

  1. 使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_dataframe_transform>` (例如,选择特定字段、筛选行等)。

    例如,要从名为 mystage 的暂存区中名为 data.json 的 JSON 文件中提取 color 元素,请运行以下语句:

    val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
    
    Copy

    如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件), DataFrameReader 会将文件中的数据视为名为 $1 的单个 VARIANT 列。

  2. 调用 DataFrame.collect 方法以加载数据。例如:

    val results = df.collect()
    
    Copy

将数据从文件复制到表中

为暂存区中的文件设置 DataFrame 之后,可以调用 ` CopyableDataFrame.copyInto _ 方法将数据复制到表中。此方法执行 :doc:/sql-reference/sql/copy-into-table` 命令。

备注

在调用 copyInto 之前,无需调用 collect 方法。在调用 copyInto 之前,文件中的数据不需要在 DataFrame 之中。

例如,以下代码将 myFileStage 指定的 CSV 文件中的数据加载到 mytable 表中。由于数据位于 CSV 文件中,代码还必须 描述文件中的字段。为此,该示例调用 ` DataFrameReader.schema _ 方法,并传入包含描述字段的 ` StructField `_ 对象序列的 ` StructType `_ 对象 (``csvFileSchema`)。

val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
Copy

将 DataFrame 保存到暂存区上的文件

备注

此功能是在 Snowpark 1.5.0 中引入的。

如果需要将 DataFrame 保存到暂存区上的文件中,可以调用与文件格式对应的 ` DataFrameWriter _ 方法(例如 :code:`csv 方法用于写入 CSV 文件),并传入应将文件保存到的暂存区位置。这些 DataFrameWriter 方法执行 COPY INTO <location> 命令。

备注

在调用这些 DataFrameWriter 方法之前,无需调用 collect 方法。在调用这些方法之前,文件中的数据不需要在 DataFrame 之中。

要将 DataFrame 的内容保存到暂存区上的文件中,请执行以下操作:

  1. 调用 ` DataFrame.write`_ 方法,以获取 ` DataFrameWriter _ 对象。例如,要获取代表名为 ``sample_product_data` 的表的 DataFrame 的 DataFrameWriter 对象,请运行以下语句:

    dfWriter = session.table("sample_product_data").write
    
    Copy
  2. 如果要覆盖文件的内容(如果文件存在),请调用 ` DataFrameWriter.mode _ 方法,并传入 :code:`SaveMode.Overwrite

    否则,默认情况下,如果暂存区上的指定文件已存在, DataFrameWriter 会报告错误。

    mode 方法返回使用指定模式进行了配置的相同 DataFrameWriter 对象。

    例如,要指定 DataFrameWriter 应覆盖暂存区上的文件,请运行以下语句:

    dfWriter = dfWriter.mode(SaveMode.Overwrite)
    
    Copy
  3. 如果需要指定有关应如何保存数据的其他信息(例如,应压缩数据,或者要使用分号来分隔 CSV 文件中的字段),请调用 ` DataFrameWriter.option`_ 方法或 ` DataFrameWriter.options`_ 方法。

    传入要设置的选项的名称和值。可以设置以下类型的选项:

    请注意,不能使用 option 方法设置以下选项:

    • TYPE 格式类型选项。

    • OVERWRITE 复制选项。若要设置此选项,请改为调用 mode 方法(如上一步所述)。

    下面的示例设置 DataFrameWriter 对象,以使用分号(而非逗号)作为字段分隔符,以未压缩的形式将数据保存到 CSV 文件中。

    dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    option 方法返回使用指定选项进行了配置的 DataFrameWriter 对象。

    要设置多个选项,可以如上例所示 链接多个调用 (调用的是 option 方法),或者调用 ` DataFrameWriter.options _ 方法,并传入选项名称和值的 :code:`Map

  4. 若要返回有关已保存的各文件的详细信息,请将 DETAILED_OUTPUT 复制选项 设置为 TRUE

    默认情况下,DETAILED_OUTPUTFALSE,这代表该方法会返回一行输出,其中包含 "rows_unloaded""input_bytes""output_bytes" 字段。

    DETAILED_OUTPUT 设置为 TRUE 时,该方法会为所保存的每个文件返回一行输出。每行都包含 FILE_NAMEFILE_SIZEROW_COUNT 字段。

  5. 调用文件格式对应的方法,将数据保存到文件中。您可以调用以下方法之一:

    • ` DataFrameWriter.csv `_

    • ` DataFrameWriter.json `_

    • ` DataFrameWriter.parquet `_

    调用这些方法时,传入应写入数据的文件的暂存区位置(如 @mystage)。

    默认情况下,该方法会将数据保存到名称带有前缀 data_ 的文件中(例如 @mystage/data_0_0_0.csv)。如果希望使用不同的前缀命名文件,请在暂存区名称后指定前缀。例如:

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    
    Copy

    本示例将 DataFrame 的内容保存到名称以 saved_data 为前缀(例如 @mystage/saved_data_0_0_0.csv)的文件中。

  6. 检查返回的 ` WriteFileResult `_ 对象,以获取有关写入文件的数据量的信息。

    WriteFileResult 对象,您可以访问 COPY INTO <location> 命令生成的输出:

    • 若要以 ` Row _ 对象数组的形式访问输出行,请使用 :code:`rows 值成员。

    • 若要确定行中有哪些字段,请使用 schema 值成员,该成员是描述行中字段的 ` StructType `_。

    例如,要在输出行中打印出字段名称和值,请运行以下语句:

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    for ((row, index) <- writeFileResult.rows.zipWithIndex) {
      (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
        (structField, element) => println(s"${structField.name}: $element")
      }
    }
    
    Copy

以下示例使用 DataFrame 将名为 car_sales 的表的内容保存到 JSON 文件,这些文件位于 @mystage 暂存区中,名称带有 saved_data 前缀(例如 @mystage/saved_data_0_0_0.json)。示例代码:

  • 覆盖文件(如果暂存区中已存在该文件)。

  • 返回有关保存操作的详细输出。

  • 保存未压缩的数据。

最后,示例代码在返回的输出行中打印出每个字段和值:

val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
  println(s"Row: $index")
  (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
    (structField, element) => println(s"${structField.name}: $element")
  }
}
Copy

使用半结构化数据

使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。

备注

这些部分中的示例使用 示例中使用的示例数据 中的示例数据。

遍历半结构化数据

若要引用半结构化数据中的特定字段或元素,请使用 ` Column `_ 对象的以下方法:

  • 使用 ` Column.apply("<field_name>") _,返回 OBJECT (或包含 OBJECT 的 VARIANT)中一个字段的 :code:`Column 对象。

  • 使用 ` Column.apply(<index>) _,返回 ARRAY (或包含 ARRAY 的 VARIANT )中一个元素的 :code:`Column 对象。

备注

如果路径中的字段名称或元素不规则,并且导致难以使用这些 Column.apply 方法,则可以使用 ` get _、 get_ignore_case `_ 或 ` get_path `_ 函数作为替代方法。

使用 apply 方法引用列 中所述,可以省略方法名称 apply

col("column_name")("field_name")
col("column_name")(index)
Copy

例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src 列的对象内,选择 dealership 字段:

val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
Copy

该代码会打印以下输出:

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
Copy

备注

DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值

还可以 链接多个方法调用,以遍历特定字段或元素的路径。

例如,以下代码选择 salesperson 对象中的 name 字段:

val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
Copy

该代码会打印以下输出:

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
Copy

再举一个例子,下面的代码选择 vehicle 字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price 字段。

val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
Copy

该代码会打印以下输出:

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
Copy

作为 apply 方法的替代方法,如果路径中的字段名称或元素不规则,并且导致难以使用 Column.apply 方法,则可以使用 ` get _、 get_ignore_case `_ 或 ` get_path `_ 函数。

例如,以下代码行都会打印对象中指定字段的值:

df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
Copy

同样,以下代码行都会打印对象中指定路径处的字段值:

df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
Copy

显式转换半结构化数据中的值

默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。

若要避免意外结果,请调用 :ref:` cast <label-snowpark_dataframe_cols_cast>` 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:

// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
Copy

该代码会打印以下输出:

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
Copy

将对象数组展平为行

如果需要将半结构化数据“展平”为 DataFrame (例如,为数组中的每个对象生成一行),请调用 ` DataFrame.flatten _ 方法。此方法与 :doc:/sql-reference/functions/flatten` SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。

例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer 是一个包含有关客户的信息的对象数组。每个对象都包含 nameaddress 字段。

如果将此路径传递给 flatten 函数:

val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
Copy

该方法返回一个 DataFrame:

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

从 DataFrame 中,您可以从 VALUE 字段中的每个对象选择 nameaddress 字段:

df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
Copy
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
Copy

以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:

df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
Copy
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
Copy

执行 SQL 语句

若要执行您指定的 SQL 语句,请调用 Session 类中的 sql 方法,然后传入要执行的语句。该方法返回一个 DataFrame。

请注意,在您 :ref:` 调用操作方法 <label-snowpark_dataframe_action_method>` 之前, SQL 语句不会执行。

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
Copy

如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_dataframe_transform>` (例如 filter、select 等),请注意,仅当基础 SQL 语句是 SELECT 语句时,这些方法才有效。其他类型的 SQL 语句不支持转换方法。

val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)

// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)
Copy
语言: 中文