在 Snowpark Scala 中使用 DataFrames¶
在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。
本主题内容:
要检索和操作数据,需要使用 ` DataFrame `_ 类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。
要将数据检索到 DataFrame 之中,请执行以下步骤:
-
例如,您可以创建一个 DataFrame,以保存来自表和外部 CSV 文件的数据或执行 SQL 语句时产生的数据。
-
例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。
-
为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如
collect()
方法)。
接下来的部分将更详细地介绍这些步骤。
设置本部分的示例¶
本部分的一些示例使用 DataFrame 查询名为 sample_product_data
的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表:
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
要验证表是否已创建,请运行:
SELECT * FROM sample_product_data;
构造 DataFrame¶
要构造 DataFrame,可以使用 Session
类中的方法。以下每种方法都基于不同类型的数据源构造 DataFrame :
要基于表、视图或流中的数据创建 DataFrame,请调用
table
方法:// Create a DataFrame from the data in the "sample_product_data" table. val dfTable = session.table("sample_product_data") // To print out the first 10 rows, call: // dfTable.show()
备注
session.table
方法返回一个Updatable
对象。Updatable
扩展了DataFrame
,并提供了用于处理表中数据的其他方法(例如,用于更新和删除数据的方法)。请参阅 更新、删除和合并表中的行。要基于一系列值创建 DataFrame,请调用
createDataFrame
方法:// Create a DataFrame containing a sequence of values. // In the DataFrame, name the columns "i" and "s". val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
备注
构造 DataFrame 时,Snowflake 的保留字不能用作有效的列名。有关保留字的列表,请参阅 保留和受限关键字。
要创建包含特定值范围的 DataFrame,请调用
range
方法:// Create a DataFrame from a range val dfRange = session.range(1, 10, 2)
要 为暂存区中的文件创建 DataFrame,请调用
read
来获取DataFrameReader
对象。在DataFrameReader
对象中,调用与文件中的数据格式对应的方法:// Create a DataFrame from data in a stage. val dfJson = session.read.json("@mystage2/data1.json")
要创建 DataFrame 来保存 SQL 查询的结果,请调用
sql
方法:// Create a DataFrame from a SQL query val dfSql = session.sql("SELECT name from products")
注意:虽然可以使用此方法执行 SELECT 语句,以从表和暂存文件中检索数据,但应改用
table
和read
方法。在开发工具中,table
和read
之类的方法可以提供更好的语法突出显示、错误突出显示和智能代码补全效果。
指定应该如何转换数据集¶
要指定应选择哪些列,以及应如何对结果进行筛选、排序、分组等,请调用能转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 col
函数或计算结果为列的表达式。(请参阅 指定列和表达式。)
例如:
若要指定应返回的行,请调用
filter
方法:// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. // // This example uses the === operator of the Column object to perform an // equality check. val df = session.table("sample_product_data").filter(col("id") === 1) df.show()
若要指定应选择的列,请调用
select
方法:// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) df.show()
每个方法都返回一个经过转换的新 DataFrame 对象。(该方法不会影响原始 DataFrame 对象。)这就意味着,如果要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用所返回的新 DataFrame 对象来调用每个后续转换方法。
请注意,这些转换方法不会从 Snowflake 数据库中检索数据。( 执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。)转换方法只是指定应如何构造 SQL 语句。
指定列和表达式¶
调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select
方法时,需要指定应选择的列。
要引用列,请通过调用 com.snowflake.snowpark.functions
对象中的 ` col `_ 函数来创建 ` Column `_ 对象。
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._
val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
备注
要为字面量创建 Column
对象,请参阅 将字面量用作列对象。
指定筛选器、投影、联接条件等时,可以在表达式中使用 Column
对象。例如:
可以将
Column
对象与filter
方法一起使用,以指定筛选条件:// Specify the equivalent of "WHERE id = 20" // in an SQL SELECT statement. df.filter(col("id") === 20)
// Specify the equivalent of "WHERE a + b < 10" // in an SQL SELECT statement. df.filter((col("a") + col("b")) < 10)
可以将
Column
对象与select
方法一起使用,以定义别名:// Specify the equivalent of "SELECT b * 10 AS c" // in an SQL SELECT statement. df.select((col("b") * 10) as "c")
可以将
Column
对象与join
方法一起使用,以定义联接条件:// Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" // in an SQL SELECT statement. dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
引用不同 DataFrames 中的列¶
引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在一个 DataFrame 对象中使用 DataFrame.col
方法引用该对象中的列(例如 df1.col("name")
和 df2.col("name")
)。
下面的示例演示了如何使用 DataFrame.col
方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 key
的列。该示例使用 Column.as
方法来更改新创建 DataFrame 中的列名称。
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
使用 apply
方法引用列¶
作为 DataFrame.col
方法的替代方案,可以使用 DataFrame.apply
方法引用特定 DataFrame 中的列。与 DataFrame.col
方法一样, DataFrame.apply
方法接受列名作为输入并返回一个 Column
对象。
请注意,当对象在 Scala 中具有 apply
方法时,可通过像调用函数一样调用对象,来调用 apply
方法。例如,要调用 df.apply("column_name")
,只需写成 df("column_name")
即可。以下调用是等效的:
df.col("<column_name>")
df.apply("<column_name>")
df("<column_name>")
以下示例与前面的示例相同,但使用 DataFrame.apply
方法引用联接操作中的列:
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
使用列对象的简写¶
作为使用 col
函数的替代方法,您可以通过以下方式之一引用列:
在带引号的列名称前面使用美元符号 (
$"column_name"
)。在不带引号的列名称前面使用撇号(单引号)(
'column_name
)。
为此,请在创建 Session
对象后从 implicits
对象导入名称:
val session = Session.builder.configFile("/path/to/properties").create
// Import this after you create the session.
import session.implicits._
// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)
// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
将对象标识符(表名称、列名称等)放在双引号里¶
您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求。指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:
// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
如果名称不符合标识符要求,则必须将名称放在双引号里 ("
)。对于 Scala 字符串字面量中的双引号字符,请使用反斜杠 (\
) 进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:
val df = session.table("\"10tablename\"")
请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:
// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))
// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。
在某些情况下,列名称可能包含双引号字符:
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
如 标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes"
和 """column_name_quoted"""
):
val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
请注意,如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写:
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
将字面量用作列对象¶
要在传入 Column
对象的方法中使用字面量,请将字面量传递给 com.snowflake.snowpark.functions
对象中的 lit
函数,从而为字面量创建 Column
对象。例如:
// Import for the lit and col functions.
import com.snowflake.snowpark.functions._
// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
如果字面量是 Scala 中的浮点或双精度值(例如 0.05
` 默认情况下被视为双精度值 <https://docs.scala-lang.org/overviews/scala-book/built-in-types.html (https://docs.scala-lang.org/overviews/scala-book/built-in-types.html)>`_),则 Snowpark 库会生成 SQL,从而隐式将该值的类型转换为相应的 Snowpark 数据类型(例如 0.05::DOUBLE
)。这可能会产生与指定的确切数字不同的近似值。
例如,以下代码不显示匹配的行,即使筛选器(匹配大于或等于 0.05
的值)应匹配 DataFrame 中的行:
// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")
// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
问题在于 lit(0.06)
和 lit(0.01)
为 0.06
和 0.01
产生近似值而不是确切值。
若要避免此问题,可使用下列方法之一:
选项 1:将字面量的类型转换为要使用的 Snowpark 类型。例如,要使用精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:
df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
选项 2:将值的类型转换为要使用的类型,然后再将值传递给
lit
函数。例如,如果要使用 BigDecimal 类型 (https://docs.scala-lang.org/overviews/scala-book/built-in-types.html#bigint-and-bigdecimal),请执行如下操作:df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
将列对象的类型转换为特定类型¶
要将 Column
对象的类型转换为特定类型,请调用 ` Column.cast _ 方法,然后从 `com.snowflake.snowpark.types package `_ 传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 的 :ref:`label-data_type_number,请运行以下语句:
// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._
val decimalValue = lit(0.05).cast(new DecimalType(5,2))
链接多个方法调用¶
由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。
下面的示例返回为如下目的而配置的 DataFrame :
查询
sample_product_data
表。返回
id = 1
的行。选择
name
和serial_number
列。
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
在此示例中:
session.table("sample_product_data")
返回sample_product_data
表的 DataFrame。尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。
filter(col("id") === 1)
返回sample_product_data
表的 DataFrame(设置为返回id = 1
的行)。请再次注意, DataFrame 尚未包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_dataframe_action_method>` 之前,不会检索匹配的行。
select(col("name"), col("serial_number"))
返回一个 DataFrame,包含sample_product_data
表中id = 1
的行的name
和serial_number
列。
在链接多个方法调用时,请注意调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。
例如,在下面的代码中,select
方法返回仅包含两列的 DataFrame:name
和 serial_number
。对此 DataFrame 的 filter
方法调用会失败,因为它使用的 id
列不在已转换的 DataFrame 中。
// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
相反,以下代码会成功执行,因为对包含 sample_product_data
表中所有列(包括 id
列)的 DataFrame 调用了 filter()
方法:
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
请注意,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 select
和 filter
方法调用。
限制 DataFrame 中的行数¶
要限制 DataFrame 中的行数,可以使用 ` DataFrame.limit `_ 转换方法。
Snowpark API 还提供了以下操作方法,可检索和打印出有限数量的行:
` DataFrame.first _ 操作方法(用于执行查询并返回前 :samp:`{n} 行)
` DataFrame.show _ 操作方法(用于执行查询并打印前 :samp:`{n} 行)
这些方法能有效地将 LIMIT 子句添加到执行的 SQL 语句中。
如 :ref:` LIMIT 使用说明 <label-limit_cmd_usage_notes>` 中所述,除非将排序顺序 (ORDER BY) 与 LIMIT 一起指定,否则结果是不确定的。
要使 ORDER BY 子句与 LIMIT 子句一起出现(从而避免 ORDER BY 出现在另一个子查询等地方),必须调用 limit 方法,以限制 sort
方法返回的 DataFrame 中的结果数。
例如,如果 链接多个方法调用:
// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)
// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
检索列定义¶
要在 DataFrame 的数据集中检索列的定义,请调用 schema
方法。此方法会返回一个 StructType
对象,其中包含 StructField
对象的 Array
。每个 StructField
对象都包含一列的定义。
// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
在返回的 StructType
对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。
下面的示例创建一个 DataFrame,其中包含名为 ID
和 3rd
的列。对于名为 3rd
的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"
),因为 该名称不符合标识符要求。
该示例调用 schema
方法,随后对返回的 StructType
对象调用 names
方法,以获取列名称的 ArraySeq
。这些名称使用 schema
方法返回的 StructType
进行规范化。
// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
// ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
联接 DataFrames¶
若要联接 DataFrame 对象,请调用 ` DataFrame.join `_ 方法。
以下部分说明如何使用 DataFrames 执行联接:
设置联接的示例数据¶
后面部分中的示例使用示例数据,您可以通过执行以下 SQL 语句来设置这些数据:
create or replace table sample_a (
id_a integer,
name_a varchar,
value integer
);
insert into sample_a (id_a, name_a, value) values
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
create or replace table sample_b (
id_b integer,
name_b varchar,
id_a integer,
value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', null, 200)
;
create or replace table sample_c (
id_c integer,
name_c varchar,
id_a integer,
id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
(1012, 'C1', 10, null),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
指定联接的列¶
利用 DataFrame.join
方法,可以通过以下方式之一指定要使用的列:
指定描述联接条件的列表达式。
指定应用作联接中的公共列的一个或多个列。
以下示例对名为 id_a
的列执行内部联接:
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
请注意,该示例使用 DataFrame.col
方法来指定要用于联接的条件。有关此方法的更多信息,请参阅 指定列和表达式。
这将打印以下输出:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
联接结果中重复的相同列名称¶
在联接产生的 DataFrame 中,Snowpark 库使用在已联接的表中找到的列名称,即使这些列名称在表之间相同也是如此。发生这种情况时,这些列名称将在联接产生的 DataFrame 中重复。要按名称访问重复的列,请对表示列的原始表的 DataFrame 调用 col
方法。(有关指定列的更多信息,请参阅 引用不同 DataFrames 中的列。)
以下示例中的代码联接两个 DataFrames,然后对已联接的 DataFrame 调用 select
方法。此代码通过从表示相应 DataFrame 对象( dfRhs
和 dfLhs
)的变量调用 col
方法来指定要选择的列。它使用 as
方法为 select
方法创建的 DataFrame 中的列提供新名称。
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
这将打印以下输出:
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
在保存或缓存之前删除重复列¶
请注意,当联接产生的 DataFrame 包含重复的列名称时,必须通过删除重复列或重命名列来移除 DataFrame 中的重复项,然后才能将结果保存到表中或者缓存 DataFrame。对于保存到表或缓存的 DataFrame 中的重复列名称,Snowpark 库会将重复的列名称替换为别名,让它们不再重复。
对于缓存的 DataFrame,下例说明了其输出在以下情况下将如何显示:列名称 ID_A
和 VALUE
在两个表的联接结果中重复,之后,在缓存结果之前未删除重复列或未重命名列。
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
执行自然联接¶
要执行 :ref:` 自然联接 <label-querying_join_natural>` (在具有相同名称的列上联接 DataFrames),请调用 ` DataFrame.naturalJoin `_ 方法。
以下示例在 sample_a
和 sample_b
的公共列( id_a
列)上联接这两个表的 DataFrames :
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
这将打印以下输出:
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
指定联接类型¶
默认情况下, DataFrame.join
方法创建内部联接。要指定其他类型的联接,请将 joinType
实参设置为以下值之一:
联接类型 |
|
---|---|
内部联接 |
|
左外部联接 |
|
右外部联接 |
|
完整外部联接 |
|
交叉联接 |
|
例如:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
这将打印以下输出:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
联接多个表¶
要联接多个表,请执行以下步骤:
为每个表创建一个 DataFrame。
对第一个 DataFrame 调用
DataFrame.join
方法,并传入第二个 DataFrame。使用
join
方法返回的 DataFrame 来调用join
方法,并传入第三个 DataFrame。
可以 链接 join
调用,如下所示:
val dfFirst = session.table("sample_a")
val dfSecond = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
这将打印以下输出:
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
执行自联接¶
如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下使用单个 DataFrame 执行自联接的示例最终失败,因为联接的左侧和右侧都存在 "id"
的列表达式:
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
上面两个示例都失败了,并引发以下异常:
Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
请改用 ` DataFrame.clone `_ 方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
如果要对同一列执行自联接,请调用 join
方法,为 USING
子句传入列表达式的 Seq
:
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
执行操作以计算 DataFrame¶
如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。
以下部分介绍如何同步和异步地对 DataFrame 执行操作:
同步执行操作¶
要同步执行操作,请调用以下操作方法之一:
同步执行操作的方法 |
描述 |
---|---|
|
计算 DataFrame,并将生成的数据集作为 ` Row _ 对象的 :code:`Array 返回。请参阅 返回所有行。 |
|
计算 DataFrame,并返回 ` Row _ 对象的 ` Iterator `_。如果结果集很大,请使用此方法,以避免将所有结果同时加载到内存中。请参阅 :ref:`label-snowpark_dataframe_return_iterator。 |
|
计算 DataFrame 并返回行数。 |
|
计算 DataFrame,并将行打印到控制台。请注意,此方法将行数限制为 10 行(默认值)。请参阅 打印 DataFrame 中的行。 |
|
执行查询,创建临时表,并将结果放入表中。该方法返回一个 |
|
将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中。 |
|
将 DataFrame 保存到暂存区中的指定文件。请参阅 将 DataFrame 保存到暂存区上的文件。 |
|
将 DataFrame 中的数据复制到指定表中。请参阅 将数据从文件复制到表中。 |
|
删除指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
更新指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
将行合并到指定的表中。请参阅 更新、删除和合并表中的行。 |
若要执行查询并返回结果数量,请调用 count
方法:
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
还可以调用操作方法来:
注意:如果调用 schema
方法来获取 DataFrame 中列的定义,则无需调用操作方法。
异步执行操作¶
备注
此功能是在 Snowpark 0.11.0 中引入的。
要异步执行操作,请调用 async
方法以返回“异步执行者”对象(例如 DataFrameAsyncActor
),然后在该对象中调用异步操作方法。
异步执行者对象的这些操作方法会返回一个 TypedAsyncJob
对象,您可以使用该对象检查异步操作的状态和检索操作的结果。
接下来的部分将介绍如何异步执行操作和检查结果。
了解异步操作的基本流程¶
可以使用以下方法异步执行操作:
异步执行操作的方法 |
描述 |
---|---|
|
异步计算 DataFrame,以 ` Row _ 对象的 :code:`Array 的形式检索生成的数据集。请参阅 返回所有行。 |
|
异步计算 DataFrame,以检索 ` Row _ 对象的 ` Iterator `_。如果结果集很大,请使用此方法,以避免将所有结果同时加载到内存中。请参阅 :ref:`label-snowpark_dataframe_return_iterator。 |
|
异步计算 DataFrame 以检索行数。 |
|
将 DataFrame 中的数据异步保存到指定表中。请参阅 将数据保存到表中。 |
|
将 DataFrame 保存到暂存区中的指定文件。请参阅 将 DataFrame 保存到暂存区上的文件。 |
|
将 DataFrame 中的数据异步复制到指定的表中。请参阅 将数据从文件复制到表中。 |
|
异步删除指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
异步更新指定表中的行。请参阅 更新、删除和合并表中的行。 |
|
将行异步合并到指定表中。1.3.0 或更高版本中支持此方法。请参阅 更新、删除和合并表中的行。 |
从返回的 ` TypedAsyncJob `_ 对象中,您可以执行以下操作:
要确定操作是否已完成,请调用
isDone
方法。要获取与操作对应的查询 ID,请调用
getQueryId
方法。要返回操作的结果(例如,对于
collect
方法为Row
对象的Array
;对于count
方法为行数),请调用getResult
方法。请注意,
getResult
是阻塞调用。要取消操作,请调用
cancel
方法。
例如,要异步执行查询,并以 Row
对象的 Array
的形式检索结果,请调用 DataFrame.async.collect
:
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
要异步执行查询并检索结果数,请调用 DataFrame.async.count
:
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
指定等待的最大秒数¶
调用 getResult
方法时,可以使用 maxWaitTimeInSeconds
实参指定在尝试检索结果之前等待查询完成的最大秒数。例如:
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
如果省略此实参,该方法将等待 :ref:` snowpark_request_timeout_in_seconds <label-snowpark_request_timeout_in_seconds>` 配置属性指定的最大秒数。( :ref:` 创建 Session 对象 <label-snowpark_creating_session>` 时可以设置此属性。)
通过 ID 访问异步查询¶
如果有之前提交的异步查询的查询 ID,则可以调用 Session.createAsyncJob
方法创建一个 ` AsyncJob `_ 对象,该对象可用于检查查询状态、检索查询结果或取消查询。
请注意,与 TypedAsyncJob
不同的是, AsyncJob
没有提供用于检索结果的 getResult
方法。如果需要检索结果,请改为调用 getRows
或 getIterator
方法。
例如:
val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
将行检索到 DataFrame 之中¶
指定 DataFrame 应如何转换 来执行查询并返回结果。可以返回 Array
中的所有行,也可以返回一个 ` Iterator `_,以便逐行遍历访问结果。在后一种情况下,如果数据量较大,则行将按块加载到内存中,以免将大量数据加载到内存中。
返回所有行¶
要同时返回所有行,请调用 ` DataFrame.collect _ 方法。此方法返回 ` Row `_ 对象的数组。要从行中检索值,请调用 :samp:`get{Type} 方法(例如 getString
、getInt
等)。
例如:
import com.snowflake.snowpark.functions_
val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
返回行的迭代器¶
如果要使用 ` Iterator `_ 遍历结果中的 ` Row `_ 对象,请调用 ` DataFrame.toLocalIterator `_。如果结果中的数据量很大,该方法将按块加载行,以免同时将所有行加载到内存中。
例如:
import com.snowflake.snowpark.functions_
while (rowIterator.hasNext) {
val row = rowIterator.next()
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
返回前 n
行¶
要返回前 n
行,请调用 ` DataFrame.first `_ 方法,并传入要返回的行数。
如 限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().first()
)。
例如:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
打印 DataFrame 中的行¶
要将 DataFrame 中的前 10 行打印到控制台,请调用 ` DataFrame.show `_ 方法。要打印出其他数量的行,请传入要打印的行数。
如 限制 DataFrame 中的行数 中所述,结果是非确定性的。如果希望结果是确定性的,请对经过排序的 DataFrame 调用此方法 (df.sort().show()
)。
例如:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
df.sort(col("name")).show()
更新、删除和合并表中的行¶
备注
此功能是在 Snowpark 0.7.0 中引入的。
当您调用 Session.table
为表创建 DataFrame
对象时,该方法将返回一个 Updatable
对象,此对象通过用于更新和删除表中数据的其他方法扩展了 DataFrame
。(请参阅 ` Updatable `_。)
如果需要更新或删除表中的行,可以使用 Updatable
类的以下方法:
调用
update
,以更新表中的现有行。请参阅 更新表中的行。调用
delete
以删除表中的行。请参阅 删除表中的行。调用
merge
,以根据另一个表或子查询中的数据在一个表中插入、更新和删除行。(这等效于 SQL 中的 MERGE 命令。)请参阅 将行合并到一个表中。
更新表中的行¶
对于 update
方法,传入一个 Map
,它将要更新的列与要分配给这些列的相应值关联起来。update
会返回一个 UpdateResult
对象,其中包含已更新的行数。(请参阅 ` UpdateResult `_。)
备注
update
是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。
例如,要将名为 count
的列中的值替换为值 1
,请运行以下语句:
val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
上面的示例使用列名称来标识列。您也可以使用列表达式:
val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
如果仅在满足条件时才应进行更新,则可以将该条件指定为实参。例如,对于 category_id
列的值为 20
的行,要替换名为 count
的列中的值,请运行以下语句:
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
如果需要让条件基于与另一个 DataFrame
对象进行的联接,则可以将该 DataFrame
作为实参传入,并在条件中使用该 DataFrame
。例如,对于 category_id
列与 DataFrame
dfParts
中的 category_id
匹配的行,要替换名为 count
的列中的值,请运行以下语句:
val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
删除表中的行¶
对于 delete
方法,可以指定一个条件来标识要删除的行,并且可以使该条件基于与另一个 DataFrame 进行的联接。delete
会返回一个 DeleteResult
对象,其中包含已删除的行数。(请参阅 ` DeleteResult `_。)
备注
delete
是 操作方法,这意味着调用方法会将 SQL 语句发送到服务器执行。
例如,要删除 category_id
列中的值为 1
的行,请运行以下语句:
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
如果条件引用另一个 DataFrame 中的列,则将该 DataFrame 作为第二个实参传入。例如,要删除 category_id
列与 DataFrame
dfParts
中的 category_id
匹配的行,请将 dfParts
作为第二个实参传入:
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
将行合并到一个表中¶
要根据另一个表或子查询中的值在一个表中插入、更新和删除行(等效于 SQL 中的 MERGE 命令),请执行以下步骤:
在要将数据合并到的表的
Updatable
对象中,调用merge
方法,并传入另一个表的DataFrame
对象和联接条件的列表达式。这将返回一个
MergeBuilder
对象,您可以使用该对象指定要对匹配的行和不匹配的行执行的操作(例如插入、更新或删除)。(请参阅 ` MergeBuilder `_。)使用
MergeBuilder
对象:要指定应对匹配的行执行的更新或删除,请调用
whenMatched
方法。如果需要指定有关何时应更新或删除行的附加条件,则可以传入该条件的列表达式。
此方法返回一个
MatchedClauseBuilder
对象,可用于指定要执行的操作。(请参阅 ` MatchedClauseBuilder `_。)调用
MatchedClauseBuilder
对象中的update
或delete
方法,以指定应对匹配的行执行的更新或删除操作。这些方法返回一个MergeBuilder
对象,可用于指定其他子句。要指定应在行不匹配时执行的插入,请调用
whenNotMatched
方法。如果需要指定有关何时应插入行的附加条件,则可以传入该条件的列表达式。
此方法返回一个
NotMatchedClauseBuilder
对象,可用于指定要执行的操作。(请参阅 ` NotMatchedClauseBuilder `_。)调用
NotMatchedClauseBuilder
对象中的insert
方法,以指定应在行不匹配时执行的插入操作。这些方法返回一个MergeBuilder
对象,可用于指定其他子句。
指定了应执行的插入、更新和删除后,调用
MergeBuilder
对象的collect
方法,以对表执行指定的插入、更新和删除。collect
会返回一个MergeResult
对象,其中包含已插入、更新和删除的行数。(请参阅 ` MergeResult `_。)
以下示例将 source
表中带有 id
和 value
列的行插入到 target
表中(如果 target
表未包含具有匹配 ID 的行):
val mergeResult = target.merge(source, target("id") === source("id"))
.whenNotMatched.insert(Seq(source("id"), source("value")))
.collect()
以下示例使用 source
表中具有同一 ID 的行的 value
列值更新 target
表中的行:
val mergeResult = target.merge(source, target("id") === source("id"))
.whenMatched.update(Map("value" -> source("value")))
.collect()
将数据保存到表中¶
可以将 DataFrame 的内容保存到新表或现有表中。为此,您必须具有以下权限:
对架构的 CREATE TABLE 权限(如果表不存在)。
对表的 INSERT 权限。
要将 DataFrame 的内容保存到表中,请执行以下操作:
调用 ` DataFrameWriter.mode `_ 方法,传入一个 ` SaveMode `_ 对象,该对象指定有关写入表的首选项:
要插入行,请传入
SaveMode.Append
。要覆盖现有表,请传入
SaveMode.Overwrite
。
此方法返回使用指定模式进行了配置的相同
DataFrameWriter
对象。如果要在现有表中插入行 (
SaveMode.Append
),并且 DataFrame 中的列名称与表中的列名称匹配,请调用 ` DataFrameWriter.option _ 方法,并传入 :code:"columnOrder"` 和"name"
作为实参。备注
此方法是在 Snowpark 1.4.0 中引入的。
默认情况下,
columnOrder
选项设置为"index"
,这意味着DataFrameWriter
按照列的出现顺序插入值。例如,DataFrameWriter
在表中第一列内插入 DataFrame 中第一列的值,在表中第二列内插入 DataFrame 中第二列的值,依此类推。此方法返回使用指定选项进行了配置的同一个
DataFrameWriter
对象。调用 ` DataFrameWriter.`_ saveAsTable 将 DataFrame 的内容保存到指定的表中。
无需调用单独的方法(例如
collect
)来执行将数据保存到表中的 SQL 语句。saveAsTable
是执行 SQL 语句的 操作方法。
以下示例使用 DataFrame df
的内容覆盖现有表(由 tableName
变量标识):
df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
以下示例将 DataFrame df
中的行插入到现有表(由 tableName
变量标识)。在此示例中,表和 DataFrame 都包含列 c1
和 c2
。
该示例展示了这两种做法的区别:将 columnOrder
选项设置为 "name"
(这会将值插入到与 DataFrame 列同名的表列中),以及使用默认的 columnOrder
选项(这会根据 DataFrame 中列的顺序将值插入到表列中)。
val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
基于 DataFrame 创建视图¶
要基于 DataFrame 创建视图,请调用 ` DataFrame.createOrReplaceView `_ 方法:
df.createOrReplaceView("db.schema.viewName")
请注意,调用 createOrReplaceView
会立即创建新视图。更重要的是,它不会导致对 DataFrame 进行计算。(在您 :ref:` 执行操作 <label-snowpark_dataframe_action_method>` 之前,不会对 DataFrame 本身进行计算。)
通过调用 createOrReplaceView
创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图。
如果仅需要为会话创建临时视图,请改为调用 ` DataFrame.createOrReplaceTempView `_ 方法:
df.createOrReplaceTempView("db.schema.viewName")
缓存 DataFrame¶
在某些情况下,可能需要执行复杂的查询,并将结果保留下来,以供后续操作使用(而不必再次执行相同的查询)。在此类情况下,可以调用 ` DataFrame.cacheResult `_ 方法来缓存 DataFrame 的内容。
此方法会:
运行查询。
在调用
cacheResult
之前,无需 调用单独的操作方法来检索结果。cacheResult
是执行查询的操作方法。将结果保存在临时表中
由于
cacheResult
会创建临时表,因此,您必须对正在使用的架构具有 CREATE TABLE 权限。返回一个 ` HasCachedResult `_ 对象,该对象提供对临时表中结果的访问权。
由于
HasCachedResult
扩展了DataFrame
,因此,可以对此缓存数据执行一些同样可以对 DataFrame 执行的操作。
备注
由于 cacheResult
执行查询并将结果保存到表中,因此该方法可能会导致计算和存储成本增加。
例如:
import com.snowflake.snowpark.functions_
// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
请注意,调用该方法时,原始的 DataFrame 不受影响。例如,假设 dfTable
是 sample_product_data
表的 DataFrame :
val dfTempTable = dfTable.cacheResult()
调用 cacheResult
后, dfTable
仍指向 sample_product_data
表,而且您可以继续使用 dfTable
来查询和更新该表。
要使用临时表中的缓存数据,请使用 dfTempTable
(即 cacheResult
返回的 HasCachedResult
对象)。
处理暂存区中的文件¶
Snowpark 库提供了一些类和方法,可让您通过使用暂存区中的文件 将数据加载到 Snowflake 中,以及 从 Snowflake 卸载数据。
备注
为了在暂存区上使用这些类和方法,您必须具有 使用暂存区所需的权限。
接下来的部分将说明如何使用这些类和方法:
在暂存区中上传和下载文件¶
要在暂存区中上传和下载文件,请使用 ` FileOperation `_ 对象:
将文件上传到暂存区¶
要将文件上传到暂存区,请执行以下步骤:
验证您是否具有 将文件上传到暂存区的权限。
调用 ` FileOperation.put `_ 方法将文件上传到暂存区。
此方法执行 SQL PUT 命令。
要为 PUT 命令指定任何 可选参数,请创建参数和值的
Map
,然后传入该Map
作为options
实参。例如:// Upload a file to a stage without compressing the file. val putOptions = Map("AUTO_COMPRESS" -> "FALSE") val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
在
localFilePath
实参中,可以使用通配符(*
和?
)来标识要上传的一组文件。例如:// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
检查
put
方法返回的 ` PutResult _ 对象的 :code:`Array,以确定文件是否上传成功。例如,要打印该文件的文件名和 PUT 操作的状态,请运行以下语句:// Print the filename and the status of the PUT operation. putResults.foreach(r => println(s" ${r.sourceFileName}: ${r.status}"))
从暂存区下载文件¶
要从暂存区下载文件,请执行以下步骤:
验证您是否具有 从暂存区下载文件的权限。
调用 ` FileOperation.get `_ 方法,从暂存区下载文件。
此方法执行 SQL GET 命令。
要为 GET 命令指定任何 可选参数,请创建参数和值的
Map
,然后传入该Map
作为options
实参。例如:// Download files with names that match a regular expression pattern. val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'") val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
检查
get
方法返回的 ` GetResult _ 对象的 :code:`Array,以确定文件是否下载成功。例如,要打印该文件的文件名和 GET 操作的状态,请运行以下语句:// Print the filename and the status of the GET operation. getResults.foreach(r => println(s" ${r.fileName}: ${r.status}"))
使用输入流在暂存区中上传和下载数据¶
备注
此功能是在 Snowpark 1.4.0 中引入的。
要使用输入流将数据上传到暂存区上的文件,以及从暂存区上的文件下载数据,请使用 ` FileOperation _ 对象的 :code:`uploadStream 和 downloadStream
方法:
使用输入流将数据上传到暂存区上的文件¶
要将数据从 ` java.io.InputStream `_ 对象上传到暂存区上的文件,请执行以下步骤:
验证您是否具有 将文件上传到暂存区的权限。
调用 ` FileOperation.uploadStream `_ 方法。
传入应写入数据的暂存区上文件的完整路径和
InputStream
对象。此外,使用compress
实参,指定在上传数据之前是否应压缩数据。
例如:
import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
使用输入流从暂存区上的文件下载数据¶
要将数据从暂存区上的文件下载到 ` java.io.InputStream `_ 对象,请执行以下操作:
验证您是否具有 从暂存区下载文件的权限。
调用 ` FileOperation.downloadStream `_ 方法。
传入包含要下载的数据的暂存区文件的完整路径。使用
decompress
实参,指定是否要压缩文件中的数据。
例如:
import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
为暂存区中的文件设置 DataFrame¶
本部分介绍如何为 Snowflake 暂存区中的文件设置 DataFrame。创建此 DataFrame 后,您可以使用 DataFrame 来执行以下操作:
要为 Snowflake 暂存区中的文件设置 DataFrame,请使用 DataFrameReader
类:
验证您是否具有以下权限:
以下其中一项:
架构的 CREATE TABLE 权限(如果您计划指定用于确定如何从暂存文件复制数据的 复制选项 )。
否则确保您具备架构的 CREATE FILE FORMAT 权限。
调用
Session
类中的read
方法,以访问DataFrameReader
对象。如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:
创建一个 ` StructType `_ 对象,该对象包含描述文件中的字段的一系列 ` StructField `_ 对象。
对于每个
StructField
对象,请指定以下内容:字段的名称。
字段的数据类型(指定为
com.snowflake.snowpark.types
包中的对象)。字段是否可为 null。
例如:
import com.snowflake.snowpark.types._ val schemaForDataFile = StructType( Seq( StructField("id", StringType, true), StructField("name", StringType, true)))
调用
DataFrameReader
对象中的schema
方法,传入StructType
对象。例如:
var dfReader = session.read.schema(schemaForDataFile)
schema
方法会返回一个DataFrameReader
对象,该对象配置为读取包含指定字段的文件。请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件,
DataFrameReader
会将数据视为字段名称为$1
、类型为 VARIANT 的单个字段处理。
如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用 ` DataFrameReader.option `_ 或 ` DataFrameReader.options `_ 方法。
传入要设置的选项的名称和值。可以设置以下类型的选项:
有关 CREATE FILE FORMAT 的文档。
COPY INTO TABLE 文档。
请注意,当您 将数据检索到 DataFrame 中时,设置复制选项可能造成费用更昂贵的执行策略。
下面的示例设置了
DataFrameReader
对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
方法返回使用指定选项进行了配置的DataFrameReader
对象。若要设置多个选项,可以 链接多个调用 到
option
方法(如上面的示例所示),或是调用 ` DataFrameReader.options`_ 方法,并传入选项名称和值的Map
。调用与文件格式相对应的方法。您可以调用以下方法之一:
` DataFrameReader.avro `_
` DataFrameReader.csv `_
` DataFrameReader.json `_
` DataFrameReader.orc `_
` DataFrameReader.parquet `_
` DataFrameReader.xml `_
调用这些方法时,传入要读取的文件的暂存区位置。例如:
val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
若要指定以相同前缀开头的多个文件,请在暂存区名称后指定前缀。例如,要从暂存区
@mystage
加载具有前缀csv_
的文件,请执行以下操作:val df = dfReader.csv("@mystage/csv_")
与文件格式相对应的方法会返回文件的一个 ` CopyableDataFrame _ 对象。:code:`CopyableDataFrame 扩展了
DataFrame
,并提供用于处理暂存文件中的数据的其他方法。调用操作方法,以执行以下操作:
与表的 DataFrames 一样,在您调用 :ref:` 操作方法 <label-snowpark_dataframe_action_method>` 之前,不会将数据检索到 DataFrame 中。
将数据从文件加载到 DataFrame 中¶
在 为暂存区中的文件设置 DataFrame 之后,可以将文件中的数据加载到 DataFrame 中:
使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_dataframe_transform>` (例如,选择特定字段、筛选行等)。
例如,要从名为
mystage
的暂存区中名为data.json
的 JSON 文件中提取color
元素,请运行以下语句:val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件),
DataFrameReader
会将文件中的数据视为名为$1
的单个 VARIANT 列。调用
DataFrame.collect
方法以加载数据。例如:val results = df.collect()
将数据从文件复制到表中¶
为暂存区中的文件设置 DataFrame 之后,可以调用 ` CopyableDataFrame.copyInto _ 方法将数据复制到表中。此方法执行 :doc:/sql-reference/sql/copy-into-table` 命令。
备注
在调用 copyInto
之前,无需调用 collect
方法。在调用 copyInto
之前,文件中的数据不需要在 DataFrame 之中。
例如,以下代码将 myFileStage
指定的 CSV 文件中的数据加载到 mytable
表中。由于数据位于 CSV 文件中,代码还必须 描述文件中的字段。为此,该示例调用 ` DataFrameReader.schema _ 方法,并传入包含描述字段的 ` StructField `_ 对象序列的 ` StructType `_ 对象 (``csvFileSchema`)。
val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
将 DataFrame 保存到暂存区上的文件¶
备注
此功能是在 Snowpark 1.5.0 中引入的。
如果需要将 DataFrame 保存到暂存区上的文件中,可以调用与文件格式对应的 ` DataFrameWriter _ 方法(例如 :code:`csv 方法用于写入 CSV 文件),并传入应将文件保存到的暂存区位置。这些 DataFrameWriter
方法执行 COPY INTO <location> 命令。
备注
在调用这些 DataFrameWriter
方法之前,无需调用 collect
方法。在调用这些方法之前,文件中的数据不需要在 DataFrame 之中。
要将 DataFrame 的内容保存到暂存区上的文件中,请执行以下操作:
调用 ` DataFrame.write`_ 方法,以获取 ` DataFrameWriter _ 对象。例如,要获取代表名为 ``sample_product_data` 的表的 DataFrame 的
DataFrameWriter
对象,请运行以下语句:dfWriter = session.table("sample_product_data").write
如果要覆盖文件的内容(如果文件存在),请调用 ` DataFrameWriter.mode _ 方法,并传入 :code:`SaveMode.Overwrite。
否则,默认情况下,如果暂存区上的指定文件已存在,
DataFrameWriter
会报告错误。mode
方法返回使用指定模式进行了配置的相同DataFrameWriter
对象。例如,要指定
DataFrameWriter
应覆盖暂存区上的文件,请运行以下语句:dfWriter = dfWriter.mode(SaveMode.Overwrite)
如果需要指定有关应如何保存数据的其他信息(例如,应压缩数据,或者要使用分号来分隔 CSV 文件中的字段),请调用 ` DataFrameWriter.option`_ 方法或 ` DataFrameWriter.options`_ 方法。
传入要设置的选项的名称和值。可以设置以下类型的选项:
有关 COPY INTO <location> 的文档。
有关 COPY INTO <location> 的文档中介绍的 复制选项。
请注意,不能使用
option
方法设置以下选项:TYPE 格式类型选项。
OVERWRITE 复制选项。若要设置此选项,请改为调用
mode
方法(如上一步所述)。
下面的示例设置
DataFrameWriter
对象,以使用分号(而非逗号)作为字段分隔符,以未压缩的形式将数据保存到 CSV 文件中。dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
方法返回使用指定选项进行了配置的DataFrameWriter
对象。要设置多个选项,可以如上例所示 链接多个调用 (调用的是
option
方法),或者调用 ` DataFrameWriter.options _ 方法,并传入选项名称和值的 :code:`Map。若要返回有关已保存的各文件的详细信息,请将
DETAILED_OUTPUT
复制选项 设置为TRUE
。默认情况下,
DETAILED_OUTPUT
为FALSE
,这代表该方法会返回一行输出,其中包含"rows_unloaded"
、"input_bytes"
和"output_bytes"
字段。将
DETAILED_OUTPUT
设置为TRUE
时,该方法会为所保存的每个文件返回一行输出。每行都包含FILE_NAME
、FILE_SIZE
和ROW_COUNT
字段。调用文件格式对应的方法,将数据保存到文件中。您可以调用以下方法之一:
调用这些方法时,传入应写入数据的文件的暂存区位置(如
@mystage
)。默认情况下,该方法会将数据保存到名称带有前缀
data_
的文件中(例如@mystage/data_0_0_0.csv
)。如果希望使用不同的前缀命名文件,请在暂存区名称后指定前缀。例如:val writeFileResult = dfWriter.csv("@mystage/saved_data")
本示例将 DataFrame 的内容保存到名称以
saved_data
为前缀(例如@mystage/saved_data_0_0_0.csv
)的文件中。检查返回的 ` WriteFileResult `_ 对象,以获取有关写入文件的数据量的信息。
从
WriteFileResult
对象,您可以访问 COPY INTO <location> 命令生成的输出:若要以 ` Row _ 对象数组的形式访问输出行,请使用 :code:`rows 值成员。
若要确定行中有哪些字段,请使用
schema
值成员,该成员是描述行中字段的 ` StructType `_。
例如,要在输出行中打印出字段名称和值,请运行以下语句:
val writeFileResult = dfWriter.csv("@mystage/saved_data") for ((row, index) <- writeFileResult.rows.zipWithIndex) { (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach { (structField, element) => println(s"${structField.name}: $element") } }
以下示例使用 DataFrame 将名为 car_sales
的表的内容保存到 JSON 文件,这些文件位于 @mystage
暂存区中,名称带有 saved_data
前缀(例如 @mystage/saved_data_0_0_0.json
)。示例代码:
覆盖文件(如果暂存区中已存在该文件)。
返回有关保存操作的详细输出。
保存未压缩的数据。
最后,示例代码在返回的输出行中打印出每个字段和值:
val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
println(s"Row: $index")
(writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
(structField, element) => println(s"${structField.name}: $element")
}
}
使用半结构化数据¶
使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。
备注
这些部分中的示例使用 示例中使用的示例数据 中的示例数据。
遍历半结构化数据¶
若要引用半结构化数据中的特定字段或元素,请使用 ` Column `_ 对象的以下方法:
使用 ` Column.apply("<field_name>") _,返回 OBJECT (或包含 OBJECT 的 VARIANT)中一个字段的 :code:`Column 对象。
使用 ` Column.apply(<index>) _,返回 ARRAY (或包含 ARRAY 的 VARIANT )中一个元素的 :code:`Column 对象。
备注
如果路径中的字段名称或元素不规则,并且导致难以使用这些 Column.apply
方法,则可以使用 ` get _、 get_ignore_case `_ 或 ` get_path `_ 函数作为替代方法。
如 使用 apply 方法引用列 中所述,可以省略方法名称 apply
:
col("column_name")("field_name")
col("column_name")(index)
例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src
列的对象内,选择 dealership
字段:
val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
该代码会打印以下输出:
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
备注
DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值。
还可以 链接多个方法调用,以遍历特定字段或元素的路径。
例如,以下代码选择 salesperson
对象中的 name
字段:
val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
该代码会打印以下输出:
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
再举一个例子,下面的代码选择 vehicle
字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price
字段。
val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
该代码会打印以下输出:
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
作为 apply
方法的替代方法,如果路径中的字段名称或元素不规则,并且导致难以使用 Column.apply
方法,则可以使用 ` get _、 get_ignore_case `_ 或 ` get_path `_ 函数。
例如,以下代码行都会打印对象中指定字段的值:
df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
同样,以下代码行都会打印对象中指定路径处的字段值:
df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
显式转换半结构化数据中的值¶
默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。
若要避免意外结果,请调用 :ref:` cast <label-snowpark_dataframe_cols_cast>` 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
该代码会打印以下输出:
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
将对象数组展平为行¶
如果需要将半结构化数据“展平”为 DataFrame (例如,为数组中的每个对象生成一行),请调用 ` DataFrame.flatten _ 方法。此方法与 :doc:/sql-reference/functions/flatten` SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。
例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer
是一个包含有关客户的信息的对象数组。每个对象都包含 name
和 address
字段。
如果将此路径传递给 flatten
函数:
val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
该方法返回一个 DataFrame:
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
从 DataFrame 中,您可以从 VALUE
字段中的每个对象选择 name
和 address
字段:
df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:
df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
执行 SQL 语句¶
若要执行您指定的 SQL 语句,请调用 Session
类中的 sql
方法,然后传入要执行的语句。该方法返回一个 DataFrame。
请注意,在您 :ref:` 调用操作方法 <label-snowpark_dataframe_action_method>` 之前, SQL 语句不会执行。
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()
val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_dataframe_transform>` (例如 filter、select 等),请注意,仅当基础 SQL 语句是 SELECT 语句时,这些方法才有效。其他类型的 SQL 语句不支持转换方法。
val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)
// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)