pandas on Snowflake

Pandas on Snowflake 可让您直接在 Snowflake 中的数据上以分布式方式运行 Pandas 代码。只需更改 import 语句和几行代码,您就可以获得熟悉的 Pandas 体验,并享受 Snowflake 的可扩展性和安全性优势。借助 Pandas on Snowflake,您就可以处理更大的数据集,并避免将 pandas 管道移植到其他大数据框架或配置大型昂贵机器所需的时间和费用。它通过转译到 SQL 在 Snowflake 中原生运行工作负载,使其能够利用并行化优势以及 Snowflake 的数据治理和安全性优势。Pandas on Snowflake 通过 Snowpark Pandas API 交付,是 Snowpark Python 库 的一部分,可在 Snowflake 平台内对 Python 代码进行可扩展的数据处理。

使用 Pandas on Snowflake 的好处

  • 与 Python 开发者会面 – 通过提供可在 Snowflake 中原生运行的与 Pandas 兼容的层,Pandas on Snowflake 为 Python 开发者提供了熟悉的界面。

  • 可扩展的分布式 Pandas – 通过利用 Snowflake 中现有的查询优化技术,Pandas on Snowflake 将 Pandas 的便利性与 Snowflake 的可扩展性联系起来。必须尽量减少代码重写,才能简化迁移过程,这样您可以无缝地从原型迁移到生产环境。

  • 安全性和治理 – 数据不会离开 Snowflake 的安全平台。Pandas on Snowflake 允许以统一的方式访问数据组织内的数据,并实现更轻松的审计和治理。

  • 无需管理和调整额外的计算基础设施 – Pandas on Snowflake 利用 Snowflake 强大的计算引擎,因此您无需设置或管理任何额外的计算基础设施。

何时应使用 Pandas on Snowflake

如果满足以下任意条件,则您应使用 Pandas on Snowflake:

  • 您对 Pandas API 和更广泛的 PyData 生态系统很熟悉

  • 您与熟悉 Pandas 并希望在同一代码库上进行协作的其他人属于同一团队

  • 您有用 Pandas 编写的现有代码

  • 您的工作流程具有与顺序相关的需求,由 Pandas DataFrames 提供支持。例如,您需要数据集在整个工作流程中保持相同顺序

  • 您更喜欢通过基于 AI 的 copilot 工具完成更准确的代码

开始使用 Pandas on Snowflake

要安装 Pandas on Snowflake,您可以使用 conda 或 pip 来安装包。有关详细说明,请参阅 安装

pip install "snowflake-snowpark-python[modin]"
Copy

安装 Pandas on Snowflake 后,不要以 import pandas as pd 的方式导入 Pandas,而是使用以下两行:

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
Copy

以下示例展示了如何通过 Pandas on Snowpark Python 库和 Modin 开始使用 Pandas on Snowflake。

import modin.pandas as pd

# Import the Snowpark plugin for modin.
import snowflake.snowpark.modin.plugin

# Create a Snowpark session with a default connection.
from snowflake.snowpark.session import Session
session = Session.builder.create()

# Create a Snowpark pandas DataFrame from existing Snowflake table
df = pd.read_snowflake('SNOWFALL')

# Alternatively, create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                    [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                    [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                    columns=["DAY", "LOCATION", "SNOWFALL"])

# Inspect the DataFrame
df
Copy
      DAY  LOCATION  SNOWFALL
0       1  Big Bear       8.0
1       2  Big Bear      10.0
2       3  Big Bear       NaN
3       1     Tahoe       3.0
4       2     Tahoe       NaN
5       3     Tahoe      13.0
6       1  Whistler       NaN
7  Friday  Whistler      40.0
8       3  Whistler      25.0
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2

# Inspect the columns after update.
# Note how the data type is updated automatically after transformation.
df["DAY"]
Copy
0    1
1    2
2    3
3    1
4    2
5    3
6    1
7    2
8    3
Name: DAY, dtype: int64
# Drop rows with null values.
df.dropna()
Copy
  DAY  LOCATION  SNOWFALL
0   1  Big Bear       8.0
1   2  Big Bear      10.0
3   1     Tahoe       3.0
5   3     Tahoe      13.0
7   2  Whistler      40.0
8   3  Whistler      25.0
# Compute the average daily snowfall across locations.
df.groupby("LOCATION").mean()["SNOWFALL"]
Copy
LOCATION
Big Bear     9.0
Tahoe        8.0
Whistler    32.5
Name: SNOWFALL, dtype: float64

将 Pandas on Snowflake 与 Snowpark DataFrames 结合使用

Pandas on Snowflake 和 DataFrame API 具有高度互操作性,因此您可以构建利用这两个 APIs 的管道。

您可以使用以下操作在 Snowpark DataFrames 和 Snowpark Pandas DataFrames 之间进行转换:

操作

输入

输出

备注

to_snowpark_pandas

Snowpark DataFrame

Snowpark Pandas DataFrame

此操作为每一行分配隐式顺序,并在 DataFrame 的生命周期内保持此行顺序。在此转换中会产生 I/O 成本。

to_snowpark

Snowpark Pandas DataFrame 或 Snowpark Pandas Series

Snowpark DataFrame

此操作不会保持行排序,生成的 Snowpark DataFrame 会对源 Snowpark Pandas DataFrame 的数据快照进行操作。与直接通过表创建的 Snowpark DataFrames 不同,此行为意味着在 Snowpark 操作的评估过程中不会反映对基础表的更改。DataFrame 上不能应用任何 DDL 操作和有限 DML 操作。在此转换中不会产生 I/O 成本。

我们建议尽可能使用 read_snowflake 直接从 Snowflake 读取表,而不是将其与 Snowpark DataFrame 进行转换,以避免不必要的转换成本。

有关更多信息,请参阅 Snowpark DataFrames 与 Snowpark Pandas DataFrame:我应该选择哪个?

Pandas on Snowflake 与原生 Pandas 的比较

Pandas on Snowflake 与原生 Pandas 具有相似的 DataFrame APIs(包含匹配的签名和相似的语义)。Pandas on Snowflake 提供了与原生 Pandas (Pandas 2.2.1) 相同的 API 签名,并提供了使用 Snowflake 的可扩展计算。Pandas on Snowflake 尽可能遵循原生 Pandas 文档中描述的语义,但使用 Snowflake 计算和类型系统。但是,当原生 Pandas 在客户端机器上执行时,它使用 Python 计算和类型系统。有关 Pandas on Snowflake 和 Snowflake 之间的类型映射的信息,请参阅 数据类型

像原生 Pandas 一样,Pandas on Snowflake 也有索引的概念,并保持行排序。但是,它们不同的执行环境导致它们的行为存在某些细微的差异。本部分将指出需要注意的主要差异。

Pandas on Snowflake 最好与 Snowflake 中已有的数据一起使用,但您可以使用以下操作在原生 Pandas 和 Pandas on Snowflake 之间进行转换:

操作

输入

输出

备注

to_pandas

Snowpark Pandas DataFrame

原生 Pandas DataFrame

将所有数据物化到本地环境。如果数据集很大,这可能会导致内存不足错误。

`pd.DataFrame(...)<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/modin/pandas_api/snowflake.snowpark.modin.pandas.DataFrame>`_

原生 Pandas DataFrame、原始数据、Snowpark Pandas 对象

Snowpark Pandas DataFrame

这应该留给小的 DataFrames。使用大量本地数据创建 DataFrame 将引入临时表,并可能因数据上传而导致性能问题。

session.write_pandas

原生 Pandas DataFrame、Snowpark Pandas 对象

Snowflake 表

随后可以使用 write_pandas 调用中指定的表名,通过 pd.read_snowflake 将结果加载到 Pandas on Snowflake 中。

执行环境

  • pandas:在单个机器上运行并处理内存中的数据。

  • pandas on Snowflake:与 Snowflake 集成,允许跨机器集群进行分布式计算。这种集成可以处理超过单个机器内存容量的更大数据集。请注意,使用 Snowpark Pandas API 需要连接到 Snowflake。

惰性求值与及早求值

  • pandas:立即执行操作,并在每次操作后将结果在内存中完全物化。这种及早求值的操作可能会导致内存压力增加,因为需要在机器内大量移动数据。

  • pandas on Snowflake:提供与 Pandas 相同的 API 体验。它模仿 Pandas 的及早求值模型,但在内部构建了惰性求值的查询图,以实现跨操作的优化。

    通过查询图融合和转译操作为基础分布式 Snowflake 计算引擎提供了额外的优化机会,与直接在 Snowflake 中运行 Pandas 相比,这降低了成本和端到端管道运行时。

    备注

    与 I/O 相关的 APIs,以及返回值不是 Snowpark Pandas 对象(即 DataFrameSeriesIndex)的 APIs 总是及早求值。例如:

    • read_snowflake

    • to_snowflake

    • to_pandas

    • to_dict

    • to_list

    • __repr__

    • 双下划线方法 __array__ 可由某些第三方库(如 scikit-learn)自动调用。调用此方法将把结果物化到本地机器。

数据源和存储

数据类型

  • pandas:具有丰富的数据类型集,例如整数、浮点数、字符串、datetime 类型和分类类型。它还支持用户定义的数据类型。Pandas 中的数据类型通常派生自基础数据,并严格执行。

  • pandas on Snowflake:受 Snowflake 类型系统约束,该系统通过将 Pandas 数据类型转换为 Snowflake 中的 SQL 类型,将 Pandas 对象映射到 SQL。大多数 Pandas 类型在 Snowflake 中有自然的等价物,但映射并不总是一对一的。在某些情况下,多个 Pandas 类型映射到同一 SQL 类型。

下表列出了 Pandas 和 Snowflake SQL 之间的类型映射:

Pandas 类型

Snowflake 类型

所有带符号/无符号整数类型,包括 Pandas 扩展整数类型

NUMBER(38, 0)

所有浮点类型,包括 Pandas 扩展的浮点数据类型

FLOAT

boolBooleanDtype

BOOLEAN

strStringDtype

STRING

datetime.time

TIME

datetime.date

DATE

所有 naive 时区 datetime 类型

TIMESTAMP_NTZ

所有 aware 时区 datetime 类型

TIMESTAMP_TZ

listtuplearray

ARRAY

dictjson

MAP

具有混合数据类型的对象列

VARIANT

Timedelta64[ns]

NUMBER(38, 0)

备注

不支持分类、周期、间隔、稀疏和用户定义的数据类型。目前 Pandas on Snowpark 客户端仅支持 Timedelta。将 Timedelta 重新写入 Snowflake 时,它将被存储为 Number 类型。

下表提供了 Snowflake SQL 类型回到 Pandas on Snowflake 类型(使用 df.dtypes)的映射:

Snowflake 类型

Pandas on Snowflake 类型 (df.dtypes)

NUMBER (scale = 0)

int64

NUMBER (scale > 0)、REAL

float64

BOOLEAN

bool

STRING、TEXT

object (str)

VARIANT、BINARY、GEOMETRY、GEOGRAPHY

object

ARRAY

object (list)

OBJECT

object (dict)

TIME

object (datetime.time)

TIMESTAMP、TIMESTAMP_NTZ、TIMESTAMP_LTZ、TIMESTAMP_TZ

datetime64[ns]

DATE

object (datetime.date)

使用 to_pandas() 从 Snowpark Pandas DataFrame 转换为原生 Pandas DataFrame 时,原生 Pandas DataFrame 的数据类型将比 Pandas on Snowflake 类型更精确,与函数和过程的 SQL-Python 数据类型映射 兼容。

类型转换和类型推理

  • pandas:依赖于 NumPy (https://numpy.org/),默认情况下,遵循 NumPy 和 Python 类型系统进行隐式类型转换和推理。例如,它将布尔视为整数类型,因此 1 + True 返回 2

  • pandas on Snowflake:根据上表将 NumPy 和 Python 类型映射到 Snowflake 类型,并使用基础 Snowflake 类型系统进行隐式 类型转换和推理。例如,根据 逻辑数据类型,它不会隐式将布尔转换为整数类型,因此 1 + True 会导致类型转换错误。

Null 值处理

  • pandas:在 Pandas 1.x 版本中,Pandas 在 处理缺失数据 (https://pandas.pydata.org/docs/user_guide/missing_data.html#values-considered-missing) 时非常灵活,因此它将所有 Python Nonenp.nanpd.NaNpd.NApd.NaT 都视为缺失值。在 Pandas (2.2.x) 的更高版本中,这些值被视为不同的值。

  • pandas on Snowflake:采用与Pandas 早期版本类似的方法,将上述所有值都视为缺失值。Snowpark 重复使用 Pandas 中的 NaNNANaT。但请注意,所有这些缺失值都可互换处理,并在 Snowflake 表中存储为 SQL NULL。

偏移/频率别名

  • pandas:版本 2.2.1 中更改了 Pandas 中的日期偏移。单字母别名 'M''Q''Y' 和其他别名已弃用,而改用双字母偏移。

  • pandas on Snowflake:仅使用 Pandas 时间序列文档 (https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects) 中描述的新偏移。

安装

先决条件: 需要安装 Python 3.9、3.10 或 3.11、modin 0.28.1 版本、pandas 2.2.1 版本。

小技巧

要在 Snowflake Notebooks 中使用 Pandas on Snowflake,请参阅 笔记本中的 Pandas on Snowflake 中的设置说明。

要在开发环境中安装 Pandas on Snowflake,请执行以下步骤:

  1. 切换到您的项目目录并激活您的 Python 虚拟环境。

    备注

    API 正在积极开发中,因此我们建议将其安装在 Python 虚拟环境中,而不是系统范围内。这种做法允许您创建的每个项目使用特定的版本,使您免受未来版本更改的影响。

    您可以使用 ` Anaconda <https://www.anaconda.com/ (https://www.anaconda.com/)>`_、` Miniconda <https://docs.conda.io/en/latest/miniconda.html (https://docs.conda.io/en/latest/miniconda.html)>`_ 或 ` virtualenv <https://docs.python.org/3/tutorial/venv.html (https://docs.python.org/3/tutorial/venv.html)>`_ 等工具为特定的 Python 版本创建 Python 虚拟环境。

    例如,要使用 conda 创建 Python 3.9 虚拟环境,请输入:

    conda create --name snowpark_pandas python=3.9
    conda activate snowpark_pandas
    
    Copy

    备注

    如果您之前使用 Python 3.8 和 Pandas 1.5.3 在 Snowflake 上安装了旧版本的 Pandas on Snowflake,则需要如上所述升级 Python 和 Pandas 版本。按照步骤使用 Python 3.9、3.10 或 3.11 创建新环境。

  2. 使用 Modin 安装 Snowpark Python 库。

    pip install "snowflake-snowpark-python[modin]"
    
    Copy

    conda install snowflake-snowpark-python modin==0.28.1
    
    Copy

备注

确保安装了 snowflake-snowpark-python 1.17.0 版本或更高版本。

向 Snowflake 进行身份验证

您必须与 Snowflake 数据库建立会话,才能使用 Pandas on Snowflake。您可以使用配置文件为会话选择连接参数,也可以在代码中枚举这些参数。有关更多信息,请参阅 为 Snowpark Python 创建会话。如果存在唯一的活动 Snowpark Python 会话,Pandas on Snowflake 将自动使用该会话。例如:

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

CONNECTION_PARAMETERS = {
    'account': '<myaccount>',
    'user': '<myuser>',
    'password': '<mypassword>',
    'role': '<myrole>',
    'database': '<mydatabase>',
    'schema': '<myschema>',
    'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

# pandas on Snowflake will automatically pick up the Snowpark session created above.
# It will use that session to create new DataFrames.
df = pd.DataFrame([1, 2])
df2 = pd.read_snowflake('CUSTOMER')
Copy

pd.session 是一个 Snowpark 会话,因此您可以按照对任何其他 Snowpark 会话执行的操作,对该会话执行任何操作。例如,您可以使用该会话来执行任意 SQL 查询,该查询会根据 会话 API 产生 Snowpark DataFrame,但请注意,此结果将是 Snowpark DataFrame,而不是 Snowpark Pandas DataFrame。

# pd.session is the session that pandas on Snowflake is using for new DataFrames.
# In this case it is the same as the Snowpark session that we've created.
assert pd.session is session

# Run SQL query with returned result as Snowpark DataFrame
snowpark_df = pd.session.sql('select * from customer')
snowpark_df.show()
Copy

或者,您可以在 配置文件 中配置 Snowpark 连接参数。这样就无需在代码中枚举连接参数,从而使您可以编写 Pandas on Snowflake 代码,就像您通常编写 Pandas 代码一样。要实现这一点,请在 ~/.snowflake/connections.toml 创建如下所示的配置文件:

default_connection_name = "default"

[default]
account = "<myaccount>"
user = "<myuser>"
password = "<mypassword>"
role="<myrole>"
database = "<mydatabase>"
schema = "<myschema>"
warehouse = "<mywarehouse>"
Copy

然后在代码中,您只需要使用 snowflake.snowpark.Session.builder.create() 来创建使用这些凭据的会话。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

# Session.builder.create() will create a default Snowflake connection.
Session.builder.create()
# create a DataFrame.
df = pd.DataFrame([[1, 2], [3, 4]])
Copy

您还可以创建多个 Snowpark 会话,然后将其中一个会话分配给 Pandas on Snowflake。Pandas on Snowflake 仅使用一个会话,因此您必须使用 pd.session = pandas_session 将其中一个会话显式分配给 Pandas on Snowflake。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>").create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>").create()
pd.session = pandas_session
df = pd.DataFrame([1, 2, 3])
Copy

以下示例显示,当没有活动的 Snowpark 会话时,尝试使用 Pandas on Snowflake 将引发 SnowparkSessionException,并显示以下错误:“pandas on Snowflake requires an active snowpark session, but there is none.”。创建会话后,就可以使用 Pandas on Snowflake。例如:

import modin.pandas as pd
import snowflake.snowpark.modin.plugin

df = pd.DataFrame([1, 2, 3])
Copy

以下示例显示,当有多个活动的 Snowpark 会话时,尝试使用 Pandas on Snowflake 将导致 SnowparkSessionException,并显示以下消息:“There are multiple active snowpark sessions, but you need to choose one for pandas on Snowflake.”。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>"}).create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>"}).create()
df = pd.DataFrame([1, 2, 3])
Copy

备注

您必须通过 modin.pandas.session 设置用于新的 Snowpark Pandas DataFrame 或 Series 的会话。但是,不支持联接或合并使用不同会话创建的 DataFrames,因此您应该避免在工作流程中重复设置不同的会话并使用不同的会话创建 DataFrames。

API 参考

请参阅 Pandas on Snowflake API 参考,了解当前实施的 APIs 和可用方法的完整列表。

有关支持的操作的完整列表,请参阅 Pandas on Snowflake 参考中的以下表格:

在 Snowflake 笔记本中使用 Snowpark Pandas

要在 Snowflake 笔记本中使用 Pandas on Snowflake,请参阅 ` 笔记本中的 Pandas on Snowflake <https://docs.snowflake.cn/en/user-guide/ui-snowsight/notebooks-use-with-snowflake#snowpark-pandas-in-notebooks>`_。

在 Python 工作表中使用 Snowpark Pandas

要使用 Snowpark Pandas,您需要在 Python 工作表环境中的 Packages 中选择 modin 来安装 Modin。

您可以在 Settings > Return type 下选择 Python 函数的 Return 类型。默认情况下,它被设置为 Snowpark 表。要将 Snowpark Pandas DataFrame 显示为结果,您可以通过调用 to_snowpark() 将 Snowpark Pandas DataFrame 转换为 Snowpark DataFrame。在此转换中不会产生 I/O 成本。

以下是在 Python 工作表中使用 Snowpark Pandas 的示例:

import snowflake.snowpark as snowpark

def main(session: snowpark.Session):
  import modin.pandas as pd
  import snowflake.snowpark.modin.plugin

  df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                  [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                  [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                  columns=["DAY", "LOCATION", "SNOWFALL"])

  # Print a sample of the dataframe to standard output.
  print(df)

  snowpark_df = df.to_snowpark(index=None)
  # Return value will appear in the Results tab.
  return snowpark_df
Copy

在存储过程中使用 Pandas on Snowflake

您可以在 存储过程 中使用 Pandas on Snowflake 来构建数据管道,并使用 任务 调度存储过程的执行。

from snowflake.snowpark.context import get_active_session
session = get_active_session()

from snowflake.snowpark import Session

def data_transformation_pipeline(session: Session) -> str:
  import modin.pandas as pd
  import snowflake.snowpark.modin.plugin
  from datetime import datetime
  # Create a Snowpark pandas DataFrame with sample data.
  df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                     [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                     [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                      columns=["DAY", "LOCATION", "SNOWFALL"])
  # Drop rows with null values.
  df = df.dropna()
  # In-place point update to fix data error.
  df.loc[df["DAY"]=="Friday","DAY"]=2
  # Save Results as a Snowflake Table
  timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
  save_path = f"OUTPUT_{timestamp}"
  df.to_snowflake(name=save_path, if_exists="replace", index=False)
  return f'Transformed DataFrame saved to {save_path}.'


  dt_pipeline_sproc = session.sproc.register(name="run_data_transformation_pipeline_sp",
                             func=data_transformation_pipeline,
                             replace=True,
                             packages=['modin', 'snowflake-snowpark-python'])
Copy

要调用存储过程,您可以在 Python 中运行 dt_pipeline_sproc() 或在 SQL 中运行 CALL run_data_transformation_pipeline_sp()

要将存储过程作为任务调度,可以使用 Snowflake Python API 创建任务。

在第三方库中使用 Pandas on Snowflake

当使用 Snowpark pandas DataFrame 调用第三方库 APIs 时,我们建议在将 DataFrame 传递给第三方库调用之前,通过调用 to_pandas() 将 Snowpark pandas DataFrame 转换为 pandas DataFrame。

备注

调用 to_pandas() 会将数据从 Snowflake 中提取到内存中,因此对于大型数据集和敏感用例请注意这一点。

pandas on Snowflake 目前对某些 NumPy (https://numpy.org/) 和 Matplotlib (https://matplotlib.org/) APIs 的兼容性有限,例如 np.where 的分布式实现以及与 df.plot 的互操作性。在使用这些第三方库时,通过 to_pandas() 转换 Snowpark Pandas DataFrames 将避免多次 I/O 调用。

下面是一个使用 Altair (https://altair-viz.github.io/) 进行可视化和使用 scikit-learn (https://scikit-learn.org/stable/) 进行机器学习的示例。

# Create a Snowpark session with a default connection.
session = Session.builder.create()

train = pd.read_snowflake('TITANIC')

train[['Pclass', 'Parch', 'Sex', 'Survived']].head()
Copy
    Pclass  Parch     Sex       Survived
0       3      0     male               0
1       1      0   female               1
2       3      0   female               1
3       1      0   female               1
4       3      0     male               0
import altair as alt
# Convert to pandas DataFrame
train_df_pandas = train.to_pandas()
survived_per_age_plot = alt.Chart(train_df_pandas).mark_bar(
).encode(
    x=alt.X('Age', bin=alt.Bin(maxbins=25)),
    y='count()',
    column='Survived:N',
    color='Survived:N',
).properties(
    width=300,
    height=300
).configure_axis(
    grid=False
)
survived_per_age_plot
Copy
altair

我们现在可以使用 scikit-learn 在转换为 Pandas 后训练一个简单的模型。

feature_cols = ['Pclass', 'Parch']
# Convert features DataFrame to pandas DataFrames
X_pandas = train_snowpark_pandas.loc[:, feature_cols].to_pandas()
# Convert labels Series to pandas Series
y_pandas = train_snowpark_pandas["Survived"].to_pandas()

from sklearn.linear_model import LogisticRegression

logreg = LogisticRegression()

logreg.fit(X_pandas, y_pandas)

y_pred_pandas = logreg.predict(X_pandas)

acc_eval = accuracy_score(y_pandas, y_pred_pandas)
Copy
scikit 模型

限制

Pandas on Snowflake 有以下限制:

  • Pandas on Snowflake 不保证与 OSS 第三方库兼容。但是,从版本 1.14.0a1 开始,Snowpark Pandas 引入了对 NumPy 的有限兼容性,特别是针对 np.where 使用。有关更多信息,请参阅 NumPy 互操作性

    当使用 Snowpark Pandas DataFrame 调用第三方库 APIs 时,Snowflake 建议您在将 DataFrame 传递给第三方库调用之前,通过调用 to_pandas() 将 Snowpark Pandas DataFrame 转换为 Pandas DataFrame。有关更多信息,请参阅 在第三方库中使用 Pandas on Snowflake

  • Pandas on Snowflake 未与 Snowpark ML 集成。使用 Snowpark ML 时,我们建议您在调用Snowpark ML 之前,使用 to_snowpark() 将 Snowpark Pandas DataFrame 转换为 Snowpark DataFrame。

  • 不支持惰性 MultiIndex 对象。使用 MultiIndex 时,它返回一个原生 Pandas MultiIndex 对象,这需要将所有数据拉取到客户端。

  • 在 Pandas on Snowflake 中,并非所有 Pandas APIs 都有分布式实施。对于不支持的 APIs,则会引发 NotImplementedError。没有分布式实施的操作会回退到存储过程。有关支持的 APIs 的信息,请参阅 API 参考文档。

  • Pandas on Snowflake 需要特定的 Pandas 版本。Pandas on Snowflake 需要 Pandas 2.2.1,并且仅提供与 Pandas 2.2.1 的兼容性。

  • 不能在 Pandas on Snowflake apply() 函数中引用 Pandas on Snowflake。您只能在 apply() 内使用原生 Pandas。

故障排除

本部分介绍在使用 Pandas on Snowflake 时的故障排除提示。

  • 在进行故障排除时,请尝试在原生 Pandas DataFrame(或示例)上运行相同的操作,以查看相同的错误是否仍然存在。这种方法可能会提供有关如何修复查询的提示。例如:

    df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
    # Running this in Snowpark pandas throws an error
    df["A"].sum()
    # Convert a small sample of 10 rows to pandas DataFrame for testing
    pandas_df = df.head(10).to_pandas()
    # Run the same operation. KeyError indicates that the column reference is incorrect
    pandas_df["A"].sum()
    # Fix the column reference to get the Snowpark pandas query working
    df["a"].sum()
    
    Copy
  • 如果您打开了一个长时间运行的笔记本,请注意,默认情况下,Snowflake 会话在会话空闲 240 分钟(4 小时)后超时。当会话过期时,如果在 Snowflake 查询上运行其他 Pandas,您会收到以下错误:“Authentication token has expired.The user must authenticate again.”。此时,您必须重新建立与 Snowflake 的连接。这可能会导致损失任何未持久化的会话变量。有关如何配置会话空闲超时参数的更多信息,请参阅 会话策略

最佳实践

本部分介绍在使用 pandas on Snowflake 时要遵循的最佳实践。

  • 避免使用迭代代码模式,如 for 循环、iterrowsiteritems。迭代代码模式会快速增加生成的查询复杂度。让 Pandas on Snowflake 执行数据分发和计算并行化,而不是客户端代码。当涉及到迭代代码模式时,请尝试寻找可以在整个 DataFrame 上执行的操作,并使用相应的操作。

for i in np.arange(0, 50):
  if i % 2 == 0:
    data = pd.concat([data, pd.DataFrame({'A': i, 'B': i + 1}, index=[0])], ignore_index=True)
  else:
    data = pd.concat([data, pd.DataFrame({'A': i}, index=[0])], ignore_index=True)

# Instead of creating one DataFrame per row and concatenating them,
# try to directly create the DataFrame out of the data, like this:

data = pd.DataFrame(
      {
          "A": range(0, 50),
          "B": [i + 1 if i % 2 == 0 else None for i in range(50)],
      },
)
Copy
  • 避免调用 applyapplymaptransform,它们最终通过 UDFsUDTFs 实施,其性能可能不如常规 SQL 查询。如果应用的函数具有等效 DataFrame 或 Series 运算,则改用该运算。例如,直接调用 df.groupby('col1').sum(),而不是 df.groupby('col1').apply('sum')

  • 将 DataFrame 或 Series 传递给第三方库调用之前,调用 to_pandas()。Pandas on Snowflake 不保证与第三方库兼容。

  • 使用物化的常规 Snowflake 表来避免额外的 I/O 开销。Pandas on Snowflake 在仅以常规表的数据快照为基础运行。对于其他类型(包括外部表、视图和 Apache Iceberg™ 表),在拍摄快照之前会创建一个临时表,这会引入额外的物化开销。

  • Pandas on Snowflake 在使用 read_snowflake 从 Snowflake 表创建 DataFrames 时提供快速和零复制克隆功能。但是,仅为普通数据库下的常规 Snowflake 表提供快照功能。当加载混合、Iceberg 等类型的表或加载共享数据库下的表时,将引入对常规 Snowflake 表的额外物化。快照需要提供数据一致性和排序保证,目前没有其他方法可以解决额外物化,请在使用 Pandas on Snowflake 时,尽量尝试使用正常的 Snowflake 表。

  • 在进行其他操作之前仔细检查结果类型,如果需要,使用 astype 进行显式类型转换。

    由于有限的类型推理功能,如果没有给出类型提示,df.apply 将返回对象(变量)类型的结果,即使结果包含所有整数值。如果其他操作要求 dtypeint,则可以通过调用 astype 方法进行显式类型转换,以在继续之前更正列类型。

  • 如无必要,请避免调用需要求值和物化的 APIs。

    未返回 SeriesDataframe 的 APIs 需要及早求值和物化以产生正确类型的结果。绘制方法也是如此。减少对这些 APIs 的调用,以尽量减少不必要的求值和物化。

  • 避免在大型数据集上调用 np.where(<cond>, <scalar>, n)<scalar> 将广播到大小为 <cond> 的 DataFrame,这可能很慢。

  • 当使用迭代构建的查询时,df.cache_result 可用于物化中间结果,以减少重复求值,并改善延迟和降低整体查询的复杂性。例如:

    df = pd.read_snowflake('pandas_test')
    df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation
    df3 = df.merge(df2)
    df4 = df3.where(df2 == True)
    
    Copy

    在以上示例中,要生成 df2 的查询的计算开销很大,在创建 df3df4 时都会重用该查询。将 df2 物化到临时表中(使后续操作涉及 df2 表扫描而不是透视)可以减少代码块的整体延迟:

    df = pd.read_snowflake('pandas_test')
    df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation
    df2.cache_result(inplace=True)
    df3 = df.merge(df2)
    df4 = df3.where(df2 == True)
    
    Copy

示例

下面是使用 Pandas 操作的代码示例。我们从名为 pandas_test 的 Snowpark Pandas DataFrame 开始,其中包含三列:COL_STRCOL_FLOATCOL_INT。要查看与以下示例相关的笔记本,请参阅 Snowflake-Labs 存储库中的 Pandas on Snowflake 示例 (https://github.com/Snowflake-Labs/sf-samples/blob/main/samples/snowpark-pandas/api-examples/api_examples.ipynb)。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin

from snowflake.snowpark import Session

CONNECTION_PARAMETERS = {
    'account': '<myaccount>',
    'user': '<myuser>',
    'password': '<mypassword>',
    'role': '<myrole>',
    'database': '<mydatabase>',
    'schema': '<myschema>',
    'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

df = pd.DataFrame([['a', 2.1, 1],['b', 4.2, 2],['c', 6.3, None]], columns=["COL_STR", "COL_FLOAT", "COL_INT"])

df
Copy
  COL_STR    COL_FLOAT    COL_INT
0       a          2.1        1.0
1       b          4.2        2.0
2       c          6.3        NaN

我们将 DataFrame 保存名为 pandas_test 的 Snowflake 表,我们将在整个示例中使用该表。

df.to_snowflake("pandas_test", if_exists='replace',index=False)
Copy

接下来,通过 Snowflake 表创建 DataFrame。删除列 COL_INT,然后使用名为 row_position 的列将结果保存回 Snowflake。

# Create a DataFrame out of a Snowflake table.
df = pd.read_snowflake('pandas_test')

df.shape
Copy
(3, 3)
df.head(2)
Copy
    COL_STR  COL_FLOAT  COL_INT
0         a        2.1        1
1         b        4.2        2
df.dropna(subset=["COL_FLOAT"], inplace=True)

df
Copy
    COL_STR  COL_FLOAT  COL_INT
0         a        2.1        1
1         c        6.3        2
df.shape
Copy
(2, 3)
df.dtypes
Copy
COL_STR       object
COL_FLOAT    float64
COL_INT        int64
dtype: object
# Save the result back to Snowflake with a row_pos column.
df.reset_index(drop=True).to_snowflake('pandas_test2', if_exists='replace', index=True, index_label=['row_pos'])
Copy

最终得到一个新表 pandas_test2,如下所示:

     row_pos  COL_STR  COL_FLOAT  COL_INT
0          1         a       2.0        1
1          2         b       4.0        2

IO(读取和写入)

# Reading and writing to Snowflake
df = pd.DataFrame({"fruit": ["apple", "orange"], "size": [3.4, 5.4], "weight": [1.4, 3.2]})
df.to_snowflake("test_table", if_exists="replace", index=False )

df_table = pd.read_snowflake("test_table")


# Generate sample CSV file
with open("data.csv", "w") as f:
    f.write('fruit,size,weight\napple,3.4,1.4\norange,5.4,3.2')
# Read from local CSV file
df_csv = pd.read_csv("data.csv")

# Generate sample JSON file
with open("data.json", "w") as f:
    f.write('{"fruit":"apple", "size":3.4, "weight":1.4},{"fruit":"orange", "size":5.4, "weight":3.2}')
# Read from local JSON file
df_json = pd.read_json('data.json')

# Upload data.json and data.csv to Snowflake stage named @TEST_STAGE
# Read CSV and JSON file from stage
df_csv = pd.read_csv('@TEST_STAGE/data.csv')
df_json = pd.read_json('@TEST_STAGE/data.json')
Copy

有关更多信息,请参阅 输入/输出

索引

df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
df.columns
Copy
Index(['a', 'b'], dtype='object')
df.index
Copy
Index([0, 1, 2], dtype='int8')
df["a"]
Copy
0    1
1    2
2    3
Name: a, dtype: int8
df["b"]
Copy
0    x
1    y
2    z
Name: b, dtype: object
df.iloc[0,1]
Copy
'x'
df.loc[df["a"] > 2]
Copy
a  b
2  3  z
df.columns = ["c", "d"]
df
Copy
     c  d
0    1  x
1    2  y
2    3  z
df = df.set_index("c")
df
Copy
   d
c
1  x
2  y
3  z
df.rename(columns={"d": "renamed"})
Copy
    renamed
c
1       x
2       y
3       z

缺少值

import numpy as np
df = pd.DataFrame([[np.nan, 2, np.nan, 0],
                [3, 4, np.nan, 1],
                [np.nan, np.nan, np.nan, np.nan],
                [np.nan, 3, np.nan, 4]],
                columns=list("ABCD"))
df
Copy
     A    B   C    D
0  NaN  2.0 NaN  0.0
1  3.0  4.0 NaN  1.0
2  NaN  NaN NaN  NaN
3  NaN  3.0 NaN  4.0
df.isna()
Copy
       A      B     C      D
0   True  False  True  False
1  False  False  True  False
2   True   True  True   True
3   True  False  True  False
df.fillna(0)
Copy
     A    B    C    D
0   0.0  2.0  0.0  0.0
1   3.0  4.0  0.0  1.0
2   0.0  0.0  0.0  0.0
3   0.0  3.0  0.0  4.0
df.dropna(how="all")
Copy
     A    B   C    D
0   NaN  2.0 NaN  0.0
1   3.0  4.0 NaN  1.0
3   NaN  3.0 NaN  4.0

类型转换

df = pd.DataFrame({"int": [1,2,3], "str": ["4", "5", "6"]})
df
Copy
   int str
0    1   4
1    2   5
2    3   6
df_float = df.astype(float)
df_float
Copy
   int  str
0  1.0  4.0
1  2.0  5.0
2  3.0  6.0
df_float.dtypes
Copy
int    float64
str    float64
dtype: object
pd.to_numeric(df.str)
Copy
0    4.0
1    5.0
2    6.0
Name: str, dtype: float64
df = pd.DataFrame({'year': [2015, 2016],
                'month': [2, 3],
                'day': [4, 5]})
pd.to_datetime(df)
Copy
0   2015-02-04
1   2016-03-05
dtype: datetime64[ns]

二元运算

df_1 = pd.DataFrame([[1,2,3],[4,5,6]])
df_2 = pd.DataFrame([[6,7,8]])
df_1.add(df_2)
Copy
    0    1     2
0  7.0  9.0  11.0
1  NaN  NaN   NaN
s1 = pd.Series([1, 2, 3])
s2 = pd.Series([2, 2, 2])
s1 + s2
Copy
0    3
1    4
2    5
dtype: int64
df = pd.DataFrame({"A": [1,2,3], "B": [4,5,6]})
df["A+B"] = df["A"] + df["B"]
df
Copy
   A  B  A+B
0  1  4    5
1  2  5    7
2  3  6    9

聚合

df = pd.DataFrame([[1, 2, 3],
                [4, 5, 6],
                [7, 8, 9],
                [np.nan, np.nan, np.nan]],
                columns=['A', 'B', 'C'])
df.agg(['sum', 'min'])
Copy
        A     B     C
sum  12.0  15.0  18.0
min   1.0   2.0   3.0
df.median()
Copy
A    4.0
B    5.0
C    6.0
dtype: float64

合并

df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
                    'value': [1, 2, 3, 5]})
df1
Copy
  lkey  value
0  foo      1
1  bar      2
2  baz      3
3  foo      5
df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
                    'value': [5, 6, 7, 8]})
df2
Copy
  rkey  value
0  foo      5
1  bar      6
2  baz      7
3  foo      8
df1.merge(df2, left_on='lkey', right_on='rkey')
Copy
  lkey  value_x rkey  value_y
0  foo        1  foo        5
1  foo        1  foo        8
2  bar        2  bar        6
3  baz        3  baz        7
4  foo        5  foo        5
5  foo        5  foo        8
df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
                'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
df
Copy
  key   A
0  K0  A0
1  K1  A1
2  K2  A2
3  K3  A3
4  K4  A4
5  K5  A5
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
                    'B': ['B0', 'B1', 'B2']})
df.join(other, lsuffix='_caller', rsuffix='_other')
Copy
  key_caller   A key_other     B
0         K0  A0        K0    B0
1         K1  A1        K1    B1
2         K2  A2        K2    B2
3         K3  A3      None  None
4         K4  A4      None  None
5         K5  A5      None  None

分组

df = pd.DataFrame({'Animal': ['Falcon', 'Falcon','Parrot', 'Parrot'],
               'Max Speed': [380., 370., 24., 26.]})

df
Copy
   Animal  Max Speed
0  Falcon      380.0
1  Falcon      370.0
2  Parrot       24.0
3  Parrot       26.0
df.groupby(['Animal']).mean()
Copy
        Max Speed
Animal
Falcon      375.0
Parrot       25.0

有关更多信息,请参阅 GroupBy

透视

df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                        "bar", "bar", "bar", "bar"],
                "B": ["one", "one", "one", "two", "two",
                        "one", "one", "two", "two"],
                "C": ["small", "large", "large", "small",
                        "small", "large", "small", "small",
                        "large"],
                "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df
Copy
     A    B      C  D  E
0  foo  one  small  1  2
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  5
4  foo  two  small  3  6
5  bar  one  large  4  6
6  bar  one  small  5  8
7  bar  two  small  6  9
8  bar  two  large  7  9
pd.pivot_table(df, values='D', index=['A', 'B'],
                   columns=['C'], aggfunc="sum")
Copy
    C    large  small
A   B
bar one    4.0      5
    two    7.0      6
foo one    4.0      1
    two    NaN      6
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                'baz': [1, 2, 3, 4, 5, 6],
                'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
df
Copy
   foo bar  baz zoo
0  one   A    1   x
1  one   B    2   y
2  one   C    3   z
3  two   A    4   q
4  two   B    5   w
5  two   C    6   t

资源

  • 快速入门:Pandas on Snowflake 入门 (https://quickstarts.snowflake.com/guide/getting_started_with_pandas_on_snowflake/index.html)

  • 快速入门:使用 Snowpark Python 的数据工程管道 (https://quickstarts.snowflake.com/guide/data_engineering_pipelines_with_snowpark_pandas/#0)

语言: 中文