在 Snowpark Python 中使用 DataFrames¶
在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。
本主题内容:
要检索和操作数据,需要使用 DataFrame
类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。
要将数据检索到 DataFrame 之中,请执行以下步骤:
-
例如,您可以创建一个 DataFrame 来保存源自表、外部 CSV 文件、本地数据或 SQL 语句的执行的数据。
-
例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。
-
为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如
collect()
方法)。
接下来的部分将更详细地介绍这些步骤。
设置本部分的示例¶
本部分的一些示例使用 DataFrame 查询名为 sample_product_data
的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表。
可以使用 Snowpark Python 运行 SQL 语句:
session.sql('CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT)').collect()
[Row(status='Table SAMPLE_PRODUCT_DATA successfully created.')]
session.sql("""
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
""").collect()
[Row(number of rows inserted=12)]
要验证表是否已创建,请运行:
session.sql("SELECT count(*) FROM sample_product_data").collect()
[Row(COUNT(*)=12)]
在 Python 工作表中设置示例¶
要在 :doc:` Python 工作表 </developer-guide/snowpark/python/python-worksheets>` 中设置和运行这些示例,请创建示例表,并设置您的 Python 工作表。
创建 SQL 工作表并运行以下命令:
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT); INSERT INTO sample_product_data VALUES (1, 0, 5, 'Product 1', 'prod-1', 1, 10), (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20), (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30), (4, 0, 10, 'Product 2', 'prod-2', 2, 40), (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50), (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60), (7, 0, 20, 'Product 3', 'prod-3', 3, 70), (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80), (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90), (10, 0, 50, 'Product 4', 'prod-4', 4, 100), (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100), (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100); SELECT count(*) FROM sample_product_data;
创建 Python 工作表,设置与用于创建
sample_product_data
表的 SQL 工作表相同的数据库和架构上下文。
要在 Python 工作表中使用本主题中的示例,请在处理程序函数(例如 main
)中使用该示例,并使用传递到函数中的 Session
对象创建 DataFrames。
例如,调用 session
对象的 table
方法,为表创建一个 DataFrame :
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
要查看该函数生成的输出(例如通过调用 DataFrame 对象的 show
方法),请使用 Output 选项卡。
要检查该函数返回的值,请从 Settings » Return type 中选择返回值的数据类型,然后使用 Results 选项卡:
如果函数返回 DataFrame,则使用默认返回类型 Table。
如果函数从 DataFrame 对象的
collect
方法返回Row
的list
,请为返回类型使用 Variant。如果函数返回可通过类型转换操作转换为字符串的其他任何值,或者函数未返回值,请使用 String 作为返回类型。
有关更多详细信息,请参阅 运行 Python 工作表。
构造 DataFrame¶
要构造 DataFrame,可以使用 Session
类的方法和属性。以下每种方法都基于不同类型的数据源构造 DataFrame。
可以在本地开发环境中运行这些示例,也可以在 :doc:` Python 工作表 </developer-guide/snowpark/python/python-worksheets>` 中定义的 main
函数中调用它们。
要基于表、视图或流中的数据创建 DataFrame,请调用
table
方法:# Create a DataFrame from the data in the "sample_product_data" table. df_table = session.table("sample_product_data") # To print out the first 10 rows, call df_table.show()
要基于指定值创建 DataFrame,请调用
create_dataframe
方法:# Create a DataFrame with one column named a from specified values. df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a") df1.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df1
------- |"A" | ------- |1 | |2 | |3 | |4 | -------
创建一个包含 4 列的 DataFrame,列分别为“a”、“b”、“c”和“d”:
# Create a DataFrame with 4 columns, "a", "b", "c" and "d". df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]) df2.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df2
------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | -------------------------
再创建一个包含 4 列的 DataFrame,列分别为“a”、“b”、“c”和“d”:
# Create another DataFrame with 4 columns, "a", "b", "c" and "d". from snowflake.snowpark import Row df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)]) df3.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df3
------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | -------------------------
创建 DataFrame 并指定架构:
# Create a DataFrame and specify a schema from snowflake.snowpark.types import IntegerType, StringType, StructType, StructField schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())]) df4 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema) df4.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df4
--------------- |"A" |"B" | --------------- |1 |snow | |3 |flake | ---------------
要创建包含特定值范围的 DataFrame,请调用
range
方法:# Create a DataFrame from a range # The DataFrame contains rows with values 1, 3, 5, 7, and 9 respectively. df_range = session.range(1, 10, 2).to_df("a") df_range.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df_range
------- |"A" | ------- |1 | |3 | |5 | |7 | |9 | -------
要创建 DataFrame 来保存暂存区内文件中的数据,请使用
read
属性获取DataFrameReader
对象。在DataFrameReader
对象中,调用与文件中的数据格式对应的方法:from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType # Create DataFrames from data in a stage. df_json = session.read.json("@my_stage2/data1.json") df_catalog = session.read.schema(StructType([StructField("name", StringType()), StructField("age", IntegerType())])).csv("@stage/some_dir")
要创建 DataFrame 来保存 SQL 查询的结果,请调用
sql
方法:# Create a DataFrame from a SQL query df_sql = session.sql("SELECT name from sample_product_data") df_sql.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df_sql
-------------- |"NAME" | -------------- |Product 1 | |Product 1A | |Product 1B | |Product 2 | |Product 2A | |Product 2B | |Product 3 | |Product 3A | |Product 3B | |Product 4 | --------------
可以使用 sql
方法执行 SELECT 语句,从表和暂存文件中检索数据,但使用 table
方法和 read
属性可在开发工具中提供更好的语法突出显示、错误突出显示和智能代码补全。
指定应该如何转换数据集¶
要指定需要选择的列,以及如何对结果执行筛选、排序、分组等操作,请调用用于转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 col
函数或计算结果为列的表达式。请参阅 指定列和表达式。
例如:
若要指定应返回的行,请调用
filter
方法:# Import the col function from the functions module. # Python worksheets import this function by default from snowflake.snowpark.functions import col # Create a DataFrame for the rows with the ID 1 # in the "sample_product_data" table. # This example uses the == operator of the Column object to perform an # equality check. df = session.table("sample_product_data").filter(col("id") == 1) df.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df
------------------------------------------------------------------------------------ |"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" | ------------------------------------------------------------------------------------ |1 |0 |5 |Product 1 |prod-1 |1 |10 | ------------------------------------------------------------------------------------
若要指定应选择的列,请调用
select
方法:# Import the col function from the functions module. from snowflake.snowpark.functions import col # Create a DataFrame that contains the id, name, and serial_number # columns in the "sample_product_data" table. df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) df.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df
--------------------------------------- |"ID" |"NAME" |"SERIAL_NUMBER" | --------------------------------------- |1 |Product 1 |prod-1 | |2 |Product 1A |prod-1-A | |3 |Product 1B |prod-1-B | |4 |Product 2 |prod-2 | |5 |Product 2A |prod-2-A | |6 |Product 2B |prod-2-B | |7 |Product 3 |prod-3 | |8 |Product 3A |prod-3-A | |9 |Product 3B |prod-3-B | |10 |Product 4 |prod-4 | ---------------------------------------
您还可以按如下方式引用列:
# Import the col function from the functions module. from snowflake.snowpark.functions import col df_product_info = session.table("sample_product_data") df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"]) df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number) df3 = df_product_info.select("id", "name", "serial_number")
每个方法都返回一个经过转换的新 DataFrame 对象。该方法不会影响原始 DataFrame 对象。要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用返回的新 DataFrame 对象调用每个后续转换方法。
这些转换方法指定了如何构造 SQL 语句,并且不会从 Snowflake 数据库检索数据。执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。
联接 DataFrames¶
若要联接 DataFrame 对象,请调用 join
方法:
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
# Create a DataFrame that joins the two DataFrames
# on the column named "key".
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2")
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
如果两个 DataFrames 都包含要用于联接的相同列,则可以使用以下示例语法:
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
# If both dataframes have the same column "key", the following is more convenient.
df_lhs.join(df_rhs, ["key"]).show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, ["key"])
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
您还可以使用 & 运算符来连接多个联接表达式:
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
# Use & operator connect join expression. '|' and ~ are similar.
df_joined_multi_column = df_lhs.join(df_rhs, (df_lhs.col("key") == df_rhs.col("key")) & (df_lhs.col("value1") < df_rhs.col("value2"))).select(df_lhs["key"].as_("key"), "value1", "value2")
df_joined_multi_column.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_joined_multi_column
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
如果要执行自联接,则必须复制 DataFrame:
# copy the DataFrame if you want to do a self-join
from copy import copy
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
df_lhs_copied = copy(df_lhs)
df_self_joined = df_lhs.join(df_lhs_copied, (df_lhs.col("key") == df_lhs_copied.col("key")) & (df_lhs.col("value1") == df_lhs_copied.col("value1")))
如果 DataFrames 中存在重叠的列,Snowpark 会在联接结果中的列前面附加一个随机生成的前缀:
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"))
-----------------------------------------------------
|"l_av5t_KEY" |"VALUE1" |"r_1p6k_KEY" |"VALUE2" |
-----------------------------------------------------
|a |1 |a |3 |
|b |2 |b |4 |
-----------------------------------------------------
可以使用 Column.alias
来重命名重叠的列:
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2").show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2")
-----------------------------------------
|"KEY1" |"KEY2" |"VALUE1" |"VALUE2" |
-----------------------------------------
|a |a |1 |3 |
|b |b |2 |4 |
-----------------------------------------
要避免使用随机前缀,还可以指定要追加到重叠列的后缀:
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right").show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right")
--------------------------------------------------
|"KEY_LEFT" |"VALUE1" |"KEY_RIGHT" |"VALUE2" |
--------------------------------------------------
|a |1 |a |3 |
|b |2 |b |4 |
--------------------------------------------------
这些示例使用 DataFrame.col
来指定要在联接中使用的列。有关指定列的更多方法,请参阅 指定列和表达式。
如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下示例使用单个 DataFrame 执行自联接,操作会失败,因为联接的左侧和右侧都存在 "id"
的列表达式:
from snowflake.snowpark.exceptions import SnowparkJoinException
df = session.table("sample_product_data")
# This fails because columns named "id" and "parent_id"
# are in the left and right DataFrames in the join.
try:
df_joined = df.join(df, col("id") == col("parent_id")) # fails
except SnowparkJoinException as e:
print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
# This fails because columns named "id" and "parent_id"
# are in the left and right DataFrames in the join.
try:
df_joined = df.join(df, df["id"] == df["parent_id"]) # fails
except SnowparkJoinException as e:
print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
相反,请使用 Python 内置的 copy()
方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:
from copy import copy
# Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
df_lhs = session.table("sample_product_data")
# Clone the DataFrame object to use as the right-hand side of the join.
df_rhs = copy(df_lhs)
# Create a DataFrame that joins the two DataFrames
# for the "sample_product_data" table on the
# "id" and "parent_id" columns.
df_joined = df_lhs.join(df_rhs, df_lhs.col("id") == df_rhs.col("parent_id"))
df_joined.count()
指定列和表达式¶
调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select
方法时,需要指定要选择的列。
要引用列,请通过调用 snowflake.snowpark.functions
模块中的 col
函数来创建 Column
对象。
# Import the col function from the functions module.
from snowflake.snowpark.functions import col
df_product_info = session.table("sample_product_data").select(col("id"), col("name"))
df_product_info.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_product_info
---------------------
|"ID" |"NAME" |
---------------------
|1 |Product 1 |
|2 |Product 1A |
|3 |Product 1B |
|4 |Product 2 |
|5 |Product 2A |
|6 |Product 2B |
|7 |Product 3 |
|8 |Product 3A |
|9 |Product 3B |
|10 |Product 4 |
---------------------
备注
要为字面量创建 Column
对象,请参阅 将字面量用作列对象。
指定筛选器、投影、联接条件等时,可以在表达式中使用 Column
对象。例如:
可以将
Column
对象与filter
方法一起使用,以指定筛选条件:# Specify the equivalent of "WHERE id = 20" # in a SQL SELECT statement. df_filtered = df.filter(col("id") == 20)
df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) # Specify the equivalent of "WHERE a + b < 10" # in a SQL SELECT statement. df_filtered = df.filter((col("a") + col("b")) < 10) df_filtered.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df_filtered
------------- |"A" |"B" | ------------- |1 |3 | -------------
可以将
Column
对象与select
方法一起使用,以定义别名:df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) # Specify the equivalent of "SELECT b * 10 AS c" # in a SQL SELECT statement. df_selected = df.select((col("b") * 10).as_("c")) df_selected.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df_selected
------- |"C" | ------- |30 | |100 | -------
可以将
Column
对象与join
方法一起使用,以定义联接条件:dfX = session.create_dataframe([[1], [2]], schema=["a_in_X"]) dfY = session.create_dataframe([[1], [3]], schema=["b_in_Y"]) # Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" # in a SQL SELECT statement. df_joined = dfX.join(dfY, col("a_in_X") == col("b_in_Y")).select(dfX["a_in_X"].alias("the_joined_column")) df_joined.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df_joined
----------------------- |"THE_JOINED_COLUMN" | ----------------------- |1 | -----------------------
引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在一个 DataFrame 对象中使用 DataFrame.col
方法引用该对象中的列(例如 df1.col("name")
和 df2.col("name")
)。
下面的示例演示了如何使用 DataFrame.col
方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 key
的列。该示例使用 Column.as
方法来更改新创建 DataFrame 中的列名称。
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value"])
# Create a DataFrame that joins two other DataFrames (df_lhs and df_rhs).
# Use the DataFrame.col method to refer to the columns used in the join.
df_joined = df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs.col("key").as_("key"), df_lhs.col("value").as_("L"), df_rhs.col("value").as_("R"))
df_joined.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_joined
---------------------
|"KEY" |"L" |"R" |
---------------------
|a |1 |3 |
|b |2 |4 |
---------------------
将对象标识符(表名称、列名称等)放在双引号里¶
您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求。
创建一个包含区分大小写的列的表:
session.sql("""
create or replace temp table "10tablename"(
id123 varchar, -- case insensitive because it's not quoted.
"3rdID" varchar, -- case sensitive.
"id with space" varchar -- case sensitive.
)""").collect()
# Add return to the statement to return the collect() results in a Python worksheet
[Row(status='Table 10tablename successfully created.')]
随后向表中添加值:
session.sql("""insert into "10tablename" (id123, "3rdID", "id with space") values ('a', 'b', 'c')""").collect()
# Add return to the statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
随后为表创建一个 DataFrame,并查询该表:
df = session.table('"10tablename"')
df.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df
---------------------------------------
|"ID123" |"3rdID" |"id with space" |
---------------------------------------
|a |b |c |
---------------------------------------
指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:
df.select(col("id123")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(ID123='a')]
如果名称不符合标识符要求,则必须将名称放在双引号里 ("
)。使用反斜杠 (\
) 对字符串字面量中的双引号字符进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:
df = session.table("\"10tablename\"")
也可以使用单引号而非反斜杠来转义字符串字面量中的双引号字符。
df = session.table('"10tablename"')
请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:
df.select(col("3rdID")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(3rdID='b')]
再举一个例子,以下调用是等效的:
df.select(col("id with space")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
df.select(col("\"id with space\"")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。
在某些情况下,列名称可能包含双引号字符:
session.sql('''
create or replace temp table quoted(
"name_with_""air""_quotes" varchar,
"""column_name_quoted""" varchar
)''').collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Table QUOTED successfully created.')]
session.sql('''insert into quoted ("name_with_""air""_quotes", """column_name_quoted""") values ('a', 'b')''').collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
如 标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes"
和 """column_name_quoted"""
):
df_table = session.table("quoted")
df_table.select("\"name_with_\"\"air\"\"_quotes\"").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(name_with_"air"_quotes='a')]
df_table.select("\"\"\"column_name_quoted\"\"\"").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row("column_name_quoted"='b')]
如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写:
# The following calls are NOT equivalent!
# The Snowpark library adds double quotes around the column name,
# which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
对比此示例:
from snowflake.snowpark.exceptions import SnowparkSQLException
try:
df.select(col("ID WITH SPACE")).collect()
except SnowparkSQLException as e:
print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 7
invalid identifier '"ID WITH SPACE"'
将字面量用作列对象¶
若要在将 Column
对象作为实参的方法中使用字面量,请将字面量传递给 snowflake.snowpark.functions
模块中的 lit
函数,从而为字面量创建 Column
对象。例如:
# Import for the lit and col functions.
from snowflake.snowpark.functions import col, lit
# Show the first 10 rows in which num_items is greater than 5.
# Use `lit(5)` to create a Column object for the literal 5.
df_filtered = df.filter(col("num_items") > lit(5))
将列对象的类型转换为特定类型¶
要将 Column
对象的类型转换为特定类型,请调用 cast
方法,然后从 snowflake.snowpark.types
模块传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:
# Import for the lit function.
from snowflake.snowpark.functions import lit
# Import for the DecimalType class.
from snowflake.snowpark.types import DecimalType
decimal_value = lit(0.05).cast(DecimalType(5,2))
链接多个方法调用¶
由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。
下面的示例返回为如下目的而配置的 DataFrame :
查询
sample_product_data
表。返回
id = 1
的行。选择
name
和serial_number
列。df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number")) df_product_info.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df_product_info
------------------------------- |"NAME" |"SERIAL_NUMBER" | ------------------------------- |Product 1 |prod-1 | -------------------------------
在此示例中:
session.table("sample_product_data")
返回sample_product_data
表的 DataFrame。尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。
filter(col("id") == 1)
返回sample_product_data
表的 DataFrame(设置为返回id = 1
的行)。请注意, DataFrame 还不包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_python_dataframe_action_method>` 之前,不会检索匹配的行。
select(col("name"), col("serial_number"))
返回一个 DataFrame,包含sample_product_data
表中id = 1
的行的name
和serial_number
列。
在链接多个方法调用时,调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。
使用 Snowpark Python 时,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 select
和 filter
方法调用。
检索列定义¶
要在数据集中为 DataFrame 检索列的定义,请调用 schema
属性。此方法会返回一个 StructType
对象,其中包含 StructField
对象的 list
。每个 StructField
对象都包含一列的定义。
# Import the StructType
from snowflake.snowpark.types import *
# Get the StructType object that describes the columns in the
# underlying rowset.
table_schema = session.table("sample_product_data").schema
table_schema
StructType([StructField('ID', LongType(), nullable=True), StructField('PARENT_ID', LongType(), nullable=True), StructField('CATEGORY_ID', LongType(), nullable=True), StructField('NAME', StringType(), nullable=True), StructField('SERIAL_NUMBER', StringType(), nullable=True), StructField('KEY', LongType(), nullable=True), StructField('"3rd"', LongType(), nullable=True)])
在返回的 StructType
对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。
下面的示例创建一个 DataFrame,其中包含名为 ID
和 3rd
的列。对于名为 3rd
的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"
),因为 该名称不符合标识符要求。
该示例调用 schema
属性,随后调用返回的 StructType
对象的 names
属性,以获取列名称的 list
。这些名称使用 schema
属性返回的 StructType
进行规范化。
# Create a DataFrame containing the "id" and "3rd" columns.
df_selected_columns = session.table("sample_product_data").select(col("id"), col("3rd"))
# Print out the names of the columns in the schema.
# This prints List["ID", "\"3rd\""]
df_selected_columns.schema.names
['ID', '"3rd"']
执行操作以计算 DataFrame¶
如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。
以下方法可执行操作:
类 |
方法 |
描述 |
---|---|---|
|
|
计算 DataFrame,并将生成的数据集作为 |
|
|
计算 DataFrame 并返回行数。 |
|
|
计算 DataFrame,并将行打印到控制台。此方法将行数限制为 10 行(默认值)。 |
|
|
将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中。 |
例如,若要对表执行查询并返回结果,请调用 collect
方法:
# Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
# This does not execute the query.
df = session.table("sample_product_data").select(col("id"), col("name"))
# Send the query to the server for execution and
# return a list of Rows containing the results.
results = df.collect()
# Use a return statement to return the collect() results in a Python worksheet
# return results
若要执行查询并返回结果数量,请调用 count
方法:
# Create a DataFrame for the "sample_product_data" table.
df_products = session.table("sample_product_data")
# Send the query to the server for execution and
# print the count of rows in the table.
print(df_products.count())
12
若要执行查询并将结果打印到控制台,请调用 show
方法:
# Create a DataFrame for the "sample_product_data" table.
df_products = session.table("sample_product_data")
# Send the query to the server for execution and
# print the results to the console.
# The query limits the number of rows to 10 by default.
df_products.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_products
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
-------------------------------------------------------------------------------------
若要将行数限制为 20 行:
# Create a DataFrame for the "sample_product_data" table.
df_products = session.table("sample_product_data")
# Limit the number of rows to 20, rather than 10.
df_products.show(20)
# All rows are returned when you use return in a Python worksheet to return the DataFrame as a table
return df_products
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
|11 |10 |50 |Product 4A |prod-4-A |4 |100 |
|12 |10 |50 |Product 4B |prod-4-B |4 |100 |
-------------------------------------------------------------------------------------
备注
如果调用 schema
属性以获取 DataFrame 中列的定义,则不需要调用操作方法。
将数据保存到表中¶
要将 DataFrame 的内容保存到表中,请执行以下操作:
调用
write
属性以获取DataFrameWriter
对象。调用
DataFrameWriter
对象中的mode
方法并指定模式。有关更多信息,请参阅 ` API 文档 <https://docs.snowflake.cn/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.DataFrameWriter.mode#snowflake.snowpark.DataFrameWriter.mode>`_。此方法返回使用指定模式进行了配置的新DataFrameWriter
对象。在
DataFrameWriter
对象中调用save_as_table
方法,以将 DataFrame 的内容保存到指定的表中。
请注意,无需调用单独的方法(例如 collect
)来执行将数据保存到表中的 SQL 语句。
例如:
df.write.mode("overwrite").save_as_table("table1")
基于 DataFrame 创建视图¶
若要基于 DataFrame 创建视图,请调用 create_or_replace_view
方法,该方法会立即创建新视图:
import os
database = os.environ["snowflake_database"] # use your own database and schema
schema = os.environ["snowflake_schema"]
view_name = "my_view"
df.create_or_replace_view(f"{database}.{schema}.{view_name}")
[Row(status='View MY_VIEW successfully created.')]
在 Python 工作表中,由于工作表在数据库和架构的上下文中运行,您可以运行以下命令来创建视图:
# Define a DataFrame
df_products = session.table("sample_product_data")
# Define a View name
view_name = "my_view"
# Create the view
df_products.create_or_replace_view(f"{view_name}")
# return the view name
return view_name + " successfully created"
my_view successfully created
通过调用 create_or_replace_view
创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图。
也可使用 create_or_replace_temp_view
方法来创建临时视图。临时视图仅在创建临时视图的会话中可用。
处理暂存区中的文件¶
本节介绍如何查询 Snowflake 暂存区中文件内的数据。要对文件执行其他操作,请 使用 SQL 语句。
要查询 Snowflake 暂存区中文件内的数据,请使用 DataFrameReader
类:
调用
Session
类中的read
方法,以访问DataFrameReader
对象。如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:
创建一个
StructType
对象,该对象包含描述文件中字段的StructField
对象list
。对于每个
StructField
对象,请指定以下内容:字段的名称。
字段的数据类型(在
snowflake.snowpark.types
模块中指定为对象)。字段是否可为 null。
例如:
from snowflake.snowpark.types import * schema_for_data_file = StructType([ StructField("id", StringType()), StructField("name", StringType()) ])
调用
DataFrameReader
对象中的schema
属性,传入StructType
对象。例如:
df_reader = session.read.schema(schema_for_data_file)
schema
属性会返回一个DataFrameReader
对象,该对象配置为读取包含指定字段的文件。请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件,
DataFrameReader
会将数据视为字段名称为$1
、类型为 VARIANT 的单个字段处理。
如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用
DataFrameReader
对象的option
或options
方法。option
方法接受要设置的选项的名称和值,并允许合并多个链接的调用,options
方法接受选项名称及其相应值构成的字典。有关文件格式选项的名称和值,请参阅 有关 CREATE FILE FORMAT 的文档。
您还可以设置 :doc:` 有关 COPY INTO TABLE 的文档 </sql-reference/sql/copy-into-table>` 中所述的复制选项。请注意,当您 :ref:`将数据检索到 DataFrame 中 <label-snowpark_python_dataframe_action_method>`时,设置复制选项可能造成费用更昂贵的执行策略。
下面的示例设置了
DataFrameReader
对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。df_reader = df_reader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
和options
方法返回使用指定选项配置的DataFrameReader
对象。调用与文件格式相对应的方法(例如
csv
方法),传入文件的位置。df = df_reader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
与文件格式对应的方法返回一个 DataFrame 对象,该对象配置为保存该文件中的数据。
使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_python_dataframe_transform>` (例如,选择特定字段、筛选行等)。
例如,要从名为
my_stage
的暂存区中的一个 JSON 文件内提取color
元素:# Import the sql_expr function from the functions module. from snowflake.snowpark.functions import sql_expr df = session.read.json("@my_stage").select(sql_expr("$1:color"))
如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件),
DataFrameReader
会将文件中的数据视为名为$1
的单个 VARIANT 列。此示例使用
snowflake.snowpark.functions
模块中的sql_expr
函数来指定color
元素的路径。请注意,
sql_expr
函数不会解释或修改输入实参。该函数仅允许您使用 SQL 构造 Snowpark API 还不支持的表达式和代码片段。调用操作方法 来查询文件中的数据。
与表的 DataFrames 一样,在您调用操作方法之前,不会将数据检索到 DataFrame 中。
使用半结构化数据¶
使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。
备注
这些部分中的示例使用 示例中使用的示例数据 中的示例数据。
遍历半结构化数据¶
若要引用半结构化数据中的特定字段或元素,请使用 Column
对象的以下方法:
获取
col_object["<field_name>"]
属性,以返回 OBJECT(或包含 OBJECT 的 VARIANT )中字段的Column
对象。使用
col_object[<index>]
,返回 ARRAY(或包含 ARRAY 的 VARIANT)中元素的Column
对象。
备注
如果路径中的字段名称或元素不规则,并且导致难以使用上述索引,则可以使用 get
、get_ignore_case
或 get_path
作为替代方法。
例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src
列的对象内,选择 dealership
字段:
from snowflake.snowpark.functions import col
df = session.table("car_sales")
df.select(col("src")["dealership"]).show()
该代码会打印以下输出:
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
备注
DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值。
还可以 链接多个方法调用,以遍历特定字段或元素的路径。
例如,以下代码选择 salesperson
对象中的 name
字段:
df = session.table("car_sales")
df.select(df["src"]["salesperson"]["name"]).show()
该代码会打印以下输出:
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
再举一个例子,下面的代码选择 vehicle
字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price
字段。
df = session.table("car_sales")
df.select(df["src"]["vehicle"][0]).show()
df.select(df["src"]["vehicle"][0]["price"]).show()
该代码会打印以下输出:
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
除了上述访问字段的方法之外,还有一种替代方法,如果路径中的字段名称或元素不规则,您可以使用 get
、get_ignore_case
或 get_path
函数。
例如,以下代码行都会打印对象中指定字段的值:
from snowflake.snowpark.functions import get, get_path, lit
df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")["dealership"]).show()
同样,以下代码行都会打印对象中指定路径处的字段值:
df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")["vehicle"][0]["make"]).show()
显式转换半结构化数据中的值¶
默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。
若要避免意外结果,请调用 :ref:` cast <label-snowpark_python_dataframe_cols_cast>` 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:
# Import the objects for the data types, including StringType.
from snowflake.snowpark.types import *
df = session.table("car_sales")
df.select(col("src")["salesperson"]["id"]).show()
df.select(col("src")["salesperson"]["id"].cast(StringType())).show()
该代码会打印以下输出:
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
将对象数组展平为行¶
如果需要将半结构化数据“展平”为 DataFrame(例如,为数组中的每个对象生成一行),请使用 join_table_function
方法调用 flatten
。此方法与 FLATTEN SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。
例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer
是一个包含有关客户的信息的对象数组。每个对象都包含 name
和 address
字段。
如果将此路径传递给 flatten
函数:
df = session.table("car_sales")
df.join_table_function("flatten", col("src")["customer"]).show()
该方法返回一个 DataFrame:
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
从 DataFrame 中,您可以从 VALUE
字段中的每个对象选择 name
和 address
字段:
df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"], col("value")["address"]).show()
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:
df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"].cast(StringType()).as_("Customer Name"), col("value")["address"].cast(StringType()).as_("Customer Address")).show()
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
执行 SQL 语句¶
若要执行您指定的 SQL 语句,请调用 Session
类中的 sql
方法,然后传入要执行的语句。该方法返回一个 DataFrame。
请注意,在您 :ref:` 调用操作方法 <label-snowpark_python_dataframe_action_method>` 之前, SQL 语句不会执行。
# Get the list of the files in a stage.
# The collect() method causes this SQL statement to be executed.
session.sql("create or replace temp stage my_stage").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Stage area MY_STAGE successfully created.')]
stage_files_df = session.sql("ls @my_stage").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
# Resume the operation of a warehouse.
# Note that you must call the collect method to execute
# the SQL statement.
session.sql("alter warehouse if exists my_warehouse resume if suspended").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Statement executed successfully.')]
# Set up a SQL statement to copy data from a stage to a table.
session.sql("copy into sample_product_data from @my_stage file_format=(type = csv)").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Copy executed with 0 files processed.')]
如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_python_dataframe_transform>` (例如 filter
、select
等),请注意,这些方法仅在基础 SQL 语句是 SELECT 语句时才有效。其他类型的 SQL 语句不支持转换方法。
df = session.sql("select id, parent_id from sample_product_data where id < 10")
# Because the underlying SQL statement for the DataFrame is a SELECT statement,
# you can call the filter method to transform this DataFrame.
results = df.filter(col("id") < 3).select(col("id")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
# In this example, the underlying SQL statement is not a SELECT statement.
df = session.sql("ls @my_stage")
# Calling the filter method results in an error.
try:
df.filter(col("size") > 50).collect()
except SnowparkSQLException as e:
print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 104
invalid identifier 'SIZE'
并发提交 Snowpark 查询¶
备注
此功能需要 Snowpark Library for Python 1.24 或更高版本以及服务器版本 8.46 或更高版本。
线程安全的会话对象允许 Snowpark Python 代码的不同部分在使用同一会话时并发运行。这使得多个操作(例如多个 DataFrames 上的转换)能够同时执行。当您处理可以在 Snowflake 服务器上独立处理的查询时,这特别有用,而且它符合更传统的多线程方法。
Python 中的全局解释器锁 (GIL) 是一种互斥体,用于保护对 Python 对象的访问,防止多个原生线程同时执行 Python 字节码。虽然 I/O 绑定的操作仍然可以从 Python 的线程模型中受益,因为在 I/O 操作期间会释放 GIL,但是 CPU 绑定的线程不会实现真正的并行,因为一次只能有一个线程执行。
此外,当在 Snowflake 内部使用时(例如在存储过程中),Snowpark Python 服务器通过在向 Snowflake 提交查询之前释放全局解释器锁 (GIL) 来管理全局解释器锁。这确保了在从独立线程对多个查询进行排队时可以实现真正的并发性。通过这种管理,Snowpark 允许多个线程并发提交查询,从而确保最佳的并行执行。
在 Snowpark 中使用线程安全会话对象的好处¶
能够同时运行多个 DataFrame 操作可以为 Snowpark 用户带来以下好处:
提高性能:线程安全会话对象允许您同时运行多个 Snowpark Python 查询,从而缩短总体运行时。例如,如果您需要独立处理多个表,此功能可显著缩短完成作业所需的时间,因为您不再需要等待每个表的处理完成之后再开始下一个表。
高效的计算利用率:并发提交查询可确保 Snowflake 的计算资源得到有效利用,从而减少空闲时间。
可用性:线程安全会话对象与 Python 的原生多线程 APIs 无缝集成,这使得开发人员能够利用 Python 的内置工具来控制线程行为并优化并行执行。
线程安全会话对象和异步作业可以根据您的用例相互补充。当您不需要等待作业完成时,异步作业非常有用,它允许在没有线程池管理的情况下执行非阻塞作业。另一方面,线程安全会话对象在您想要从客户端并发提交多个查询时非常有用。在某些情况下,代码块还可以包含异步作业,这样两种方法就可以有效地一起使用。
以下是线程安全会话对象可以增强数据管道的一些示例。
示例 1:并发加载多个表¶
以下示例演示了使用三个线程同时运行 COPY INTO
命令,将三个不同 CSV 文件中的数据加载到三个单独的表中。
import threading
from snowflake.snowpark import Session
# Define the list of tables
tables = ["customers", "orders", "products"]
# Function to copy data from stage to tables
def execute_copy(table_name):
try:
# Read data from the stage using DataFrameReader
df = (
session.read.option("SKIP_HEADER", 1)
.option("PATTERN", f"{table_name}[.]csv")
.option("FORCE", True)
.csv(f"@my_stage")
)
# Copy data into the target table
df.copy_into_table(
table_name=table_name, target_columns=session.table(table_name).columns
)
except Exception as e:
print(f"Failed to copy data into {table_name}, Error: {e}")
# Create an empty list of threads
threads = []
# Loop through and start a thread for each table
for table in tables:
thread = threading.Thread(target=execute_copy, args=(table,))
threads.append(thread)
thread.start()
# Wait for all threads to finish
for thread in threads:
thread.join()
示例 2:并发处理多个表¶
以下示例演示了如何使用多线程从每个客户事务表(transaction_customer1、transaction_customer2 和 transaction_customer3)并发筛选、汇总数据并将数据插入到结果表中。
from concurrent.futures import ThreadPoolExecutor
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, month, sum, lit
# List of customers
customers = ["customer1", "customer2", "customer3"]
# Define a function to process each customer transaction table
def process_customer_table(customer_name):
table_name = f"transaction_{customer_name}"
try:
# Load the customer transaction table
df = session.table(table_name)
print(f"Processing {table_name}...")
# Filter data by positive values and non null categories
df_filtered = df.filter((col("value") > 0) & col("category").is_not_null())
# Perform aggregation: Sum of value by category and month
df_aggregated = df_filtered.with_column("month", month(col("date"))).with_column("customer_name", lit(customer_name)).group_by(col("category"), col("month"), col("customer_name")).agg(sum("value").alias("total_value"))
# Save the processed data into a new result table
df_aggregated.show()
df_aggregated.write.save_as_table("aggregate_customers", mode="append")
print(f"Data from {table_name} processed and saved")
except Exception as e:
print(f"Error processing {table_name}: {e}")
# Using ThreadPoolExecutor to handle concurrency
with ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks for each customer table
executor.map(process_customer_table, customers)
# Display the results from the aggregate table
session.table("aggregate_customers").show()
使用线程安全会话对象的限制¶
如果您需要同时管理多个事务,务必使用多个会话对象,因为单个会话的多个线程不支持并发事务。
在其他线程处于活动状态时更改会话运行时配置(包括 Snowflake 会话变量,如数据库、架构、仓库和客户端配置,如 cte_optimization_enabled、sql_simplifier_enabled)可能会导致意外行为。为了避免冲突,如果不同的线程需要不同的配置,最好使用单独的会话对象。例如,如果您需要对不同的数据库并行执行操作,请确保每个线程都有其自己的会话对象,而不是共享同一个会话。
以 Pandas DataFrame 的形式返回 DataFrame 的内容¶
要以 Pandas DataFrame 的形式返回 DataFrame 的内容,请使用 to_pandas
方法。
例如:
python_df = session.create_dataframe(["a", "b", "c"])
pandas_df = python_df.to_pandas()
Snowpark DataFrames 与 Snowpark Pandas DataFrame:我应该选择哪个?¶
通过安装 Snowpark Python 库,您可以选择使用 DataFrames API 或 Pandas on Snowflake。
Snowpark DataFrames 基于 PySpark 建模,而 Snowpark Pandas 旨在扩展 Snowpark DataFrame 功能,并为 Pandas 用户提供熟悉的界面,以方便迁移和采用。建议根据您的用例和偏好使用不同的 APIs:
在以下情况下使用 Snowpark Pandas … |
在以下情况下使用 Snowpark DataFrames … |
---|---|
您更喜欢使用 Pandas 或具有用 Pandas 编写的现有代码 |
您更喜欢使用 Spark 或具有用 Spark 编写的现有代码 |
具有包括交互式分析和迭代探索的工作流程 |
具有包括批处理和有限迭代开发的工作流程 |
熟悉如何使用立即执行的 DataFrame 操作 |
熟悉如何使用惰性求值的 DataFrame 操作 |
希望数据在操作期间保持一致和有序 |
可以接受未排序的数据 |
可以接受性能稍慢(与 Snowpark DataFrames 相比),希望更轻松地使用 API |
性能对您来说比易用性更重要 |
从实施的角度来看,Snowpark DataFrames 和 Pandas DataFrames 在语义上不同。由于 Snowpark DataFrames 基于 PySpark 建模,它会对原始数据源进行操作,获取最新更新的数据,因此它不会保持操作顺序。Snowpark Pandas 基于 Pandas 建模,Pandas 对数据快照进行操作,在操作过程中保持顺序,并允许基于顺序的位置索引。在交互式数据分析中,顺序保持对于数据的目视检查非常有用。
有关更多信息,请参阅 将 Pandas on Snowflake 与 Snowpark DataFrames 结合使用。