在 Snowpark Python 中使用 DataFrames

在 Snowpark 中,主要通过 DataFrame 来查询和处理数据。本主题说明如何使用 DataFrames。

本主题内容:

要检索和操作数据,需要使用 DataFrame 类。DataFrame 表示延迟评估的关系型数据集,延迟评估是指仅在触发特定操作时执行。从某种意义上说, DataFrame 就像一个需要评估才能检索数据的查询。

要将数据检索到 DataFrame 之中,请执行以下步骤:

  1. 构造 DataFrame,为数据集指定数据源

    例如,您可以创建一个 DataFrame 来保存源自表、外部 CSV 文件、本地数据或 SQL 语句的执行的数据。

  2. 指定 DataFrame 中的数据集应如何转换

    例如,可以指定应该选择哪些列、如何筛选行、如何对结果进行排序和分组等。

  3. 执行语句以将数据检索到 DataFrame 之中

    为了将数据检索到 DataFrame 之中,必须调用执行操作的方法(例如 collect() 方法)。

接下来的部分将更详细地介绍这些步骤。

设置本部分的示例

本部分的一些示例使用 DataFrame 查询名为 sample_product_data 的表。若要运行这些示例,可以执行以下 SQL 语句,以创建此表并用一些数据来填充此表。

可以使用 Snowpark Python 运行 SQL 语句:

>>> session.sql('CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT)').collect()
[Row(status='Table SAMPLE_PRODUCT_DATA successfully created.')]
>>> session.sql("""
... INSERT INTO sample_product_data VALUES
... (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
... (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
... (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
... (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
... (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
... (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
... (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
... (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
... (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
... (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
... (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
... (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
... """).collect()
[Row(number of rows inserted=12)]
Copy

要验证表是否已创建,请运行:

>>> session.sql("SELECT count(*) FROM sample_product_data").collect()
[Row(COUNT(*)=12)]
Copy

在 Python 工作表中设置示例

要在 :doc:` Python 工作表 </developer-guide/snowpark/python/python-worksheets>` 中设置和运行这些示例,请创建示例表,并设置您的 Python 工作表。

  1. 创建 SQL 工作表并运行以下命令:

    CREATE OR REPLACE TABLE sample_product_data
      (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
    
    INSERT INTO sample_product_data VALUES
      (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
      (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
      (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
      (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
      (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
      (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
      (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
      (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
      (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
      (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
      (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
      (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
    
    SELECT count(*) FROM sample_product_data;
    
    Copy
  1. 创建 Python 工作表,设置与用于创建 sample_product_data 表的 SQL 工作表相同的数据库和架构上下文。

要在 Python 工作表中使用本主题中的示例,请在处理程序函数(例如 main)中使用该示例,并使用传递到函数中的 Session 对象创建 DataFrames。

例如,调用 session 对象的 table 方法,为表创建一个 DataFrame :

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
  df_table = session.table("sample_product_data")
Copy

要查看该函数生成的输出(例如通过调用 DataFrame 对象的 show 方法),请使用 Output 选项卡。

要检查该函数返回的值,请从 Settings » Return type 中选择返回值的数据类型,然后使用 Results 选项卡:

  • 如果函数返回 DataFrame,则使用默认返回类型 Table

  • 如果函数从 DataFrame 对象的 collect 方法返回 Rowlist,请为返回类型使用 Variant

  • 如果函数返回可通过类型转换操作转换为字符串的其他任何值,或者函数未返回值,请使用 String 作为返回类型。

有关更多详细信息,请参阅 运行 Python 工作表

构造 DataFrame

要构造 DataFrame,可以使用 Session 类的方法和属性。以下每种方法都基于不同类型的数据源构造 DataFrame。

可以在本地开发环境中运行这些示例,也可以在 :doc:` Python 工作表 </developer-guide/snowpark/python/python-worksheets>` 中定义的 main 函数中调用它们。

  • 要基于表、视图或流中的数据创建 DataFrame,请调用 table 方法:

    >>> # Create a DataFrame from the data in the "sample_product_data" table.
    >>> df_table = session.table("sample_product_data")
    
    # To print out the first 10 rows, call df_table.show()
    
    Copy
  • 要基于指定值创建 DataFrame,请调用 create_dataframe 方法:

    >>> # Create a DataFrame with one column named a from specified values.
    >>> df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a")
    >>> df1.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df1
    -------
    |"A"  |
    -------
    |1    |
    |2    |
    |3    |
    |4    |
    -------
    
    Copy

    创建一个包含 4 列的 DataFrame,列分别为“a”、“b”、“c”和“d”:

    >>> # Create a DataFrame with 4 columns, "a", "b", "c" and "d".
    >>> df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"])
    >>> df2.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df2
    -------------------------
    |"A"  |"B"  |"C"  |"D"  |
    -------------------------
    |1    |2    |3    |4    |
    -------------------------
    
    Copy

    再创建一个包含 4 列的 DataFrame,列分别为“a”、“b”、“c”和“d”:

    >>> # Create another DataFrame with 4 columns, "a", "b", "c" and "d".
    >>> from snowflake.snowpark import Row
    >>> df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)])
    >>> df3.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df3
    -------------------------
    |"A"  |"B"  |"C"  |"D"  |
    -------------------------
    |1    |2    |3    |4    |
    -------------------------
    
    Copy

    创建 DataFrame 并指定架构:

    >>> # Create a DataFrame and specify a schema
    >>> from snowflake.snowpark.types import IntegerType, StringType, StructType, StructField
    >>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
    >>> df4 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema)
    >>> df4.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df4
    ---------------
    |"A"  |"B"    |
    ---------------
    |1    |snow   |
    |3    |flake  |
    ---------------
    
    Copy
  • 要创建包含特定值范围的 DataFrame,请调用 range 方法:

    >>> # Create a DataFrame from a range
    >>> # The DataFrame contains rows with values 1, 3, 5, 7, and 9 respectively.
    >>> df_range = session.range(1, 10, 2).to_df("a")
    >>> df_range.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df_range
    -------
    |"A"  |
    -------
    |1    |
    |3    |
    |5    |
    |7    |
    |9    |
    -------
    
    Copy
  • 要创建 DataFrame 来保存暂存区内文件中的数据,请使用 read 属性获取 DataFrameReader 对象。在 DataFrameReader 对象中,调用与文件中的数据格式对应的方法:

    >>> from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType
    
    >>> # Create DataFrames from data in a stage.
    >>> df_json = session.read.json("@my_stage2/data1.json")
    >>> df_catalog = session.read.schema(StructType([StructField("name", StringType()), StructField("age", IntegerType())])).csv("@stage/some_dir")
    
    Copy
  • 要创建 DataFrame 来保存 SQL 查询的结果,请调用 sql 方法:

    >>> # Create a DataFrame from a SQL query
    >>> df_sql = session.sql("SELECT name from sample_product_data")
    >>> df_sql.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df_sql
    --------------
    |"NAME"      |
    --------------
    |Product 1   |
    |Product 1A  |
    |Product 1B  |
    |Product 2   |
    |Product 2A  |
    |Product 2B  |
    |Product 3   |
    |Product 3A  |
    |Product 3B  |
    |Product 4   |
    --------------
    
    Copy

    可以使用 sql 方法执行 SELECT 语句,从表和暂存文件中检索数据,但使用 table 方法和 read 属性可在开发工具中提供更好的语法突出显示、错误突出显示和智能代码补全。

指定应该如何转换数据集

要指定需要选择的列,以及如何对结果执行筛选、排序、分组等操作,请调用用于转换数据集的 DataFrame 方法。要在这些方法中标识列,请使用 col 函数或计算结果为列的表达式。请参阅 指定列和表达式

例如:

  • 若要指定应返回的行,请调用 filter 方法:

    >>> # Import the col function from the functions module.
    >>> # Python worksheets import this function by default
    >>> from snowflake.snowpark.functions import col
    
    >>> # Create a DataFrame for the rows with the ID 1
    >>> # in the "sample_product_data" table.
    
    >>> # This example uses the == operator of the Column object to perform an
    >>> # equality check.
    >>> df = session.table("sample_product_data").filter(col("id") == 1)
    >>> df.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df
    ------------------------------------------------------------------------------------
    |"ID"  |"PARENT_ID"  |"CATEGORY_ID"  |"NAME"     |"SERIAL_NUMBER"  |"KEY"  |"3rd"  |
    ------------------------------------------------------------------------------------
    |1     |0            |5              |Product 1  |prod-1           |1      |10     |
    ------------------------------------------------------------------------------------
    
    Copy
  • 若要指定应选择的列,请调用 select 方法:

    >>> # Import the col function from the functions module.
    >>> from snowflake.snowpark.functions import col
    
    >>> # Create a DataFrame that contains the id, name, and serial_number
    >>> # columns in the "sample_product_data" table.
    >>> df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    >>> df.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df
    ---------------------------------------
    |"ID"  |"NAME"      |"SERIAL_NUMBER"  |
    ---------------------------------------
    |1     |Product 1   |prod-1           |
    |2     |Product 1A  |prod-1-A         |
    |3     |Product 1B  |prod-1-B         |
    |4     |Product 2   |prod-2           |
    |5     |Product 2A  |prod-2-A         |
    |6     |Product 2B  |prod-2-B         |
    |7     |Product 3   |prod-3           |
    |8     |Product 3A  |prod-3-A         |
    |9     |Product 3B  |prod-3-B         |
    |10    |Product 4   |prod-4           |
    ---------------------------------------
    
    Copy
  • 您还可以按如下方式引用列:

    >>> # Import the col function from the functions module.
    >>> from snowflake.snowpark.functions import col
    
    >>> df_product_info = session.table("sample_product_data")
    >>> df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"])
    >>> df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number)
    >>> df3 = df_product_info.select("id", "name", "serial_number")
    
    Copy

每个方法都返回一个经过转换的新 DataFrame 对象。该方法不会影响原始 DataFrame 对象。要应用多个转换,可以 将多个方法调用链接起来,基于前一个方法调用返回的新 DataFrame 对象调用每个后续转换方法。

这些转换方法指定了如何构造 SQL 语句,并且不会从 Snowflake 数据库检索数据。执行操作以计算 DataFrame 中描述的操作方法会执行数据检索。

联接 DataFrames

若要联接 DataFrame 对象,请调用 join 方法:

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # Create a DataFrame that joins the two DataFrames
>>> # on the column named "key".
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2")
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------
Copy

如果两个 DataFrames 都包含要用于联接的相同列,则可以使用以下示例语法:

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # If both dataframes have the same column "key", the following is more convenient.
>>> df_lhs.join(df_rhs, ["key"]).show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, ["key"])
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------
Copy

您还可以使用 & 运算符来连接多个联接表达式:

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # Use & operator connect join expression. '|' and ~ are similar.
>>> df_joined_multi_column = df_lhs.join(df_rhs, (df_lhs.col("key") == df_rhs.col("key")) & (df_lhs.col("value1") < df_rhs.col("value2"))).select(df_lhs["key"].as_("key"), "value1", "value2")
>>> df_joined_multi_column.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_joined_multi_column
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------
Copy

如果要执行自联接,则必须复制 DataFrame:

>>> # copy the DataFrame if you want to do a self-join
>>> from copy import copy

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs_copied = copy(df_lhs)
>>> df_self_joined = df_lhs.join(df_lhs_copied, (df_lhs.col("key") == df_lhs_copied.col("key")) & (df_lhs.col("value1") == df_lhs_copied.col("value1")))
Copy

如果 DataFrames 中存在重叠的列,Snowpark 会在联接结果中的列前面附加一个随机生成的前缀:

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).show()  
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"))
-----------------------------------------------------
|"l_av5t_KEY"  |"VALUE1"  |"r_1p6k_KEY"  |"VALUE2"  |
-----------------------------------------------------
|a             |1         |a             |3         |
|b             |2         |b             |4         |
-----------------------------------------------------
Copy

可以使用 Column.alias 来重命名重叠的列:

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2")
-----------------------------------------
|"KEY1"  |"KEY2"  |"VALUE1"  |"VALUE2"  |
-----------------------------------------
|a       |a       |1         |3         |
|b       |b       |2         |4         |
-----------------------------------------
Copy

要避免使用随机前缀,还可以指定要追加到重叠列的后缀:

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right")
--------------------------------------------------
|"KEY_LEFT"  |"VALUE1"  |"KEY_RIGHT"  |"VALUE2"  |
--------------------------------------------------
|a           |1         |a            |3         |
|b           |2         |b            |4         |
--------------------------------------------------
Copy

这些示例使用 DataFrame.col 来指定要在联接中使用的列。有关指定列的更多方法,请参阅 指定列和表达式

如果需要基于不同的列将表与其自身联接,则不能使用单个 DataFrame 执行自联接。以下示例使用单个 DataFrame 执行自联接,操作会失败,因为联接的左侧和右侧都存在 "id" 的列表达式:

>>> from snowflake.snowpark.exceptions import SnowparkJoinException

>>> df = session.table("sample_product_data")
>>> # This fails because columns named "id" and "parent_id"
>>> # are in the left and right DataFrames in the join.
>>> try:
...     df_joined = df.join(df, col("id") == col("parent_id")) # fails
... except SnowparkJoinException as e:
...     print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
>>> # This fails because columns named "id" and "parent_id"
>>> # are in the left and right DataFrames in the join.
>>> try:
...     df_joined = df.join(df, df["id"] == df["parent_id"])   # fails
... except SnowparkJoinException as e:
...     print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
Copy

相反,请使用 Python 内置的 copy() 方法创建 DataFrame 对象的克隆,并使用这两个 DataFrame 对象执行联接:

>>> from copy import copy

>>> # Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
>>> df_lhs = session.table("sample_product_data")
>>> # Clone the DataFrame object to use as the right-hand side of the join.
>>> df_rhs = copy(df_lhs)

>>> # Create a DataFrame that joins the two DataFrames
>>> # for the "sample_product_data" table on the
>>> # "id" and "parent_id" columns.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("id") == df_rhs.col("parent_id"))
>>> df_joined.count()
8
Copy

指定列和表达式

调用这些转换方法时,可能需要指定列或者使用列的表达式。例如,调用 select 方法时,需要指定要选择的列。

要引用列,请通过调用 snowflake.snowpark.functions 模块中的 col 函数来创建 Column 对象。

>>> # Import the col function from the functions module.
>>> from snowflake.snowpark.functions import col

>>> df_product_info = session.table("sample_product_data").select(col("id"), col("name"))
>>> df_product_info.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_product_info
---------------------
|"ID"  |"NAME"      |
---------------------
|1     |Product 1   |
|2     |Product 1A  |
|3     |Product 1B  |
|4     |Product 2   |
|5     |Product 2A  |
|6     |Product 2B  |
|7     |Product 3   |
|8     |Product 3A  |
|9     |Product 3B  |
|10    |Product 4   |
---------------------
Copy

备注

要为字面量创建 Column 对象,请参阅 将字面量用作列对象

指定筛选器、投影、联接条件等时,可以在表达式中使用 Column 对象。例如:

  • 可以将 Column 对象与 filter 方法一起使用,以指定筛选条件:

    >>> # Specify the equivalent of "WHERE id = 20"
    >>> # in a SQL SELECT statement.
    >>> df_filtered = df.filter(col("id") == 20)
    
    Copy
    >>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"])
    >>> # Specify the equivalent of "WHERE a + b < 10"
    >>> # in a SQL SELECT statement.
    >>> df_filtered = df.filter((col("a") + col("b")) < 10)
    >>> df_filtered.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df_filtered
    -------------
    |"A"  |"B"  |
    -------------
    |1    |3    |
    -------------
    
    Copy
  • 可以将 Column 对象与 select 方法一起使用,以定义别名:

    >>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"])
    >>> # Specify the equivalent of "SELECT b * 10 AS c"
    >>> # in a SQL SELECT statement.
    >>> df_selected = df.select((col("b") * 10).as_("c"))
    >>> df_selected.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df_selected
    -------
    |"C"  |
    -------
    |30   |
    |100  |
    -------
    
    Copy
  • 可以将 Column 对象与 join 方法一起使用,以定义联接条件:

    >>> dfX = session.create_dataframe([[1], [2]], schema=["a_in_X"])
    >>> dfY = session.create_dataframe([[1], [3]], schema=["b_in_Y"])
    >>> # Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y"
    >>> # in a SQL SELECT statement.
    >>> df_joined = dfX.join(dfY, col("a_in_X") == col("b_in_Y")).select(dfX["a_in_X"].alias("the_joined_column"))
    >>> df_joined.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df_joined
    -----------------------
    |"THE_JOINED_COLUMN"  |
    -----------------------
    |1                    |
    -----------------------
    
    Copy

引用两个不同 DataFrame 对象中具有相同名称的列时(例如,基于该列联接 DataFrames ),可以在一个 DataFrame 对象中使用 DataFrame.col 方法引用该对象中的列(例如 df1.col("name")df2.col("name"))。

下面的示例演示了如何使用 DataFrame.col 方法来引用特定 DataFrame 中的列。该示例联接两个 DataFrame 对象,两者均具有名为 key 的列。该示例使用 Column.as 方法来更改新创建 DataFrame 中的列名称。

>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value"])
>>> # Create a DataFrame that joins two other DataFrames (df_lhs and df_rhs).
>>> # Use the DataFrame.col method to refer to the columns used in the join.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs.col("key").as_("key"), df_lhs.col("value").as_("L"), df_rhs.col("value").as_("R"))
>>> df_joined.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_joined
---------------------
|"KEY"  |"L"  |"R"  |
---------------------
|a      |1    |3    |
|b      |2    |4    |
---------------------
Copy

将对象标识符(表名称、列名称等)放在双引号里

您指定的数据库、架构、表和暂存区的名称必须符合 Snowflake 标识符要求

创建一个包含区分大小写的列的表:

>>> session.sql("""
... create or replace temp table "10tablename"(
... id123 varchar, -- case insensitive because it's not quoted.
... "3rdID" varchar, -- case sensitive.
... "id with space" varchar -- case sensitive.
... )""").collect()
>>> # Add return to the statement to return the collect() results in a Python worksheet
[Row(status='Table 10tablename successfully created.')]
Copy

随后向表中添加值:

>>> session.sql("""insert into "10tablename" (id123, "3rdID", "id with space") values ('a', 'b', 'c')""").collect()
>>> # Add return to the statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
Copy

随后为表创建一个 DataFrame,并查询该表:

>>> df = session.table('"10tablename"')
>>> df.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df
---------------------------------------
|"ID123"  |"3rdID"  |"id with space"  |
---------------------------------------
|a        |b        |c                |
---------------------------------------
Copy

指定名称时,Snowflake 会将该名称视为大写形式。例如,以下调用是等效的:

>>> df.select(col("id123")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(ID123='a')]
>>> df.select(col("ID123")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(ID123='a')]
Copy

如果名称不符合标识符要求,则必须将名称放在双引号里 (")。使用反斜杠 (\) 对字符串字面量中的双引号字符进行转义。例如,以下表名称并非以字母或下划线开头,因此必须将该名称放在双引号里:

>>> df = session.table("\"10tablename\"")
Copy

也可以使用单引号而非反斜杠来转义字符串字面量中的双引号字符。

>>> df = session.table('"10tablename"')
Copy

请注意,指定 :emph:` 列 ` 的名称时,不需要将名称放在双引号里。如果列名称不符合标识符要求,Snowpark 库会自动将列名称放在双引号里:

>>> df.select(col("3rdID")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(3rdID='b')]
>>> df.select(col("\"3rdID\"")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(3rdID='b')]
Copy

再举一个例子,以下调用是等效的:

>>> df.select(col("id with space")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
>>> df.select(col("\"id with space\"")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
Copy

如果已在列名称两侧添加了双引号,则该库不会在列名称两侧插入其他双引号。

在某些情况下,列名称可能包含双引号字符:

>>> session.sql('''
... create or replace temp table quoted(
... "name_with_""air""_quotes" varchar,
... """column_name_quoted""" varchar
... )''').collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Table QUOTED successfully created.')]
Copy
>>> session.sql('''insert into quoted ("name_with_""air""_quotes", """column_name_quoted""") values ('a', 'b')''').collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
Copy

标识符要求 中所述,对于带双引号的标识符中的每个双引号字符,都必须使用两个双引号字符(例如 "name_with_""air""_quotes""""column_name_quoted""" ):

>>> df_table = session.table("quoted")
>>> df_table.select("\"name_with_\"\"air\"\"_quotes\"").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(name_with_"air"_quotes='a')]
Copy
>>> df_table.select("\"\"\"column_name_quoted\"\"\"").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row("column_name_quoted"='b')]
Copy

如果将标识符放在双引号里(无论是您显式添加了引号,还是由库为您添加了引号), Snowflake 都会将标识符视为区分大小写

>>> # The following calls are NOT equivalent!
>>> # The Snowpark library adds double quotes around the column name,
>>> # which makes Snowflake treat the column name as case-sensitive.
>>> df.select(col("id with space")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
Copy

对比此示例:

>>> from snowflake.snowpark.exceptions import SnowparkSQLException
>>> try:
...     df.select(col("ID WITH SPACE")).collect()
... except SnowparkSQLException as e:
...     print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 7
invalid identifier '"ID WITH SPACE"'
Copy

将字面量用作列对象

若要在将 Column 对象作为实参的方法中使用字面量,请将字面量传递给 snowflake.snowpark.functions 模块中的 lit 函数,从而为字面量创建 Column 对象。例如:

>>> # Import for the lit and col functions.
>>> from snowflake.snowpark.functions import col, lit

>>> # Show the first 10 rows in which num_items is greater than 5.
>>> # Use `lit(5)` to create a Column object for the literal 5.
>>> df_filtered = df.filter(col("num_items") > lit(5))
Copy

将列对象的类型转换为特定类型

要将 Column 对象的类型转换为特定类型,请调用 cast 方法,然后从 snowflake.snowpark.types 模块传入类型对象。例如,要将字面量转换为精度为 5、小数位数为 2 的 NUMBER,请运行以下语句:

>>> # Import for the lit function.
>>> from snowflake.snowpark.functions import lit

>>> # Import for the DecimalType class.
>>> from snowflake.snowpark.types import DecimalType

>>> decimal_value = lit(0.05).cast(DecimalType(5,2))
Copy

链接多个方法调用

由于每个 转换 DataFrame 对象的方法 都会返回一个应用了转换的新 DataFrame 对象,因此您可以 链接多个方法调用 (link removed),以生成以其他方式转换的新 DataFrame。

下面的示例返回为如下目的而配置的 DataFrame :

  • 查询 sample_product_data 表。

  • 返回 id = 1 的行。

  • 选择 nameserial_number 列。

    >>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number"))
    >>> df_product_info.show()
    >>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
    >>> # return df_product_info
    -------------------------------
    |"NAME"     |"SERIAL_NUMBER"  |
    -------------------------------
    |Product 1  |prod-1           |
    -------------------------------
    
    Copy

在此示例中:

  • session.table("sample_product_data") 返回 sample_product_data 表的 DataFrame。

    尽管 DataFrame 尚且不包含表中的数据,但该对象确实包含表列的定义。

  • filter(col("id") == 1) 返回 sample_product_data 表的 DataFrame(设置为返回 id = 1 的行)。

    请注意, DataFrame 还不包含表中匹配的行。在 :ref:` 调用操作方法 <label-snowpark_python_dataframe_action_method>` 之前,不会检索匹配的行。

  • select(col("name"), col("serial_number")) 返回一个 DataFrame,包含 sample_product_data 表中 id = 1 的行的 nameserial_number 列。

在链接多个方法调用时,调用顺序非常重要。每个方法调用都返回一个已转换的 DataFrame。确保后续调用使用已转换的 DataFrame。

例如,在下面的代码中,select 方法返回仅包含两列的 DataFrame:nameserial_number。对此 DataFrame 的 filter 方法调用会失败,因为它使用的 id 列不在已转换的 DataFrame 中。

>>> # This fails with the error "invalid identifier 'ID'."
>>> df_product_info = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") == 1)
>>> try:
...   df_product_info.show()
... except SnowparkSQLException as e:
...   print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 121
invalid identifier 'ID'
Copy

相反,以下代码会成功执行,因为对包含 sample_product_data 表中所有列(包括 id 列)的 DataFrame 调用了 filter() 方法:

>>> # This succeeds because the DataFrame returned by the table() method
>>> # includes the "id" column.
>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number"))
>>> df_product_info.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_product_info
-------------------------------
|"NAME"     |"SERIAL_NUMBER"  |
-------------------------------
|Product 1  |prod-1           |
-------------------------------
Copy

使用 Snowpark Python 时,可能需要按照不同于在 SQL 语句中使用等效关键字(SELECT 和 WHERE)时的顺序执行 selectfilter 方法调用。

检索列定义

要在数据集中为 DataFrame 检索列的定义,请调用 schema 属性。此方法会返回一个 StructType 对象,其中包含 StructField 对象的 list。每个 StructField 对象都包含一列的定义。

# Import the StructType
from snowflake.snowpark.types import *
# Get the StructType object that describes the columns in the
# underlying rowset.
table_schema = session.table("sample_product_data").schema
table_schema
StructType([StructField('ID', LongType(), nullable=True), StructField('PARENT_ID', LongType(), nullable=True), StructField('CATEGORY_ID', LongType(), nullable=True), StructField('NAME', StringType(), nullable=True), StructField('SERIAL_NUMBER', StringType(), nullable=True), StructField('KEY', LongType(), nullable=True), StructField('"3rd"', LongType(), nullable=True)])
Copy

在返回的 StructType 对象中,列名称始终是规范化的。不带引号的标识符以大写形式返回,带引号的标识符以其定义所用的确切大小写形式返回。

下面的示例创建一个 DataFrame,其中包含名为 ID3rd 的列。对于名为 3rd 的列,Snowpark 库会自动将名称放在双引号里(即 "3rd"),因为 该名称不符合标识符要求

该示例调用 schema 属性,随后调用返回的 StructType 对象的 names 属性,以获取列名称的 list。这些名称使用 schema 属性返回的 StructType 进行规范化。

>>> # Create a DataFrame containing the "id" and "3rd" columns.
>>> df_selected_columns = session.table("sample_product_data").select(col("id"), col("3rd"))
>>> # Print out the names of the columns in the schema. This prints out:
>>> # This prints List["ID", "\"3rd\""]
>>> df_selected_columns.schema.names
['ID', '"3rd"']
Copy

执行操作以计算 DataFrame

如前所述, DataFrame 是延迟计算的,也就是说,在您执行操作之前, SQL 语句不会发送到服务器执行。执行操作会导致对 DataFrame 进行计算,并将相应的 SQL 语句发送到服务器执行。

以下方法可执行操作:

方法

描述

DataFrame

collect

计算 DataFrame,并将生成的数据集作为 Row 对象的 list 返回。

DataFrame

count

计算 DataFrame 并返回行数。

DataFrame

show

计算 DataFrame,并将行打印到控制台。此方法将行数限制为 10 行(默认值)。

DataFrameWriter

save_as_table

将 DataFrame 中的数据保存到指定表中。请参阅 将数据保存到表中

例如,若要对表执行查询并返回结果,请调用 collect 方法:

>>> # Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
>>> # This does not execute the query.
>>> df = session.table("sample_product_data").select(col("id"), col("name"))

>>> # Send the query to the server for execution and
>>> # return a list of Rows containing the results.
>>> results = df.collect()
>>> # Use a return statement to return the collect() results in a Python worksheet
>>> # return results
Copy

若要执行查询并返回结果数量,请调用 count 方法:

>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")

>>> # Send the query to the server for execution and
>>> # print the count of rows in the table.
>>> print(df_products.count())
12
Copy

若要执行查询并将结果打印到控制台,请调用 show 方法:

>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")

>>> # Send the query to the server for execution and
>>> # print the results to the console.
>>> # The query limits the number of rows to 10 by default.
>>> df_products.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_products
-------------------------------------------------------------------------------------
|"ID"  |"PARENT_ID"  |"CATEGORY_ID"  |"NAME"      |"SERIAL_NUMBER"  |"KEY"  |"3rd"  |
-------------------------------------------------------------------------------------
|1     |0            |5              |Product 1   |prod-1           |1      |10     |
|2     |1            |5              |Product 1A  |prod-1-A         |1      |20     |
|3     |1            |5              |Product 1B  |prod-1-B         |1      |30     |
|4     |0            |10             |Product 2   |prod-2           |2      |40     |
|5     |4            |10             |Product 2A  |prod-2-A         |2      |50     |
|6     |4            |10             |Product 2B  |prod-2-B         |2      |60     |
|7     |0            |20             |Product 3   |prod-3           |3      |70     |
|8     |7            |20             |Product 3A  |prod-3-A         |3      |80     |
|9     |7            |20             |Product 3B  |prod-3-B         |3      |90     |
|10    |0            |50             |Product 4   |prod-4           |4      |100    |
-------------------------------------------------------------------------------------
Copy

若要将行数限制为 20 行:

>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")

>>> # Limit the number of rows to 20, rather than 10.
>>> df_products.show(20)
>>> # All rows are returned when you use return in a Python worksheet to return the DataFrame as a table
>>> # return df_products
-------------------------------------------------------------------------------------
|"ID"  |"PARENT_ID"  |"CATEGORY_ID"  |"NAME"      |"SERIAL_NUMBER"  |"KEY"  |"3rd"  |
-------------------------------------------------------------------------------------
|1     |0            |5              |Product 1   |prod-1           |1      |10     |
|2     |1            |5              |Product 1A  |prod-1-A         |1      |20     |
|3     |1            |5              |Product 1B  |prod-1-B         |1      |30     |
|4     |0            |10             |Product 2   |prod-2           |2      |40     |
|5     |4            |10             |Product 2A  |prod-2-A         |2      |50     |
|6     |4            |10             |Product 2B  |prod-2-B         |2      |60     |
|7     |0            |20             |Product 3   |prod-3           |3      |70     |
|8     |7            |20             |Product 3A  |prod-3-A         |3      |80     |
|9     |7            |20             |Product 3B  |prod-3-B         |3      |90     |
|10    |0            |50             |Product 4   |prod-4           |4      |100    |
|11    |10           |50             |Product 4A  |prod-4-A         |4      |100    |
|12    |10           |50             |Product 4B  |prod-4-B         |4      |100    |
-------------------------------------------------------------------------------------
Copy

备注

如果调用 schema 属性以获取 DataFrame 中列的定义,则不需要调用操作方法。

以 Pandas DataFrame 的形式返回 DataFrame 的内容

要以 Pandas DataFrame 的形式返回 DataFrame 的内容,请使用 to_pandas 方法。

例如:

>>> python_df = session.create_dataframe(["a", "b", "c"])
>>> pandas_df = python_df.to_pandas()
Copy

将数据保存到表中

要将 DataFrame 的内容保存到表中,请执行以下操作:

  1. 调用 write 属性以获取 DataFrameWriter 对象。

  2. 调用 DataFrameWriter 对象中的 mode 方法并指定模式。有关更多信息,请参阅 ` API 文档 <https://docs.snowflake.cn/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.DataFrameWriter.mode#snowflake.snowpark.DataFrameWriter.mode>`_。此方法返回使用指定模式进行了配置的新 DataFrameWriter 对象。

  3. DataFrameWriter 对象中调用 save_as_table 方法,以将 DataFrame 的内容保存到指定的表中。

请注意,无需调用单独的方法(例如 collect)来执行将数据保存到表中的 SQL 语句。

例如:

>>> df.write.mode("overwrite").save_as_table("table1")
Copy

基于 DataFrame 创建视图

若要基于 DataFrame 创建视图,请调用 create_or_replace_view 方法,该方法会立即创建新视图:

>>> import os
>>> database = os.environ["snowflake_database"]  # use your own database and schema
>>> schema = os.environ["snowflake_schema"]
>>> view_name = "my_view"
>>> df.create_or_replace_view(f"{database}.{schema}.{view_name}")
[Row(status='View MY_VIEW successfully created.')]
Copy

在 Python 工作表中,由于工作表在数据库和架构的上下文中运行,您可以运行以下命令来创建视图:

# Define a DataFrame
df_products = session.table("sample_product_data")
# Define a View name
view_name = "my_view"
# Create the view
df_products.create_or_replace_view(f"{view_name}")
# return the view name
return view_name + " successfully created"
my_view successfully created
Copy

通过调用 create_or_replace_view 创建的视图是持久保留的。如果不再需要该视图,可以 手动删除视图

也可使用 create_or_replace_temp_view 方法来创建临时视图。临时视图仅在创建临时视图的会话中可用。

处理暂存区中的文件

本节介绍如何查询 Snowflake 暂存区中文件内的数据。要对文件执行其他操作,请 使用 SQL 语句

要查询 Snowflake 暂存区中文件内的数据,请使用 DataFrameReader 类:

  1. 调用 Session 类中的 read 方法,以访问 DataFrameReader 对象。

  2. 如果文件采用 CSV 格式,请描述文件中的字段。要这样做,请执行以下操作:

    1. 创建一个 StructType 对象,该对象包含描述文件中字段的 StructField 对象 list

    2. 对于每个 StructField 对象,请指定以下内容:

      • 字段的名称。

      • 字段的数据类型(在 snowflake.snowpark.types 模块中指定为对象)。

      • 字段是否可为 null。

      例如:

      >>> from snowflake.snowpark.types import *
      
      >>> schema_for_data_file = StructType([
      ...                          StructField("id", StringType()),
      ...                          StructField("name", StringType())
      ...                       ])
      
      Copy
    3. 调用 DataFrameReader 对象中的 schema 属性,传入 StructType 对象。

      例如:

      >>> df_reader = session.read.schema(schema_for_data_file)
      
      Copy

      schema 属性会返回一个 DataFrameReader 对象,该对象配置为读取包含指定字段的文件。

      请注意,对于其他格式(如 JSON)的文件,无需执行此操作。对于这些文件, DataFrameReader 会将数据视为字段名称为 $1、类型为 VARIANT 的单个字段处理。

  3. 如果需要指定有关数据读取方式的其他信息(例如,数据经过压缩,或者 CSV 文件使用分号而非逗号来分隔字段),请调用 DataFrameReader 对象的 optionoptions 方法。

    option 方法接受要设置的选项的名称和值,并允许合并多个链接的调用, options 方法接受选项名称及其相应值构成的字典。

    有关文件格式选项的名称和值,请参阅 有关 CREATE FILE FORMAT 的文档

    您还可以设置 :doc:` 有关 COPY INTO TABLE 的文档 </sql-reference/sql/copy-into-table>` 中所述的复制选项。请注意,当您 :ref:`将数据检索到 DataFrame 中 <label-snowpark_python_dataframe_action_method>`时,设置复制选项可能造成费用更昂贵的执行策略。

    下面的示例设置了 DataFrameReader 对象,以查询未压缩且使用分号作为字段分隔符的 CSV 文件中的数据。

    >>> df_reader = df_reader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    optionoptions 方法返回使用指定选项配置的 DataFrameReader 对象。

  4. 调用与文件格式相对应的方法(例如 csv 方法),传入文件的位置。

    >>> df = df_reader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    
    Copy

    与文件格式对应的方法返回一个 DataFrame 对象,该对象配置为保存该文件中的数据。

  5. 使用 DataFrame 对象方法,对 :ref:` 数据集执行所需的任何转换 <label-snowpark_python_dataframe_transform>` (例如,选择特定字段、筛选行等)。

    例如,要从名为 my_stage 的暂存区中的一个 JSON 文件内提取 color 元素:

    >>> # Import the sql_expr function from the functions module.
    >>> from snowflake.snowpark.functions import sql_expr
    
    >>> df = session.read.json("@my_stage").select(sql_expr("$1:color"))
    
    Copy

    如前所述,对于格式并非 CSV 的文件(例如 JSON 格式的文件), DataFrameReader 会将文件中的数据视为名为 $1 的单个 VARIANT 列。

    此示例使用 snowflake.snowpark.functions 模块中的 sql_expr 函数来指定 color 元素的路径。

    请注意, sql_expr 函数不会解释或修改输入实参。该函数仅允许您使用 SQL 构造 Snowpark API 还不支持的表达式和代码片段。

  6. 调用操作方法 来查询文件中的数据。

    与表的 DataFrames 一样,在您调用操作方法之前,不会将数据检索到 DataFrame 中。

使用半结构化数据

使用 DataFrame,您可以查询和访问 :doc:` 半结构化数据 </user-guide/semistructured-intro>` (例如 JSON 数据)。接下来的几个部分将介绍如何在 DataFrame 中处理半结构化数据。

备注

这些部分中的示例使用 示例中使用的示例数据 中的示例数据。

遍历半结构化数据

若要引用半结构化数据中的特定字段或元素,请使用 Column 对象的以下方法:

  • 获取 col_object["<field_name>"] 属性,以返回 OBJECT(或包含 OBJECT 的 VARIANT )中字段的 Column 对象。

  • 使用 col_object[<index>],返回 ARRAY(或包含 ARRAY 的 VARIANT)中元素的 Column 对象。

备注

如果路径中的字段名称或元素不规则,并且导致难以使用上述索引,则可以使用 getget_ignore_caseget_path 作为替代方法。

例如,以下代码会从 :ref:` 示例数据 <label-sample_data_semistructured_data>` 的 src 列的对象内,选择 dealership 字段:

>>> from snowflake.snowpark.functions import col

>>> df = session.table("car_sales")
>>> df.select(col("src")["dealership"]).show()
Copy

该代码会打印以下输出:

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
Copy

备注

DataFrame 中的值放在双引号之间,因为这些值以字符串字面量的形式返回。若要将这些值的类型转换为特定类型,请参阅 显式转换半结构化数据中的值

还可以 链接多个方法调用,以遍历特定字段或元素的路径。

例如,以下代码选择 salesperson 对象中的 name 字段:

>>> df = session.table("car_sales")
>>> df.select(df["src"]["salesperson"]["name"]).show()
Copy

该代码会打印以下输出:

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
Copy

再举一个例子,下面的代码选择 vehicle 字段的第一个元素,其中包含车辆数组。该示例还会选择第一个元素中的 price 字段。

>>> df = session.table("car_sales")
>>> df.select(df["src"]["vehicle"][0]).show()
>>> df.select(df["src"]["vehicle"][0]["price"]).show()
Copy

该代码会打印以下输出:

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
Copy

除了上述访问字段的方法之外,还有一种替代方法,如果路径中的字段名称或元素不规则,您可以使用 getget_ignore_caseget_path 函数。

例如,以下代码行都会打印对象中指定字段的值:

>>> from snowflake.snowpark.functions import get, get_path, lit

>>> df.select(get(col("src"), lit("dealership"))).show()
>>> df.select(col("src")["dealership"]).show()
Copy

同样,以下代码行都会打印对象中指定路径处的字段值:

>>> df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
>>> df.select(col("src")["vehicle"][0]["make"]).show()
Copy

显式转换半结构化数据中的值

默认情况下,字段和元素的值以字符串字面量(包括双引号)的形式返回,如上面的示例所示。

若要避免意外结果,请调用 cast 方法,将值转换为特定类型。例如,以下代码会打印出未经类型转换和经过类型转换的值:

>>> # Import the objects for the data types, including StringType.
>>> from snowflake.snowpark.types import *

>>> df = session.table("car_sales")
>>> df.select(col("src")["salesperson"]["id"]).show()
>>> df.select(col("src")["salesperson"]["id"].cast(StringType())).show()
Copy

该代码会打印以下输出:

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
Copy

将对象数组展平为行

如果需要将半结构化数据“展平”为 DataFrame(例如,为数组中的每个对象生成一行),请使用 join_table_function 方法调用 flatten。此方法与 FLATTEN SQL 函数等效。如果传入对象或数组的路径,该方法会返回一个 DataFrame,其中包含对象或数组中各字段或元素的行。

例如,在 :ref:` 示例数据 <label-sample_data_semistructured_data>` 中, src:customer 是一个包含有关客户的信息的对象数组。每个对象都包含 nameaddress 字段。

如果将此路径传递给 flatten 函数:

>>> df = session.table("car_sales")
>>> df.join_table_function("flatten", col("src")["customer"]).show()
Copy

该方法返回一个 DataFrame:

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

从 DataFrame 中,您可以从 VALUE 字段中的每个对象选择 nameaddress 字段:

>>> df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"], col("value")["address"]).show()
Copy
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
Copy

以下代码 将值类型转换为特定类型 并更改列的名称,从而补充了上一个示例:

>>> df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"].cast(StringType()).as_("Customer Name"), col("value")["address"].cast(StringType()).as_("Customer Address")).show()
Copy
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
Copy

执行 SQL 语句

若要执行您指定的 SQL 语句,请调用 Session 类中的 sql 方法,然后传入要执行的语句。该方法返回一个 DataFrame。

请注意,在您 :ref:` 调用操作方法 <label-snowpark_python_dataframe_action_method>` 之前, SQL 语句不会执行。

>>> # Get the list of the files in a stage.
>>> # The collect() method causes this SQL statement to be executed.
>>> session.sql("create or replace temp stage my_stage").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Stage area MY_STAGE successfully created.')]

>>> stage_files_df = session.sql("ls @my_stage").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
>>> # Resume the operation of a warehouse.
>>> # Note that you must call the collect method to execute
>>> # the SQL statement.
>>> session.sql("alter warehouse if exists my_warehouse resume if suspended").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Statement executed successfully.')]

>>> # Set up a SQL statement to copy data from a stage to a table.
>>> session.sql("copy into sample_product_data from @my_stage file_format=(type = csv)").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Copy executed with 0 files processed.')]
Copy

如果要 :ref:` 调用方法来转换 DataFrame <label-snowpark_python_dataframe_transform>` (例如 filterselect 等),请注意,这些方法仅在基础 SQL 语句是 SELECT 语句时才有效。其他类型的 SQL 语句不支持转换方法。

>>> df = session.sql("select id, parent_id from sample_product_data where id < 10")
>>> # Because the underlying SQL statement for the DataFrame is a SELECT statement,
>>> # you can call the filter method to transform this DataFrame.
>>> results = df.filter(col("id") < 3).select(col("id")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet

>>> # In this example, the underlying SQL statement is not a SELECT statement.
>>> df = session.sql("ls @my_stage")
>>> # Calling the filter method results in an error.
>>> try:
...   df.filter(col("size") > 50).collect()
... except SnowparkSQLException as e:
...   print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 104
invalid identifier 'SIZE'
Copy
语言: 中文