pandas on Snowflake¶
pandas on Snowflake 允许您直接在 Snowflake 中对数据运行 Pandas 代码。只需修改导入语句和少量代码,即可获得熟悉的 Pandas 开发体验,轻松构建健壮的数据处理管道,同时在管道扩展时无缝享受 Snowflake 的高性能和可扩展性。
pandas on Snowflake 会智能判断 Pandas 代码是本地执行还是使用 Snowflake 引擎来扩展和提升性能,这一过程称为 混合执行。在处理 Snowflake 中的大型数据集时,它会通过转换为 SQL 的方式在 Snowflake 内部本地运行工作负载,从而充分利用并行计算能力,并获得 Snowflake 的数据治理和安全优势。
pandas on Snowflake 通过 Snowpark pandas API 提供,是 Snowpark Python 库 的一部分,使得 Python 代码可以在 Snowflake 平台上进行可扩展的数据处理。
使用 Pandas on Snowflake 的好处¶
与 Python 开发者会面 – 通过提供可在 Snowflake 中原生运行的与 Pandas 兼容的层,Pandas on Snowflake 为 Python 开发者提供了熟悉的界面。
可扩展的分布式 Pandas – 通过利用 Snowflake 中现有的查询优化技术,Pandas on Snowflake 将 Pandas 的便利性与 Snowflake 的可扩展性联系起来。必须尽量减少代码重写,才能简化迁移过程,这样您可以无缝地从原型迁移到生产环境。
无需管理和调整额外的计算基础设施 – Pandas on Snowflake 利用 Snowflake 强大的计算引擎,因此您无需设置或管理任何额外的计算基础设施。
开始使用 Pandas on Snowflake¶
备注
关于如何使用 pandas on Snowflake 的实践示例,请参阅此 笔记本 (https://github.com/Snowflake-Labs/snowflake-python-recipes/blob/main/pandas%20on%20Snowflake%20101/pandas%20on%20Snowflake%20101.ipynb) 并观看此 视频 (link removed)
要安装 Pandas on Snowflake,您可以使用 conda 或 pip 来安装包。有关详细说明,请参阅 安装。
pip install "snowflake-snowpark-python[modin]"
安装 Pandas on Snowflake 后,不要以 import pandas as pd 的方式导入 Pandas,而是使用以下两行:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
以下示例展示了如何通过 Pandas on Snowpark Python 库和 Modin 开始使用 Pandas on Snowflake。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
# Create a Snowpark session with a default connection.
from snowflake.snowpark.session import Session
session = Session.builder.create()
# Create a Snowpark pandas DataFrame from existing Snowflake table
df = pd.read_snowflake('SNOWFALL')
# Inspect the DataFrame
df
DAY LOCATION SNOWFALL
0 1 Big Bear 8.0
1 2 Big Bear 10.0
2 3 Big Bear NaN
3 1 Tahoe 3.0
4 2 Tahoe NaN
5 3 Tahoe 13.0
6 1 Whistler NaN
7 Friday Whistler 40.0
8 3 Whistler 25.0
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2
# Inspect the columns after update.
# Note how the data type is updated automatically after transformation.
df["DAY"]
0 1
1 2
2 3
3 1
4 2
5 3
6 1
7 2
8 3
Name: DAY, dtype: int64
# Drop rows with null values.
df.dropna()
DAY LOCATION SNOWFALL
0 1 Big Bear 8.0
1 2 Big Bear 10.0
3 1 Tahoe 3.0
5 3 Tahoe 13.0
7 2 Whistler 40.0
8 3 Whistler 25.0
# Compute the average daily snowfall across locations.
df.groupby("LOCATION").mean()["SNOWFALL"]
LOCATION
Big Bear 9.0
Tahoe 8.0
Whistler 32.5
Name: SNOWFALL, dtype: float64
read_snowflake 支持从 Snowflake 视图、动态表、Iceberg 表等读取数据。您也可以直接传入 SQL 查询,并返回一个 pandas on Snowflake DataFrame,实现 SQL 与 pandas on Snowflake 之间的无缝切换。
summary_df = pd.read_snowflake("SELECT LOCATION, AVG(SNOWFALL) AS avg_snowfall FROM SNOWFALL GROUP BY LOCATION")
summary_df
混合执行的工作原理¶
备注
从 Snowpark Python 1.40.0 版本开始,使用 pandas on Snowflake 时默认启用混合执行。
pandas on Snowflake 使用混合执行来判断 Pandas 代码是本地运行还是使用 Snowflake 引擎强化性能。这让您可以继续编写熟悉的 Pandas 代码构建稳健管道,而无需考虑最优和最高效的执行方式,同时无缝享受 Snowflake 的性能和可扩展性。
示例 1:直接在代码中创建一个小型的 11 行 DataFrame。使用混合执行时,Snowflake 会选择本地内存中的 Pandas 后端来执行操作:
# Create a basic dataframe with 11 rows
df = pd.DataFrame([
("New Year's Day", "2025-01-01"),
("Martin Luther King Jr. Day", "2025-01-20"),
("Presidents' Day", "2025-02-17"),
("Memorial Day", "2025-05-26"),
("Juneteenth National Independence Day", "2025-06-19"),
("Independence Day", "2025-07-04"),
("Labor Day", "2025-09-01"),
("Columbus Day", "2025-10-13"),
("Veterans Day", "2025-11-11"),
("Thanksgiving Day", "2025-11-27"),
("Christmas Day", "2025-12-25")
], columns=["Holiday", "Date"])
# Print out the backend used for this dataframe
df.get_backend()
# >> Output: 'Pandas'
示例 2:向表中插入 1000 万行事务数据
# Create a 10M row table in Snowflake and populate with sythentic data
session.sql('''CREATE OR REPLACE TABLE revenue_transactions (Transaction_ID STRING, Date DATE, Revenue FLOAT);''').collect()
session.sql('''SET num_days = (SELECT DATEDIFF(DAY, '2024-01-01', CURRENT_DATE));''').collect()
session.sql('''INSERT INTO revenue_transactions (Transaction_ID, Date, Revenue) SELECT UUID_STRING() AS Transaction_ID, DATEADD(DAY, UNIFORM(0, $num_days, RANDOM()), '2024-01-01') AS Date, UNIFORM(10, 1000, RANDOM()) AS Revenue FROM TABLE(GENERATOR(ROWCOUNT => 10000000));''').collect()
# Read Snowflake table as Snowpark pandas dataframe
df_transactions = pd.read_snowflake("REVENUE_TRANSACTIONS")
您会看到表使用 Snowflake 作为后端,因为这是一个驻留在 Snowflake 中的大表。
print(f"The dataset size is {len(df_transactions)} and the data is located in {df_transactions.get_backend()}.")
# >> Output: The dataset size is 10000000 and the data is located in Snowflake.
#Perform some operations on 10M rows with Snowflake
df_transactions["DATE"] = pd.to_datetime(df_transactions["DATE"])
df_transactions.groupby("DATE").sum()["REVENUE"]
示例 3:筛选数据并进行 groupby 汇总,最终得到 7 行数据。
筛选数据时,Snowflake 会隐式识别从 Snowflake 到 Pandas 的引擎更改的后端选择,因为输出只有 7 行数据。
# Filter to data in last 7 days
df_transactions_filter1 = df_transactions[(df_transactions["DATE"] >= pd.Timestamp.today().date() - pd.Timedelta('7 days')) & (df_transactions["DATE"] < pd.Timestamp.today().date())]
# Since filter is not yet evaluated, data stays in Snowflake
assert df_transactions_filter1.get_backend() == "Snowflake"
# After groupby operation, result is transfered from Snowflake to Pandas
df_transactions_filter1.groupby("DATE").sum()["REVENUE"]
Notes and limitations¶
即使后端发生变化,DataFrame 类型始终为
modin.pandas.DataFrame/Series/etc,以确保与下游代码的互操作性和兼容性。为了确定使用哪个后端,Snowflake 有时会使用行大小的估算值,而不是在每一步计算 DataFrame 的精确长度。这意味着当数据集增大或减小时(例如筛选、汇总),Snowflake 可能不会在操作后立即切换到最优后端。
当有操作需要跨不同后端合并两个或更多 DataFrames 时,Snowflake 会根据最低数据传输成本决定数据的移动位置。
筛选操作可能不会导致数据移动,因为 Snowflake 可能无法估算底层过滤数据的大小。
任何由内存中 Python 数据组成的 DataFrames 都将使用 Pandas 后端,例如:
pd.DataFrame([1])
pd.DataFrame(pandas.DataFrame([1]))
pd.Series({'a': [4]})
An empty DataFrame: pd.DataFrame()
在有限操作集合中,DataFrames 会自动从 Snowflake 引擎移动到 Pandas 引擎。这些操作包括
df.apply、df.plot、df.iterrows、df.itertuples、series.items,以及在数据量较小时的聚合操作。并非所有操作都支持数据迁移。混合执行不会自动将 DataFrame 从 Pandas 引擎移动回 Snowflake,除非像
pd.concat等操作涉及多个 DataFrames。Snowflake 不会自动将 DataFrame 从 Pandas 引擎移动回 Snowflake,除非像
pd.concat等操作涉及多个 DataFrames。
何时应使用 Pandas on Snowflake¶
如果满足以下任意条件,则您应使用 Pandas on Snowflake:
您对 Pandas API 和更广泛的 PyData 生态系统很熟悉
您与熟悉 Pandas 并希望在同一代码库上进行协作的其他人属于同一团队
您有用 Pandas 编写的现有代码
您更喜欢通过基于 AI 的 copilot 工具完成更准确的代码
有关更多信息,请参阅 Snowpark DataFrames 与 Snowpark Pandas DataFrame:我应该选择哪个?
将 Pandas on Snowflake 与 Snowpark DataFrames 结合使用¶
Pandas on Snowflake 和 DataFrame API 具有高度互操作性,因此您可以构建利用这两个 APIs 的管道。有关更多信息,请参阅 Snowpark DataFrames 与 Snowpark Pandas DataFrame:我应该选择哪个?。
您可以使用以下操作在 Snowpark DataFrames 和 Snowpark Pandas DataFrames 之间进行转换:
操作 |
输入 |
输出 |
|---|---|---|
Snowpark DataFrame |
Snowpark Pandas DataFrame |
|
Snowpark Pandas DataFrame 或 Snowpark Pandas Series |
Snowpark DataFrame |
Pandas on Snowflake 与原生 Pandas 的比较¶
pandas on Snowflake and native pandas have similar DataFrame APIs with matching signatures and similar semantics. pandas on Snowflake provides the same API signature as native pandas and provides scalable computation with Snowflake. pandas on Snowflake respects the semantics described in the native pandas documentation as much as possible, but it uses the Snowflake computation and type system. However, when native pandas executes on a client machine, it uses the Python computation and type system. For information about the type mapping between pandas on Snowflake and Snowflake, see Data types.
Starting with Snowpark Python 1.40.0, pandas on Snowflake is best used with data which is already in Snowflake. To convert between native pandas and pandas on Snowflake type, use the following operations:
操作 |
输入 |
输出 |
|---|---|---|
Snowpark Pandas DataFrame |
Native pandas DataFrame - Materialize all data to the local environment. If the dataset is large, this may result in an out of memory error. |
|
原生 Pandas DataFrame、原始数据、Snowpark Pandas 对象 |
Snowpark Pandas DataFrame |
执行环境¶
pandas: Operates on a single machine and processes in-memory data.pandas on Snowflake: Integrates with Snowflake, which allows for distributed computing across a cluster of machines for large datasets, while leveraging in memory pandas for processing small datasets. This integration enables handling of much larger datasets that exceed the memory capacity of a single machine. Note that using the Snowpark pandas API requires a connection to Snowflake.
惰性求值与及早求值¶
pandas:立即执行操作,并在每次操作后将结果在内存中完全物化。这种及早求值的操作可能会导致内存压力增加,因为需要在机器内大量移动数据。pandas on Snowflake:提供与 Pandas 相同的 API 体验。它模仿 Pandas 的及早求值模型,但在内部构建了惰性求值的查询图,以实现跨操作的优化。通过查询图融合和转译操作为基础分布式 Snowflake 计算引擎提供了额外的优化机会,与直接在 Snowflake 中运行 Pandas 相比,这降低了成本和端到端管道运行时。
备注
与 I/O 相关的 APIs,以及返回值不是 Snowpark Pandas 对象(即
DataFrame、Series或Index)的 APIs 总是及早求值。例如:read_snowflaketo_snowflaketo_pandasto_dictto_list__repr__双下划线方法
__array__可由某些第三方库(如 scikit-learn)自动调用。调用此方法将把结果物化到本地机器。
数据源和存储¶
pandas:支持 IO 工具(text、CSV、HDF5 ...)中 Pandas 文档中列出的各种读取器和写入器 (https://pandas.pydata.org/docs/user_guide/io.html)。pandas on Snowflake:可以通过 Snowflake 表读取和写入,还可以读取本地或暂存的 CSV、JSON 或 parquet 文件。有关更多信息,请参阅 IO(读取和写入)。
数据类型¶
pandas:具有丰富的数据类型集,例如整数、浮点数、字符串、datetime类型和分类类型。它还支持用户定义的数据类型。Pandas 中的数据类型通常派生自基础数据,并严格执行。pandas on Snowflake:受 Snowflake 类型系统约束,该系统通过将 Pandas 数据类型转换为 Snowflake 中的 SQL 类型,将 Pandas 对象映射到 SQL。大多数 Pandas 类型在 Snowflake 中有自然的等价物,但映射并不总是一对一的。在某些情况下,多个 Pandas 类型映射到同一 SQL 类型。
下表列出了 Pandas 和 Snowflake SQL 之间的类型映射:
Pandas 类型 |
Snowflake 类型 |
|---|---|
所有带符号/无符号整数类型,包括 Pandas 扩展整数类型 |
NUMBER(38, 0) |
所有浮点类型,包括 Pandas 扩展的浮点数据类型 |
FLOAT |
|
BOOLEAN |
|
STRING |
|
TIME |
|
DATE |
所有 naive 时区 |
TIMESTAMP_NTZ |
所有 aware 时区 |
TIMESTAMP_TZ |
|
ARRAY |
|
MAP |
具有混合数据类型的对象列 |
VARIANT |
Timedelta64[ns] |
NUMBER(38, 0) |
备注
不支持分类、周期、间隔、稀疏和用户定义的数据类型。目前 Pandas on Snowpark 客户端仅支持 Timedelta。将 Timedelta 重新写入 Snowflake 时,它将被存储为 Number 类型。
下表提供了 Snowflake SQL 类型回到 Pandas on Snowflake 类型(使用 df.dtypes)的映射:
Snowflake 类型 |
Pandas on Snowflake 类型 ( |
|---|---|
NUMBER ( |
|
NUMBER ( |
|
BOOLEAN |
|
STRING、TEXT |
|
VARIANT、BINARY、GEOMETRY、GEOGRAPHY |
|
ARRAY |
|
OBJECT |
|
TIME |
|
TIMESTAMP、TIMESTAMP_NTZ、TIMESTAMP_LTZ、TIMESTAMP_TZ |
|
DATE |
|
使用 to_pandas() 从 Snowpark Pandas DataFrame 转换为原生 Pandas DataFrame 时,原生 Pandas DataFrame 的数据类型将比 Pandas on Snowflake 类型更精确,与函数和过程的 SQL-Python 数据类型映射 兼容。
类型转换和类型推理¶
Null 值处理¶
pandas:在 Pandas 1.x 版本中,Pandas 在 处理缺失数据 (https://pandas.pydata.org/docs/user_guide/missing_data.html#values-considered-missing) 时非常灵活,因此它将所有 PythonNone、np.nan、pd.NaN、pd.NA和pd.NaT都视为缺失值。在 Pandas (2.2.x) 的更高版本中,这些值被视为不同的值。pandas on Snowflake:采用与Pandas 早期版本类似的方法,将上述所有值都视为缺失值。Snowpark 重复使用 Pandas 中的NaN、NA和NaT。但请注意,所有这些缺失值都可互换处理,并在 Snowflake 表中存储为 SQL NULL。
偏移/频率别名¶
pandas:版本 2.2.1 中更改了 Pandas 中的日期偏移。单字母别名'M'、'Q'、'Y'和其他别名已弃用,而改用双字母偏移。pandas on Snowflake:仅使用 Pandas 时间序列文档 (https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects) 中描述的新偏移。
安装¶
先决条件: 需要安装 Python 3.9、3.10、3.11 或 3.12、modin 0.32.0 版本、pandas 2.2.* 版本。
小技巧
要在 Snowflake Notebooks 中使用 Pandas on Snowflake,请参阅 笔记本中的 Pandas on Snowflake 中的设置说明。
To install pandas on Snowflake in your development environment, follow these steps:
切换到您的项目目录并激活您的 Python 虚拟环境。
备注
API 正在积极开发中,因此我们建议将其安装在 Python 虚拟环境中,而不是系统范围内。这种做法允许您创建的每个项目使用特定的版本,使您免受未来版本更改的影响。
您可以使用 ` Anaconda <https://www.anaconda.com/ (https://www.anaconda.com/)>`_、` Miniconda <https://docs.conda.io/en/latest/miniconda.html (https://docs.conda.io/en/latest/miniconda.html)>`_ 或 virtualenv (https://docs.python.org/3/tutorial/venv.html) 等工具为特定的 Python 版本创建 Python 虚拟环境。
For example, to use conda to create a Python 3.12 virtual environment, type:
conda create --name snowpark_pandas python=3.12 conda activate snowpark_pandas
备注
If you previously installed an older version of pandas on Snowflake using Python 3.9 and pandas 1.5.3, you will need to upgrade your Python and pandas versions as described above. Follow the steps to create a new environment with Python 3.10 to 3.13.
使用 Modin 安装 Snowpark Python 库。
pip install "snowflake-snowpark-python[modin]"
或
conda install snowflake-snowpark-python modin==0.28.1
备注
确保安装了
snowflake-snowpark-python1.17.0 版本或更高版本。
向 Snowflake 进行身份验证¶
您必须与 Snowflake 数据库建立会话,才能使用 Pandas on Snowflake。您可以使用配置文件为会话选择连接参数,也可以在代码中枚举这些参数。有关更多信息,请参阅 为 Snowpark Python 创建会话。如果存在唯一的活动 Snowpark Python 会话,Pandas on Snowflake 将自动使用该会话。例如:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
CONNECTION_PARAMETERS = {
'account': '<myaccount>',
'user': '<myuser>',
'password': '<mypassword>',
'role': '<myrole>',
'database': '<mydatabase>',
'schema': '<myschema>',
'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
# pandas on Snowflake will automatically pick up the Snowpark session created above.
# It will use that session to create new DataFrames.
df = pd.DataFrame([1, 2])
df2 = pd.read_snowflake('CUSTOMER')
pd.session 是一个 Snowpark 会话,因此您可以按照对任何其他 Snowpark 会话执行的操作,对该会话执行任何操作。例如,您可以使用该会话来执行任意 SQL 查询,该查询会根据 会话 API 产生 Snowpark DataFrame,但请注意,此结果将是 Snowpark DataFrame,而不是 Snowpark Pandas DataFrame。
# pd.session is the session that pandas on Snowflake is using for new DataFrames.
# In this case it is the same as the Snowpark session that we've created.
assert pd.session is session
# Run SQL query with returned result as Snowpark DataFrame
snowpark_df = pd.session.sql('select * from customer')
snowpark_df.show()
或者,您可以在 配置文件 中配置 Snowpark 连接参数。这样就无需在代码中枚举连接参数,从而使您可以编写 Pandas on Snowflake 代码,就像您通常编写 Pandas 代码一样。要实现这一点,请在 ~/.snowflake/connections.toml 创建如下所示的配置文件:
default_connection_name = "default"
[default]
account = "<myaccount>"
user = "<myuser>"
password = "<mypassword>"
role="<myrole>"
database = "<mydatabase>"
schema = "<myschema>"
warehouse = "<mywarehouse>"
然后在代码中,您只需要使用 snowflake.snowpark.Session.builder.create() 来创建使用这些凭据的会话。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
# Session.builder.create() will create a default Snowflake connection.
Session.builder.create()
# create a DataFrame.
df = pd.DataFrame([[1, 2], [3, 4]])
您还可以创建多个 Snowpark 会话,然后将其中一个会话分配给 Pandas on Snowflake。Pandas on Snowflake 仅使用一个会话,因此您必须使用 pd.session = pandas_session 将其中一个会话显式分配给 Pandas on Snowflake。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>").create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>").create()
pd.session = pandas_session
df = pd.DataFrame([1, 2, 3])
以下示例显示,当没有活动的 Snowpark 会话时,尝试使用 Pandas on Snowflake 将引发 SnowparkSessionException,并显示以下错误:“pandas on Snowflake requires an active snowpark session, but there is none.”。创建会话后,就可以使用 Pandas on Snowflake。例如:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
df = pd.DataFrame([1, 2, 3])
以下示例显示,当有多个活动的 Snowpark 会话时,尝试使用 Pandas on Snowflake 将导致 SnowparkSessionException,并显示以下消息:“There are multiple active snowpark sessions, but you need to choose one for pandas on Snowflake.”。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>"}).create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>"}).create()
df = pd.DataFrame([1, 2, 3])
备注
You must set the session used for a new pandas on Snowflake DataFrame or Series via modin.pandas.session.
However, joining or merging DataFrames created with different sessions is not supported, so you should avoid repeatedly setting different sessions
and creating DataFrames with different sessions in a workflow.
API 参考¶
请参阅 Pandas on Snowflake API 参考,了解当前实施的 APIs 和可用方法的完整列表。
有关支持的操作的完整列表,请参阅 Pandas on Snowflake 参考中的以下表格:
APIs 和混合执行的配置参数¶
混合执行使用数据集大小估计和应用于 DataFrame 的操作组合来确定后端的选择。一般来说,行数少于 10 万的数据集倾向于使用本地 Pandas;行数超过 10 万的数据集倾向于使用 Snowflake,除非数据集是从本地文件加载的。
配置传输成本¶
要将默认切换阈值更改为其他行数限制值,可以在初始化 DataFrame 之前修改环境变量:
# Change row transfer threshold to 500k
from modin.config.envvars import SnowflakePandasTransferThreshold
SnowflakePandasTransferThreshold.put(500_000)
设置此值将对从 Snowflake 转移行数据施加惩罚。
配置本地执行限制¶
一旦 DataFrame 在本地运行,它通常会保持本地状态,除非需要将其移动回 Snowflake 进行合并,但本地可处理数据的最大行数有上限。当前此上限为 1000 万行。
检查和设置后端¶
要检查当前选择的后端,可以使用 df.get_backend() 命令,该命令返回 Pandas 表示本地执行,返回 Snowflake 表示推送到 Snowflake 执行。
要使用 set_backend 或其别名 move_to 设置当前后端:
df_local = df.set_backend('Pandas')
df_local = df.move_to('Pandas')
df_snow = df.set_backend('Snowflake')
您也可以就地设置后端:
df.set_backend('Pandas', inplace=True)
要检查并显示数据移动的 原因 信息:
pd.explain_switch()
通过固定后端手动替换后端选择¶
默认情况下,Snowflake 会自动为给定的 DataFrame 和操作选择最佳后端。如果您希望替换自动引擎选择,可以对对象及其产生的所有数据禁用自动切换,使用 pin_backend() 方法:
pinned_df_snow = df.move_to('Snowflake').pin_backend()
To re-enable automatic backend switching, call unpin_backend():
unpinned_df_snow = pinned_df_snow.unpin_backend()
在 Snowflake 笔记本中使用 Snowpark Pandas¶
要在 Snowflake 笔记本中使用 Pandas on Snowflake,请参阅 :ref:` 笔记本中的 Pandas on Snowflake <label-notebooks_snowpark_pandas>`。
在 Python 工作表中使用 Snowpark Pandas¶
要使用 Snowpark Pandas,您需要在 Python 工作表环境中的 Packages 中选择 modin 来安装 Modin。
您可以在 Settings > Return type 下选择 Python 函数的 Return 类型。默认情况下,它被设置为 Snowpark 表。要将 Snowpark Pandas DataFrame 显示为结果,您可以通过调用 to_snowpark() 将 Snowpark Pandas DataFrame 转换为 Snowpark DataFrame。在此转换中不会产生 I/O 成本。
以下是在 Python 工作表中使用 Snowpark Pandas 的示例:
import snowflake.snowpark as snowpark
def main(session: snowpark.Session):
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
[1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
[1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
columns=["DAY", "LOCATION", "SNOWFALL"])
# Print a sample of the dataframe to standard output.
print(df)
snowpark_df = df.to_snowpark(index=None)
# Return value will appear in the Results tab.
return snowpark_df
在存储过程中使用 Pandas on Snowflake¶
您可以在 存储过程 中使用 Pandas on Snowflake 来构建数据管道,并使用 任务 调度存储过程的执行。
以下是使用 SQL 创建存储过程的方法:
CREATE OR REPLACE PROCEDURE run_data_transformation_pipeline_sp()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python','modin')
HANDLER='data_transformation_pipeline'
AS $$
def data_transformation_pipeline(session):
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from datetime import datetime
# Create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
[1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
[1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
columns=["DAY", "LOCATION", "SNOWFALL"])
# Drop rows with null values.
df = df.dropna()
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2
# Save Results as a Snowflake Table
timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
save_path = f"OUTPUT_{timestamp}"
df.to_snowflake(name=save_path, if_exists="replace", index=False)
return f'Transformed DataFrame saved to {save_path}.'
$$;
以下是使用 Snowflake Python API 创建存储过程的方法:
from snowflake.snowpark.context import get_active_session
session = get_active_session()
from snowflake.snowpark import Session
def data_transformation_pipeline(session: Session) -> str:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from datetime import datetime
# Create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
[1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
[1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
columns=["DAY", "LOCATION", "SNOWFALL"])
# Drop rows with null values.
df = df.dropna()
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2
# Save Results as a Snowflake Table
timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
save_path = f"OUTPUT_{timestamp}"
df.to_snowflake(name=save_path, if_exists="replace", index=False)
return f'Transformed DataFrame saved to {save_path}.'
dt_pipeline_sproc = session.sproc.register(name="run_data_transformation_pipeline_sp",
func=data_transformation_pipeline,
replace=True,
packages=['modin', 'snowflake-snowpark-python'])
要调用存储过程,您可以在 Python 中运行 dt_pipeline_sproc() 或在 SQL 中运行 CALL run_data_transformation_pipeline_sp()。
在第三方库中使用 Pandas on Snowflake¶
对于可视化和机器学习应用程序,Pandas 通常与第三方库 APIs 一起使用。Pandas on Snowflake 可与其中大部分库互操作,因此它们无需明确转换为 Pandas DataFrames 即可使用。但请注意,除有限的用例外,大多数第三方库通常不支持分布式执行。因此,这可能会导致大型数据集的性能降低。
支持的第三方库¶
下面列出的库接受 Pandas on Snowflake DataFrames 作为输入,但并非所有方法都经过测试。要详细了解 API 级别的深度互操作性状态,请参阅 与第三方库的互操作性。
Plotly
Altair
Seaborn
Matplotlib
Numpy
Scikit-Learn
XGBoost
NLTK
Streamlit
pandas on Snowflake 目前对某些 NumPy (https://numpy.org/) 和 Matplotlib (https://matplotlib.org/) APIs 的兼容性有限,例如 np.where 的分布式实现以及与 df.plot 的互操作性。在使用这些第三方库时,通过 to_pandas() 转换 Snowpark Pandas DataFrames 将避免多次 I/O 调用。
下面是一个使用 Altair (https://altair-viz.github.io/) 进行可视化和使用 scikit-learn (https://scikit-learn.org/stable/) 进行机器学习的示例。
# Create a Snowpark session with a default connection.
session = Session.builder.create()
train = pd.read_snowflake('TITANIC')
train[['Pclass', 'Parch', 'Sex', 'Survived']].head()
Pclass Parch Sex Survived
0 3 0 male 0
1 1 0 female 1
2 3 0 female 1
3 1 0 female 1
4 3 0 male 0
import altair as alt
survived_per_age_plot = alt.Chart(train).mark_bar(
).encode(
x=alt.X('Age', bin=alt.Bin(maxbins=25)),
y='count()',
column='Survived:N',
color='Survived:N',
).properties(
width=300,
height=300
).configure_axis(
grid=False
)
您还可以根据性别分析生存率。
# Perform groupby aggregation with Snowpark pandas
survived_per_gender = train.groupby(['Sex','Survived']).agg(count_survived=('Survived', 'count')).reset_index()
survived_per_gender_pandas = survived_per_gender
survived_per_gender_plot = alt.Chart(survived_per_gender).mark_bar().encode(
x='Survived:N',
y='Survived_Count',
color='Sex',
column='Sex'
).properties(
width=200,
height=200
).configure_axis(
grid=False
)
You can now use scikit-learn to train a simple model.
feature_cols = ['Pclass', 'Parch']
X_pandas = train.loc[:, feature_cols]
y_pandas = train["Survived"]
from sklearn.linear_model import LogisticRegression
logreg = LogisticRegression()
logreg.fit(X_pandas, y_pandas)
y_pred_pandas = logreg.predict(X_pandas)
acc_eval = accuracy_score(y_pandas, y_pred_pandas)
备注
为获得更高的性能,我们建议通过 to_pandas() 转换为 Pandas DataFrames,尤其是使用机器学习库(例如 scikit-learn)时。但是,to_pandas() 函数收集所有行,因此最好首先使用 sample(frac=0.1) 或 head(10) 减少数据帧大小。
不支持的库¶
将不支持的第三方库与 Pandas on Snowflake DataFrame 结合使用时,我们建议通过调用 to_pandas() 将 Snowflake DataFrame 转换为 Pandas DataFrame,然后再将 DataFrame 传递到第三方库方法。
备注
调用 to_pandas() 会将数据从 Snowflake 中提取到内存中,因此对于大型数据集和敏感用例,请注意这一点。
将 Snowflake Cortex LLM 函数与 Snowpark Pandas 结合使用¶
您可以通过 Snowpark Pandas Apply 函数 使用 Snowflake Cortex LLM 函数。
您可以通过特殊的关键字实参应用该函数。当前,支持以下 Cortex 函数:
以下示例在 Snowpark Pandas DataFrame 中的多条记录中使用 TRANSLATE 函数:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.cortex import Translate
content_df = pd.DataFrame(["good morning","hello", "goodbye"], columns=["content"])
result = content_df.apply(Translate, from_language="en", to_language="de")
result["content"]
输出:
Guten Morgen
Hallo
Auf Wiedersehen
Name: content, dtype: object
以下示例在名为 reviews 的 Snowflake 表上使用 SENTIMENT (SNOWFLAKE.CORTEX) 函数:
from snowflake.cortex import Sentiment
s = pd.read_snowflake("reviews")["content"]
result = s.apply(Sentiment)
result
以下示例使用 EXTRACT_ANSWER (SNOWFLAKE.CORTEX) 来回答问题:
from snowflake.cortex import ExtractAnswer
content = "The Snowflake company was co-founded by Thierry Cruanes, Marcin Zukowski, and Benoit Dageville in 2012 and is headquartered in Bozeman, Montana."
df = pd.DataFrame([content])
result = df.apply(ExtractAnswer, question="When was Snowflake founded?")
result[0][0][0]["answer"]
输出:
'2012'
备注
必须安装 snowflake-ml-python (https://pypi.org/project/snowflake-ml-python/) 包才能使用 Cortex LLM 函数。
限制¶
Pandas on Snowflake 有以下限制:
Pandas on Snowflake 不保证与 OSS 第三方库兼容。但是,从版本 1.14.0a1 开始,Snowpark Pandas 引入了对 NumPy 的有限兼容性,特别是针对
np.where使用。有关更多信息,请参阅 NumPy 互操作性。当使用 Snowpark Pandas DataFrame 调用第三方库 APIs 时,Snowflake 建议您在将 DataFrame 传递给第三方库调用之前,通过调用
to_pandas()将 Snowpark Pandas DataFrame 转换为 Pandas DataFrame。有关更多信息,请参阅 在第三方库中使用 Pandas on Snowflake。Pandas on Snowflake 未与 Snowpark ML 集成。使用 Snowpark ML 时,我们建议您在调用Snowpark ML 之前,使用 to_snowpark() 将 Snowpark Pandas DataFrame 转换为 Snowpark DataFrame。
不支持惰性
MultiIndex对象。使用MultiIndex时,它返回一个原生 PandasMultiIndex对象,这需要将所有数据拉取到客户端。在 Pandas on Snowflake 中,并非所有 Pandas APIs 都有分布式实施。对于不支持的 APIs,则会引发
NotImplementedError。有关支持的 APIs 的信息,请参阅 API 参考文档。Pandas on Snowflake 与 Pandas 2.2 的任何补丁版本兼容。
Snowpark pandas can not be referenced within Snowpark pandas
applyfunction. You can only use native pandas insideapply.示例如下:
import modin.pandas as pd import pandas df.apply(lambda row: pandas.to_datetime(f"{row.date} {row.time}"), axis=1)
故障排除¶
本部分介绍在使用 Pandas on Snowflake 时的故障排除提示。
在进行故障排除时,请尝试在原生 Pandas DataFrame(或示例)上运行相同的操作,以查看相同的错误是否仍然存在。这种方法可能会提供有关如何修复查询的提示。例如:
df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]}) # Running this in Snowpark pandas throws an error df["A"].sum() # Convert a small sample of 10 rows to pandas DataFrame for testing pandas_df = df.head(10).to_pandas() # Run the same operation. KeyError indicates that the column reference is incorrect pandas_df["A"].sum() # Fix the column reference to get the Snowpark pandas query working df["a"].sum()
如果您打开了一个长时间运行的笔记本,请注意,默认情况下,Snowflake 会话在会话空闲 240 分钟(4 小时)后超时。当会话过期时,如果在 Snowflake 查询上运行其他 Pandas,您会收到以下错误:“Authentication token has expired.The user must authenticate again.”。此时,您必须重新建立与 Snowflake 的连接。这可能会导致损失任何未持久化的会话变量。有关如何配置会话空闲超时参数的更多信息,请参阅 会话策略。
最佳实践¶
本部分介绍在使用 pandas on Snowflake 时要遵循的最佳实践。
避免使用迭代代码模式,如
for循环、iterrows和iteritems。迭代代码模式会快速增加生成的查询复杂度。让 Pandas on Snowflake 执行数据分发和计算并行化,而不是客户端代码。当涉及到迭代代码模式时,请尝试寻找可以在整个 DataFrame 上执行的操作,并使用相应的操作。
for i in np.arange(0, 50):
if i % 2 == 0:
data = pd.concat([data, pd.DataFrame({'A': i, 'B': i + 1}, index=[0])], ignore_index=True)
else:
data = pd.concat([data, pd.DataFrame({'A': i}, index=[0])], ignore_index=True)
# Instead of creating one DataFrame per row and concatenating them,
# try to directly create the DataFrame out of the data, like this:
data = pd.DataFrame(
{
"A": range(0, 50),
"B": [i + 1 if i % 2 == 0 else None for i in range(50)],
},
)
避免调用
apply、applymap和transform,它们最终通过 UDFs 或 UDTFs 实施,其性能可能不如常规 SQL 查询。如果应用的函数具有等效 DataFrame 或 Series 运算,则改用该运算。例如,直接调用df.groupby('col1').sum(),而不是df.groupby('col1').apply('sum')。将 DataFrame 或 Series 传递给第三方库调用之前,调用
to_pandas()。Pandas on Snowflake 不保证与第三方库兼容。使用物化的常规 Snowflake 表来避免额外的 I/O 开销。Pandas on Snowflake 在仅以常规表的数据快照为基础运行。对于其他类型(包括外部表、视图和 Apache Iceberg™ 表),在拍摄快照之前会创建一个临时表,这会引入额外的物化开销。
Pandas on Snowflake 在使用 DataFrames 从 Snowflake 表创建
read_snowflake时提供快速和零复制克隆功能。在进行其他操作之前仔细检查结果类型,如果需要,使用
astype进行显式类型转换。由于有限的类型推理功能,如果没有给出类型提示,
df.apply将返回对象(变量)类型的结果,即使结果包含所有整数值。如果其他操作要求dtype为int,则可以通过调用astype方法进行显式类型转换,以在继续之前更正列类型。如无必要,请避免调用需要求值和物化的 APIs。
未返回
Series或Dataframe的 APIs 需要及早求值和物化以产生正确类型的结果。绘制方法也是如此。减少对这些 APIs 的调用,以尽量减少不必要的求值和物化。避免在大型数据集上调用
np.where(<cond>, <scalar>, n)。<scalar>将广播到大小为<cond>的 DataFrame,这可能很慢。当使用迭代构建的查询时,
df.cache_result可用于物化中间结果,以减少重复求值,并改善延迟和降低整体查询的复杂性。例如:df = pd.read_snowflake('pandas_test') df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation df3 = df.merge(df2) df4 = df3.where(df2 == True)
在以上示例中,要生成
df2的查询的计算开销很大,在创建df3和df4时都会重用该查询。将df2物化到临时表中(使后续操作涉及df2表扫描而不是透视)可以减少代码块的整体延迟:df = pd.read_snowflake('pandas_test') df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation df2.cache_result(inplace=True) df3 = df.merge(df2) df4 = df3.where(df2 == True)
示例¶
下面是使用 Pandas 操作的代码示例。我们从名为 pandas_test 的 Snowpark Pandas DataFrame 开始,其中包含三列:COL_STR、COL_FLOAT 和 COL_INT。要查看与以下示例相关的笔记本,请参阅 Snowflake-Labs 存储库中的 Pandas on Snowflake 示例 (https://github.com/Snowflake-Labs/sf-samples/blob/main/samples/snowpark-pandas/api-examples/api_examples.ipynb)。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
CONNECTION_PARAMETERS = {
'account': '<myaccount>',
'user': '<myuser>',
'password': '<mypassword>',
'role': '<myrole>',
'database': '<mydatabase>',
'schema': '<myschema>',
'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
df = pd.DataFrame([['a', 2.1, 1],['b', 4.2, 2],['c', 6.3, None]], columns=["COL_STR", "COL_FLOAT", "COL_INT"])
df
COL_STR COL_FLOAT COL_INT
0 a 2.1 1.0
1 b 4.2 2.0
2 c 6.3 NaN
我们将 DataFrame 保存名为 pandas_test 的 Snowflake 表,我们将在整个示例中使用该表。
df.to_snowflake("pandas_test", if_exists='replace',index=False)
接下来,通过 Snowflake 表创建 DataFrame。删除列 COL_INT,然后使用名为 row_position 的列将结果保存回 Snowflake。
# Create a DataFrame out of a Snowflake table.
df = pd.read_snowflake('pandas_test')
df.shape
(3, 3)
df.head(2)
COL_STR COL_FLOAT COL_INT
0 a 2.1 1
1 b 4.2 2
df.dropna(subset=["COL_FLOAT"], inplace=True)
df
COL_STR COL_FLOAT COL_INT
0 a 2.1 1
1 c 6.3 2
df.shape
(2, 3)
df.dtypes
COL_STR object
COL_FLOAT float64
COL_INT int64
dtype: object
# Save the result back to Snowflake with a row_pos column.
df.reset_index(drop=True).to_snowflake('pandas_test2', if_exists='replace', index=True, index_label=['row_pos'])
最终得到一个新表 pandas_test2,如下所示:
row_pos COL_STR COL_FLOAT COL_INT
0 1 a 2.0 1
1 2 b 4.0 2
IO(读取和写入)¶
# Reading and writing to Snowflake
df = pd.DataFrame({"fruit": ["apple", "orange"], "size": [3.4, 5.4], "weight": [1.4, 3.2]})
df.to_snowflake("test_table", if_exists="replace", index=False )
df_table = pd.read_snowflake("test_table")
# Generate sample CSV file
with open("data.csv", "w") as f:
f.write('fruit,size,weight\napple,3.4,1.4\norange,5.4,3.2')
# Read from local CSV file
df_csv = pd.read_csv("data.csv")
# Generate sample JSON file
with open("data.json", "w") as f:
f.write('{"fruit":"apple", "size":3.4, "weight":1.4},{"fruit":"orange", "size":5.4, "weight":3.2}')
# Read from local JSON file
df_json = pd.read_json('data.json')
# Upload data.json and data.csv to Snowflake stage named @TEST_STAGE
# Read CSV and JSON file from stage
df_csv = pd.read_csv('@TEST_STAGE/data.csv')
df_json = pd.read_json('@TEST_STAGE/data.json')
有关更多信息,请参阅 输入/输出。
索引¶
df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
df.columns
Index(['a', 'b'], dtype='object')
df.index
Index([0, 1, 2], dtype='int8')
df["a"]
0 1
1 2
2 3
Name: a, dtype: int8
df["b"]
0 x
1 y
2 z
Name: b, dtype: object
df.iloc[0,1]
'x'
df.loc[df["a"] > 2]
a b
2 3 z
df.columns = ["c", "d"]
df
c d
0 1 x
1 2 y
2 3 z
df = df.set_index("c")
df
d
c
1 x
2 y
3 z
df.rename(columns={"d": "renamed"})
renamed
c
1 x
2 y
3 z
缺少值¶
import numpy as np
df = pd.DataFrame([[np.nan, 2, np.nan, 0],
[3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, np.nan],
[np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
df
A B C D
0 NaN 2.0 NaN 0.0
1 3.0 4.0 NaN 1.0
2 NaN NaN NaN NaN
3 NaN 3.0 NaN 4.0
df.isna()
A B C D
0 True False True False
1 False False True False
2 True True True True
3 True False True False
df.fillna(0)
A B C D
0 0.0 2.0 0.0 0.0
1 3.0 4.0 0.0 1.0
2 0.0 0.0 0.0 0.0
3 0.0 3.0 0.0 4.0
df.dropna(how="all")
A B C D
0 NaN 2.0 NaN 0.0
1 3.0 4.0 NaN 1.0
3 NaN 3.0 NaN 4.0
类型转换¶
df = pd.DataFrame({"int": [1,2,3], "str": ["4", "5", "6"]})
df
int str
0 1 4
1 2 5
2 3 6
df_float = df.astype(float)
df_float
int str
0 1.0 4.0
1 2.0 5.0
2 3.0 6.0
df_float.dtypes
int float64
str float64
dtype: object
pd.to_numeric(df.str)
0 4.0
1 5.0
2 6.0
Name: str, dtype: float64
df = pd.DataFrame({'year': [2015, 2016],
'month': [2, 3],
'day': [4, 5]})
pd.to_datetime(df)
0 2015-02-04
1 2016-03-05
dtype: datetime64[ns]
二元运算¶
df_1 = pd.DataFrame([[1,2,3],[4,5,6]])
df_2 = pd.DataFrame([[6,7,8]])
df_1.add(df_2)
0 1 2
0 7.0 9.0 11.0
1 NaN NaN NaN
s1 = pd.Series([1, 2, 3])
s2 = pd.Series([2, 2, 2])
s1 + s2
0 3
1 4
2 5
dtype: int64
df = pd.DataFrame({"A": [1,2,3], "B": [4,5,6]})
df["A+B"] = df["A"] + df["B"]
df
A B A+B
0 1 4 5
1 2 5 7
2 3 6 9
聚合¶
df = pd.DataFrame([[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
[np.nan, np.nan, np.nan]],
columns=['A', 'B', 'C'])
df.agg(['sum', 'min'])
A B C
sum 12.0 15.0 18.0
min 1.0 2.0 3.0
df.median()
A 4.0
B 5.0
C 6.0
dtype: float64
合并¶
df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
'value': [1, 2, 3, 5]})
df1
lkey value
0 foo 1
1 bar 2
2 baz 3
3 foo 5
df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
'value': [5, 6, 7, 8]})
df2
rkey value
0 foo 5
1 bar 6
2 baz 7
3 foo 8
df1.merge(df2, left_on='lkey', right_on='rkey')
lkey value_x rkey value_y
0 foo 1 foo 5
1 foo 1 foo 8
2 bar 2 bar 6
3 baz 3 baz 7
4 foo 5 foo 5
5 foo 5 foo 8
df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
df
key A
0 K0 A0
1 K1 A1
2 K2 A2
3 K3 A3
4 K4 A4
5 K5 A5
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
df.join(other, lsuffix='_caller', rsuffix='_other')
key_caller A key_other B
0 K0 A0 K0 B0
1 K1 A1 K1 B1
2 K2 A2 K2 B2
3 K3 A3 None None
4 K4 A4 None None
5 K5 A5 None None
分组¶
df = pd.DataFrame({'Animal': ['Falcon', 'Falcon','Parrot', 'Parrot'],
'Max Speed': [380., 370., 24., 26.]})
df
Animal Max Speed
0 Falcon 380.0
1 Falcon 370.0
2 Parrot 24.0
3 Parrot 26.0
df.groupby(['Animal']).mean()
Max Speed
Animal
Falcon 375.0
Parrot 25.0
有关更多信息,请参阅 GroupBy。
透视¶
df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
"bar", "bar", "bar", "bar"],
"B": ["one", "one", "one", "two", "two",
"one", "one", "two", "two"],
"C": ["small", "large", "large", "small",
"small", "large", "small", "small",
"large"],
"D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
"E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df
A B C D E
0 foo one small 1 2
1 foo one large 2 4
2 foo one large 2 5
3 foo two small 3 5
4 foo two small 3 6
5 bar one large 4 6
6 bar one small 5 8
7 bar two small 6 9
8 bar two large 7 9
pd.pivot_table(df, values='D', index=['A', 'B'],
columns=['C'], aggfunc="sum")
C large small
A B
bar one 4.0 5
two 7.0 6
foo one 4.0 1
two NaN 6
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
'baz': [1, 2, 3, 4, 5, 6],
'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
df
foo bar baz zoo
0 one A 1 x
1 one B 2 y
2 one C 3 z
3 two A 4 q
4 two B 5 w
5 two C 6 t
资源¶
快速入门:Pandas on Snowflake 入门 (https://quickstarts.snowflake.com/guide/getting_started_with_pandas_on_snowflake/index.html)
快速入门:使用 Snowpark Python 的数据工程管道 (https://quickstarts.snowflake.com/guide/data_engineering_pipelines_with_snowpark_pandas/#0)