在 Snowpark Python 中使用 DataFrames¶
在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。
本主题内容:
要检索和操作数据,需要使用 DataFrame
类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。
要将数据检索到 DataFrame 之中,请执行以下步骤:
-
例如,您可以创建一个 DataFrame 来保存源自表、外部 CSV 文件、本地数据或 SQL 语句的执行的数据。
-
例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。
-
为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如
collect()
方法)。
接下来的部分将更详细地介绍这些步骤。
设置本部分的示例¶
本部分的一些示例使用 DataFrame 查询名为 sample_product_data
的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表。
可以使用 Snowpark Python 运行 SQL 语句:
>>> session.sql('CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT)').collect()
[Row(status='Table SAMPLE_PRODUCT_DATA successfully created.')]
>>> session.sql("""
... INSERT INTO sample_product_data VALUES
... (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
... (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
... (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
... (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
... (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
... (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
... (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
... (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
... (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
... (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
... (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
... (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
... """).collect()
[Row(number of rows inserted=12)]
要验证表是否已创建,请运行:
>>> session.sql("SELECT count(*) FROM sample_product_data").collect()
[Row(COUNT(*)=12)]
在 Python 工作表中设置示例¶
要在 :doc:` Python 工作表 </developer-guide/snowpark/python/python-worksheets>` 中设置和运行这些示例,请创建示例表,并设置您的 Python 工作表。
创建 SQL 工作表并运行以下命令:
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT); INSERT INTO sample_product_data VALUES (1, 0, 5, 'Product 1', 'prod-1', 1, 10), (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20), (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30), (4, 0, 10, 'Product 2', 'prod-2', 2, 40), (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50), (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60), (7, 0, 20, 'Product 3', 'prod-3', 3, 70), (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80), (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90), (10, 0, 50, 'Product 4', 'prod-4', 4, 100), (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100), (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100); SELECT count(*) FROM sample_product_data;
创建 Python 工作表,设置与用于创建
sample_product_data
表的 SQL 工作表相同的数据库和架构上下文。
要在 Python 工作表中使用本主题中的示例,请在处理程序函数(例如 main
)中使用该示例,并使用传递到函数中的 Session
对象创建 DataFrames。
例如,调用 session
对象的 table
方法,为表创建一个 DataFrame :
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
要查看该函数生成的输出(例如通过调用 DataFrame 对象的 show
方法),请使用 Output 选项卡。
要检查该函数返回的值,请从 Settings » Return type 中选择返回值的数据类型,然后使用 Results 选项卡:
如果函数返回 DataFrame,则使用默认返回类型 Table。
如果函数从 DataFrame 对象的
collect
方法返回Row
的list
,请为返回类型使用 Variant。如果函数返回可通过类型转换操作转换为字符串的其他任何值,或者函数未返回值,请使用 String 作为返回类型。
有关更多详细信息,请参阅 运行 Python 工作表。
构造 DataFrame¶
要构造 DataFrame,可以使用 Session
类的方法和属性。以下每种方法都基于不同类型的数据源构造 DataFrame。
可以在本地开发环境中运行这些示例,也可以在 :doc:` Python 工作表 </developer-guide/snowpark/python/python-worksheets>` 中定义的 main
函数中调用它们。
要基于表、视图或流中的数据创建 DataFrame,请调用
table
方法:>>> # Create a DataFrame from the data in the "sample_product_data" table. >>> df_table = session.table("sample_product_data") # To print out the first 10 rows, call df_table.show()
要基于指定值创建 DataFrame,请调用
create_dataframe
方法:>>> # Create a DataFrame with one column named a from specified values. >>> df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a") >>> df1.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df1 ------- |"A" | ------- |1 | |2 | |3 | |4 | -------
创建一个包含 4 列的 DataFrame,列分别为“a”、“b”、“c”和“d”:
>>> # Create a DataFrame with 4 columns, "a", "b", "c" and "d". >>> df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]) >>> df2.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df2 ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | -------------------------
再创建一个包含 4 列的 DataFrame,列分别为“a”、“b”、“c”和“d”:
>>> # Create another DataFrame with 4 columns, "a", "b", "c" and "d". >>> from snowflake.snowpark import Row >>> df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)]) >>> df3.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df3 ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | -------------------------
创建 DataFrame 并指定架构:
>>> # Create a DataFrame and specify a schema >>> from snowflake.snowpark.types import IntegerType, StringType, StructType, StructField >>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())]) >>> df4 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema) >>> df4.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df4 --------------- |"A" |"B" | --------------- |1 |snow | |3 |flake | ---------------
要创建包含特定值范围的 DataFrame,请调用
range
方法:>>> # Create a DataFrame from a range >>> # The DataFrame contains rows with values 1, 3, 5, 7, and 9 respectively. >>> df_range = session.range(1, 10, 2).to_df("a") >>> df_range.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_range ------- |"A" | ------- |1 | |3 | |5 | |7 | |9 | -------
要创建 DataFrame 来保存暂存区内文件中的数据,请使用
read
属性获取DataFrameReader
对象。在DataFrameReader
对象中,调用与文件中的数据格式对应的方法:>>> from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType >>> # Create DataFrames from data in a stage. >>> df_json = session.read.json("@my_stage2/data1.json") >>> df_catalog = session.read.schema(StructType([StructField("name", StringType()), StructField("age", IntegerType())])).csv("@stage/some_dir")
要创建 DataFrame 来保存 SQL 查询的结果,请调用
sql
方法:>>> # Create a DataFrame from a SQL query >>> df_sql = session.sql("SELECT name from sample_product_data") >>> df_sql.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_sql -------------- |"NAME" | -------------- |Product 1 | |Product 1A | |Product 1B | |Product 2 | |Product 2A | |Product 2B | |Product 3 | |Product 3A | |Product 3B | |Product 4 | --------------
可以使用
sql
方法执行 SELECT 语句,从表和暂存文件中检索数据,但使用table
方法和read
属性可在开发工具中提供更好的语法突出显示、错误突出显示和智能代码补全。
指定应该如何转换数据集¶
要指定需要选择的列,以及如何对结果执行筛选、排序、分组等操作,请调用用于转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 col
函数或计算结果为列的表达式。请参阅 指定列和表达式。
例如:
若要指定应返回的行,请调用
filter
方法:>>> # Import the col function from the functions module. >>> # Python worksheets import this function by default >>> from snowflake.snowpark.functions import col >>> # Create a DataFrame for the rows with the ID 1 >>> # in the "sample_product_data" table. >>> # This example uses the == operator of the Column object to perform an >>> # equality check. >>> df = session.table("sample_product_data").filter(col("id") == 1) >>> df.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df ------------------------------------------------------------------------------------ |"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" | ------------------------------------------------------------------------------------ |1 |0 |5 |Product 1 |prod-1 |1 |10 | ------------------------------------------------------------------------------------
若要指定应选择的列,请调用
select
方法:>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> # Create a DataFrame that contains the id, name, and serial_number >>> # columns in the "sample_product_data" table. >>> df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) >>> df.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df --------------------------------------- |"ID" |"NAME" |"SERIAL_NUMBER" | --------------------------------------- |1 |Product 1 |prod-1 | |2 |Product 1A |prod-1-A | |3 |Product 1B |prod-1-B | |4 |Product 2 |prod-2 | |5 |Product 2A |prod-2-A | |6 |Product 2B |prod-2-B | |7 |Product 3 |prod-3 | |8 |Product 3A |prod-3-A | |9 |Product 3B |prod-3-B | |10 |Product 4 |prod-4 | ---------------------------------------
您还可以按如下方式引用列:
>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> df_product_info = session.table("sample_product_data") >>> df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"]) >>> df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number) >>> df3 = df_product_info.select("id", "name", "serial_number")
每个方法都返回一个经过转换的新 DataFrame 对象。该方法不会影响原始 DataFrame 对象。要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用返回的新 DataFrame 对象调用每个后续转换方法。
这些转换方法指定了如何构造 SQL 语句,并且不会从 Snowflake 数据库检索数据。执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。
联接 DataFrames¶
若要联接 DataFrame 对象,请调用 join
方法:
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # Create a DataFrame that joins the two DataFrames
>>> # on the column named "key".
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2")
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
如果两个 DataFrames 都包含要用于联接的相同列,则可以使用以下示例语法:
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # If both dataframes have the same column "key", the following is more convenient.
>>> df_lhs.join(df_rhs, ["key"]).show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, ["key"])
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
您还可以使用 & 运算符来连接多个联接表达式:
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # Use & operator connect join expression. '|' and ~ are similar.
>>> df_joined_multi_column = df_lhs.join(df_rhs, (df_lhs.col("key") == df_rhs.col("key")) & (df_lhs.col("value1") < df_rhs.col("value2"))).select(df_lhs["key"].as_("key"), "value1", "value2")
>>> df_joined_multi_column.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_joined_multi_column
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
如果要执行自联接,则必须复制 DataFrame:
>>> # copy the DataFrame if you want to do a self-join
>>> from copy import copy
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs_copied = copy(df_lhs)
>>> df_self_joined = df_lhs.join(df_lhs_copied, (df_lhs.col("key") == df_lhs_copied.col("key")) & (df_lhs.col("value1") == df_lhs_copied.col("value1")))
如果 DataFrames 中存在重叠的列,Snowpark 会在联接结果中的列前面附加一个随机生成的前缀:
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"))
-----------------------------------------------------
|"l_av5t_KEY" |"VALUE1" |"r_1p6k_KEY" |"VALUE2" |
-----------------------------------------------------
|a |1 |a |3 |
|b |2 |b |4 |
-----------------------------------------------------
可以使用 Column.alias
来重命名重叠的列:
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2")
-----------------------------------------
|"KEY1" |"KEY2" |"VALUE1" |"VALUE2" |
-----------------------------------------
|a |a |1 |3 |
|b |b |2 |4 |
-----------------------------------------
要避免使用随机前缀,还可以指定要追加到重叠列的后缀:
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right")
--------------------------------------------------
|"KEY_LEFT" |"VALUE1" |"KEY_RIGHT" |"VALUE2" |
--------------------------------------------------
|a |1 |a |3 |
|b |2 |b |4 |
--------------------------------------------------
这些示例使用 DataFrame.col
来指定要在联接中使用的列。有关指定列的更多方法,请参阅 指定列和表达式。
如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下示例使用单个 DataFrame 执行自联接,操作会失败,因为联接的左侧和右侧都存在 "id"
的列表达式:
>>> from snowflake.snowpark.exceptions import SnowparkJoinException
>>> df = session.table("sample_product_data")
>>> # This fails because columns named "id" and "parent_id"
>>> # are in the left and right DataFrames in the join.
>>> try:
... df_joined = df.join(df, col("id") == col("parent_id")) # fails
... except SnowparkJoinException as e:
... print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
>>> # This fails because columns named "id" and "parent_id"
>>> # are in the left and right DataFrames in the join.
>>> try:
... df_joined = df.join(df, df["id"] == df["parent_id"]) # fails
... except SnowparkJoinException as e:
... print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
相反,请使用 Python 内置的 copy()
方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:
>>> from copy import copy
>>> # Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
>>> df_lhs = session.table("sample_product_data")
>>> # Clone the DataFrame object to use as the right-hand side of the join.
>>> df_rhs = copy(df_lhs)
>>> # Create a DataFrame that joins the two DataFrames
>>> # for the "sample_product_data" table on the
>>> # "id" and "parent_id" columns.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("id") == df_rhs.col("parent_id"))
>>> df_joined.count()
8
指定列和表达式¶
调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select
方法时,需要指定要选择的列。
要引用列,请通过调用 snowflake.snowpark.functions
模块中的 col
函数来创建 Column
对象。
>>> # Import the col function from the functions module.
>>> from snowflake.snowpark.functions import col
>>> df_product_info = session.table("sample_product_data").select(col("id"), col("name"))
>>> df_product_info.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_product_info
---------------------
|"ID" |"NAME" |
---------------------
|1 |Product 1 |
|2 |Product 1A |
|3 |Product 1B |
|4 |Product 2 |
|5 |Product 2A |
|6 |Product 2B |
|7 |Product 3 |
|8 |Product 3A |
|9 |Product 3B |
|10 |Product 4 |
---------------------
备注
要为字面量创建 Column
对象,请参阅 将字面量用作列对象。
指定筛选器、投影、联接条件等时,可以在表达式中使用 Column
对象。例如:
可以将
Column
对象与filter
方法一起使用,以指定筛选条件:>>> # Specify the equivalent of "WHERE id = 20" >>> # in a SQL SELECT statement. >>> df_filtered = df.filter(col("id") == 20)
>>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) >>> # Specify the equivalent of "WHERE a + b < 10" >>> # in a SQL SELECT statement. >>> df_filtered = df.filter((col("a") + col("b")) < 10) >>> df_filtered.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_filtered ------------- |"A" |"B" | ------------- |1 |3 | -------------
可以将
Column
对象与select
方法一起使用,以定义别名:>>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) >>> # Specify the equivalent of "SELECT b * 10 AS c" >>> # in a SQL SELECT statement. >>> df_selected = df.select((col("b") * 10).as_("c")) >>> df_selected.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_selected ------- |"C" | ------- |30 | |100 | -------
可以将
Column
对象与join
方法一起使用,以定义联接条件:>>> dfX = session.create_dataframe([[1], [2]], schema=["a_in_X"]) >>> dfY = session.create_dataframe([[1], [3]], schema=["b_in_Y"]) >>> # Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" >>> # in a SQL SELECT statement. >>> df_joined = dfX.join(dfY, col("a_in_X") == col("b_in_Y")).select(dfX["a_in_X"].alias("the_joined_column")) >>> df_joined.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_joined ----------------------- |"THE_JOINED_COLUMN" | ----------------------- |1 | -----------------------
引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在一个 DataFrame 对象中使用 DataFrame.col
方法引用该对象中的列(例如 df1.col("name")
和 df2.col("name")
)。
下面的示例演示了如何使用 DataFrame.col
方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 key
的列。该示例使用 Column.as
方法来更改新创建 DataFrame 中的列名称。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value"])
>>> # Create a DataFrame that joins two other DataFrames (df_lhs and df_rhs).
>>> # Use the DataFrame.col method to refer to the columns used in the join.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs.col("key").as_("key"), df_lhs.col("value").as_("L"), df_rhs.col("value").as_("R"))
>>> df_joined.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_joined
---------------------
|"KEY" |"L" |"R" |
---------------------
|a |1 |3 |
|b |2 |4 |
---------------------
将对象标识符(表名称、列名称等)放在双引号里¶
您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求。
创建一个包含区分大小写的列的表:
>>> session.sql("""
... create or replace temp table "10tablename"(
... id123 varchar, -- case insensitive because it's not quoted.
... "3rdID" varchar, -- case sensitive.
... "id with space" varchar -- case sensitive.
... )""").collect()
>>> # Add return to the statement to return the collect() results in a Python worksheet
[Row(status='Table 10tablename successfully created.')]
随后向表中添加值:
>>> session.sql("""insert into "10tablename" (id123, "3rdID", "id with space") values ('a', 'b', 'c')""").collect()
>>> # Add return to the statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
随后为表创建一个 DataFrame,并查询该表:
>>> df = session.table('"10tablename"')
>>> df.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df
---------------------------------------
|"ID123" |"3rdID" |"id with space" |
---------------------------------------
|a |b |c |
---------------------------------------
指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:
>>> df.select(col("id123")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(ID123='a')]
>>> df.select(col("ID123")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(ID123='a')]
如果名称不符合标识符要求,则必须将名称放在双引号里 ("
)。使用反斜杠 (\
) 对字符串字面量中的双引号字符进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:
>>> df = session.table("\"10tablename\"")
也可以使用单引号而非反斜杠来转义字符串字面量中的双引号字符。
>>> df = session.table('"10tablename"')
请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:
>>> df.select(col("3rdID")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(3rdID='b')]
>>> df.select(col("\"3rdID\"")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(3rdID='b')]
再举一个例子,以下调用是等效的:
>>> df.select(col("id with space")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
>>> df.select(col("\"id with space\"")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。
在某些情况下,列名称可能包含双引号字符:
>>> session.sql('''
... create or replace temp table quoted(
... "name_with_""air""_quotes" varchar,
... """column_name_quoted""" varchar
... )''').collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Table QUOTED successfully created.')]
>>> session.sql('''insert into quoted ("name_with_""air""_quotes", """column_name_quoted""") values ('a', 'b')''').collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
如 标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes"
和 """column_name_quoted"""
):
>>> df_table = session.table("quoted")
>>> df_table.select("\"name_with_\"\"air\"\"_quotes\"").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(name_with_"air"_quotes='a')]
>>> df_table.select("\"\"\"column_name_quoted\"\"\"").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row("column_name_quoted"='b')]
如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写:
>>> # The following calls are NOT equivalent!
>>> # The Snowpark library adds double quotes around the column name,
>>> # which makes Snowflake treat the column name as case-sensitive.
>>> df.select(col("id with space")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
对比此示例:
>>> from snowflake.snowpark.exceptions import SnowparkSQLException
>>> try:
... df.select(col("ID WITH SPACE")).collect()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 7
invalid identifier '"ID WITH SPACE"'
将字面量用作列对象¶
若要在将 Column
对象作为实参的方法中使用字面量,请将字面量传递给 snowflake.snowpark.functions
模块中的 lit
函数,从而为字面量创建 Column
对象。例如:
>>> # Import for the lit and col functions.
>>> from snowflake.snowpark.functions import col, lit
>>> # Show the first 10 rows in which num_items is greater than 5.
>>> # Use `lit(5)` to create a Column object for the literal 5.
>>> df_filtered = df.filter(col("num_items") > lit(5))
将列对象的类型转换为特定类型¶
要将 Column
对象的类型转换为特定类型,请调用 cast
方法,然后从 snowflake.snowpark.types
模块传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:
>>> # Import for the lit function.
>>> from snowflake.snowpark.functions import lit
>>> # Import for the DecimalType class.
>>> from snowflake.snowpark.types import DecimalType
>>> decimal_value = lit(0.05).cast(DecimalType(5,2))
链接多个方法调用¶
由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。
下面的示例返回为如下目的而配置的 DataFrame :
查询
sample_product_data
表。返回
id = 1
的行。选择
name
和serial_number
列。>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number")) >>> df_product_info.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_product_info ------------------------------- |"NAME" |"SERIAL_NUMBER" | ------------------------------- |Product 1 |prod-1 | -------------------------------
在此示例中:
session.table("sample_product_data")
返回sample_product_data
表的 DataFrame。尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。
filter(col("id") == 1)
返回sample_product_data
表的 DataFrame(设置为返回id = 1
的行)。请注意, DataFrame 还不包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_python_dataframe_action_method>` 之前,不会检索匹配的行。
select(col("name"), col("serial_number"))
返回一个 DataFrame,包含sample_product_data
表中id = 1
的行的name
和serial_number
列。
在链接多个方法调用时,调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。
例如,在下面的代码中,select
方法返回仅包含两列的 DataFrame:name
和 serial_number
。对此 DataFrame 的 filter
方法调用会失败,因为它使用的 id
列不在已转换的 DataFrame 中。
>>> # This fails with the error "invalid identifier 'ID'."
>>> df_product_info = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") == 1)
>>> try:
... df_product_info.show()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 121
invalid identifier 'ID'
相反,以下代码会成功执行,因为对包含 sample_product_data
表中所有列(包括 id
列)的 DataFrame 调用了 filter()
方法:
>>> # This succeeds because the DataFrame returned by the table() method
>>> # includes the "id" column.
>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number"))
>>> df_product_info.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_product_info
-------------------------------
|"NAME" |"SERIAL_NUMBER" |
-------------------------------
|Product 1 |prod-1 |
-------------------------------
使用 Snowpark Python 时,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 select
和 filter
方法调用。
检索列定义¶
要在数据集中为 DataFrame 检索列的定义,请调用 schema
属性。此方法会返回一个 StructType
对象,其中包含 StructField
对象的 list
。每个 StructField
对象都包含一列的定义。
# Import the StructType
from snowflake.snowpark.types import *
# Get the StructType object that describes the columns in the
# underlying rowset.
table_schema = session.table("sample_product_data").schema
table_schema
StructType([StructField('ID', LongType(), nullable=True), StructField('PARENT_ID', LongType(), nullable=True), StructField('CATEGORY_ID', LongType(), nullable=True), StructField('NAME', StringType(), nullable=True), StructField('SERIAL_NUMBER', StringType(), nullable=True), StructField('KEY', LongType(), nullable=True), StructField('"3rd"', LongType(), nullable=True)])
在返回的 StructType
对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。
下面的示例创建一个 DataFrame,其中包含名为 ID
和 3rd
的列。对于名为 3rd
的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"
),因为 该名称不符合标识符要求。
该示例调用 schema
属性,随后调用返回的 StructType
对象的 names
属性,以获取列名称的 list
。这些名称使用 schema
属性返回的 StructType
进行规范化。
>>> # Create a DataFrame containing the "id" and "3rd" columns.
>>> df_selected_columns = session.table("sample_product_data").select(col("id"), col("3rd"))
>>> # Print out the names of the columns in the schema. This prints out:
>>> # This prints List["ID", "\"3rd\""]
>>> df_selected_columns.schema.names
['ID', '"3rd"']
执行操作以计算 DataFrame¶
如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。
以下方法可执行操作:
类 |
方法 |
描述 |
---|---|---|
|
|
计算 DataFrame,并将生成的数据集作为 |
|
|
计算 DataFrame 并返回行数。 |
|
|
计算 DataFrame,并将行打印到控制台。此方法将行数限制为 10 行(默认值)。 |
|
|
将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中。 |
例如,若要对表执行查询并返回结果,请调用 collect
方法:
>>> # Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
>>> # This does not execute the query.
>>> df = session.table("sample_product_data").select(col("id"), col("name"))
>>> # Send the query to the server for execution and
>>> # return a list of Rows containing the results.
>>> results = df.collect()
>>> # Use a return statement to return the collect() results in a Python worksheet
>>> # return results
若要执行查询并返回结果数量,请调用 count
方法:
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Send the query to the server for execution and
>>> # print the count of rows in the table.
>>> print(df_products.count())
12
若要执行查询并将结果打印到控制台,请调用 show
方法:
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Send the query to the server for execution and
>>> # print the results to the console.
>>> # The query limits the number of rows to 10 by default.
>>> df_products.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_products
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
-------------------------------------------------------------------------------------
若要将行数限制为 20 行:
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Limit the number of rows to 20, rather than 10.
>>> df_products.show(20)
>>> # All rows are returned when you use return in a Python worksheet to return the DataFrame as a table
>>> # return df_products
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
|11 |10 |50 |Product 4A |prod-4-A |4 |100 |
|12 |10 |50 |Product 4B |prod-4-B |4 |100 |
-------------------------------------------------------------------------------------
备注
如果调用 schema
属性以获取 DataFrame 中列的定义,则不需要调用操作方法。
以 Pandas DataFrame 的形式返回 DataFrame 的内容¶
要以 Pandas DataFrame 的形式返回 DataFrame 的内容,请使用 to_pandas
方法。
例如:
>>> python_df = session.create_dataframe(["a", "b", "c"])
>>> pandas_df = python_df.to_pandas()
将数据保存到表中¶
要将 DataFrame 的内容保存到表中,请执行以下操作:
调用
write
属性以获取DataFrameWriter
对象。调用
DataFrameWriter
对象中的mode
方法并指定模式。有关更多信息,请参阅 ` API 文档 <https://docs.snowflake.cn/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.DataFrameWriter.mode#snowflake.snowpark.DataFrameWriter.mode>`_。此方法返回使用指定模式进行了配置的新DataFrameWriter
对象。在
DataFrameWriter
对象中调用save_as_table
方法,以将 DataFrame 的内容保存到指定的表中。
请注意,无需调用单独的方法(例如 collect
)来执行将数据保存到表中的 SQL 语句。
例如:
>>> df.write.mode("overwrite").save_as_table("table1")
基于 DataFrame 创建视图¶
若要基于 DataFrame 创建视图,请调用 create_or_replace_view
方法,该方法会立即创建新视图:
>>> import os
>>> database = os.environ["snowflake_database"] # use your own database and schema
>>> schema = os.environ["snowflake_schema"]
>>> view_name = "my_view"
>>> df.create_or_replace_view(f"{database}.{schema}.{view_name}")
[Row(status='View MY_VIEW successfully created.')]
在 Python 工作表中,由于工作表在数据库和架构的上下文中运行,您可以运行以下命令来创建视图:
# Define a DataFrame
df_products = session.table("sample_product_data")
# Define a View name
view_name = "my_view"
# Create the view
df_products.create_or_replace_view(f"{view_name}")
# return the view name
return view_name + " successfully created"
my_view successfully created
通过调用 create_or_replace_view
创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图。
也可使用 create_or_replace_temp_view
方法来创建临时视图。临时视图仅在创建临时视图的会话中可用。
处理暂存区中的文件¶
本节介绍如何查询 Snowflake 暂存区中文件内的数据。要对文件执行其他操作,请 使用 SQL 语句。
要查询 Snowflake 暂存区中文件内的数据,请使用 DataFrameReader
类:
调用
Session
类中的read
方法,以访问DataFrameReader
对象。如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:
创建一个
StructType
对象,该对象包含描述文件中字段的StructField
对象list
。对于每个
StructField
对象,请指定以下内容:字段的名称。
字段的数据类型(在
snowflake.snowpark.types
模块中指定为对象)。字段是否可为 null。
例如:
>>> from snowflake.snowpark.types import * >>> schema_for_data_file = StructType([ ... StructField("id", StringType()), ... StructField("name", StringType()) ... ])
调用
DataFrameReader
对象中的schema
属性,传入StructType
对象。例如:
>>> df_reader = session.read.schema(schema_for_data_file)
schema
属性会返回一个DataFrameReader
对象,该对象配置为读取包含指定字段的文件。请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件,
DataFrameReader
会将数据视为字段名称为$1
、类型为 VARIANT 的单个字段处理。
如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用
DataFrameReader
对象的option
或options
方法。option
方法接受要设置的选项的名称和值,并允许合并多个链接的调用,options
方法接受选项名称及其相应值构成的字典。有关文件格式选项的名称和值,请参阅 有关 CREATE FILE FORMAT 的文档。
您还可以设置 :doc:` 有关 COPY INTO TABLE 的文档 </sql-reference/sql/copy-into-table>` 中所述的复制选项。请注意,当您 :ref:`将数据检索到 DataFrame 中 <label-snowpark_python_dataframe_action_method>`时,设置复制选项可能造成费用更昂贵的执行策略。
下面的示例设置了
DataFrameReader
对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。>>> df_reader = df_reader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
和options
方法返回使用指定选项配置的DataFrameReader
对象。调用与文件格式相对应的方法(例如
csv
方法),传入文件的位置。>>> df = df_reader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
与文件格式对应的方法返回一个 DataFrame 对象,该对象配置为保存该文件中的数据。
使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_python_dataframe_transform>` (例如,选择特定字段、筛选行等)。
例如,要从名为
my_stage
的暂存区中的一个 JSON 文件内提取color
元素:>>> # Import the sql_expr function from the functions module. >>> from snowflake.snowpark.functions import sql_expr >>> df = session.read.json("@my_stage").select(sql_expr("$1:color"))
如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件),
DataFrameReader
会将文件中的数据视为名为$1
的单个 VARIANT 列。此示例使用
snowflake.snowpark.functions
模块中的sql_expr
函数来指定color
元素的路径。请注意,
sql_expr
函数不会解释或修改输入实参。该函数仅允许您使用 SQL 构造 Snowpark API 还不支持的表达式和代码片段。调用操作方法 来查询文件中的数据。
与表的 DataFrames 一样,在您调用操作方法之前,不会将数据检索到 DataFrame 中。
使用半结构化数据¶
使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。
备注
这些部分中的示例使用 示例中使用的示例数据 中的示例数据。
遍历半结构化数据¶
若要引用半结构化数据中的特定字段或元素,请使用 Column
对象的以下方法:
获取
col_object["<field_name>"]
属性,以返回 OBJECT(或包含 OBJECT 的 VARIANT )中字段的Column
对象。使用
col_object[<index>]
,返回 ARRAY(或包含 ARRAY 的 VARIANT)中元素的Column
对象。
备注
如果路径中的字段名称或元素不规则,并且导致难以使用上述索引,则可以使用 get
、get_ignore_case
或 get_path
作为替代方法。
例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src
列的对象内,选择 dealership
字段:
>>> from snowflake.snowpark.functions import col
>>> df = session.table("car_sales")
>>> df.select(col("src")["dealership"]).show()
该代码会打印以下输出:
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
备注
DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值。
还可以 链接多个方法调用,以遍历特定字段或元素的路径。
例如,以下代码选择 salesperson
对象中的 name
字段:
>>> df = session.table("car_sales")
>>> df.select(df["src"]["salesperson"]["name"]).show()
该代码会打印以下输出:
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
再举一个例子,下面的代码选择 vehicle
字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price
字段。
>>> df = session.table("car_sales")
>>> df.select(df["src"]["vehicle"][0]).show()
>>> df.select(df["src"]["vehicle"][0]["price"]).show()
该代码会打印以下输出:
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
除了上述访问字段的方法之外,还有一种替代方法,如果路径中的字段名称或元素不规则,您可以使用 get
、get_ignore_case
或 get_path
函数。
例如,以下代码行都会打印对象中指定字段的值:
>>> from snowflake.snowpark.functions import get, get_path, lit
>>> df.select(get(col("src"), lit("dealership"))).show()
>>> df.select(col("src")["dealership"]).show()
同样,以下代码行都会打印对象中指定路径处的字段值:
>>> df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
>>> df.select(col("src")["vehicle"][0]["make"]).show()
显式转换半结构化数据中的值¶
默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。
若要避免意外结果,请调用 cast 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:
>>> # Import the objects for the data types, including StringType.
>>> from snowflake.snowpark.types import *
>>> df = session.table("car_sales")
>>> df.select(col("src")["salesperson"]["id"]).show()
>>> df.select(col("src")["salesperson"]["id"].cast(StringType())).show()
该代码会打印以下输出:
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
将对象数组展平为行¶
如果需要将半结构化数据“展平”为 DataFrame(例如,为数组中的每个对象生成一行),请使用 join_table_function
方法调用 flatten
。此方法与 FLATTEN SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。
例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer
是一个包含有关客户的信息的对象数组。每个对象都包含 name
和 address
字段。
如果将此路径传递给 flatten
函数:
>>> df = session.table("car_sales")
>>> df.join_table_function("flatten", col("src")["customer"]).show()
该方法返回一个 DataFrame:
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
从 DataFrame 中,您可以从 VALUE
字段中的每个对象选择 name
和 address
字段:
>>> df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"], col("value")["address"]).show()
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:
>>> df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"].cast(StringType()).as_("Customer Name"), col("value")["address"].cast(StringType()).as_("Customer Address")).show()
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
执行 SQL 语句¶
若要执行您指定的 SQL 语句,请调用 Session
类中的 sql
方法,然后传入要执行的语句。该方法返回一个 DataFrame。
请注意,在您 :ref:` 调用操作方法 <label-snowpark_python_dataframe_action_method>` 之前, SQL 语句不会执行。
>>> # Get the list of the files in a stage.
>>> # The collect() method causes this SQL statement to be executed.
>>> session.sql("create or replace temp stage my_stage").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Stage area MY_STAGE successfully created.')]
>>> stage_files_df = session.sql("ls @my_stage").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
>>> # Resume the operation of a warehouse.
>>> # Note that you must call the collect method to execute
>>> # the SQL statement.
>>> session.sql("alter warehouse if exists my_warehouse resume if suspended").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Statement executed successfully.')]
>>> # Set up a SQL statement to copy data from a stage to a table.
>>> session.sql("copy into sample_product_data from @my_stage file_format=(type = csv)").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Copy executed with 0 files processed.')]
如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_python_dataframe_transform>` (例如 filter
、select
等),请注意,这些方法仅在基础 SQL 语句是 SELECT 语句时才有效。其他类型的 SQL 语句不支持转换方法。
>>> df = session.sql("select id, parent_id from sample_product_data where id < 10")
>>> # Because the underlying SQL statement for the DataFrame is a SELECT statement,
>>> # you can call the filter method to transform this DataFrame.
>>> results = df.filter(col("id") < 3).select(col("id")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
>>> # In this example, the underlying SQL statement is not a SELECT statement.
>>> df = session.sql("ls @my_stage")
>>> # Calling the filter method results in an error.
>>> try:
... df.filter(col("size") > 50).collect()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 104
invalid identifier 'SIZE'