将 Snowflake SQLAlchemy 工具包与 Python Connector 结合使用

Snowflake SQLAlchemy 在 Snowflake Connector for Python 之上运行,作为桥接 Snowflake 数据库和 SQLAlchemy 应用程序的方言。

本主题内容:

先决条件

Snowflake Connector for Python

Snowflake SQLAlchemy 的唯一要求是 Snowflake Connector for Python;但是,不需要安装该连接器,因为安装 Snowflake SQLAlchemy 会自动安装连接器。

数据分析和 Web 应用程序框架(可选)

Snowflake SQLAlchemy 可以与 pandas、Jupyter 和 Pyramid 一起使用,它们为数据分析和 Web 应用程序提供了更高级别的应用程序框架。但是,从零开始构建工作环境并不是一项简单的任务,特别是对于新手用户来说。安装框架需要 C 编译器和工具,选择正确的工具和版本是一个障碍,可能会阻止用户使用 Python 应用程序。

构建环境的一种更简单的方法是通过 Anaconda,它为所有用户(包括数据分析师和学生等非 Python 专家)提供了完整的预编译技术堆栈。有关 Anaconda 安装说明,请参阅 Anaconda 安装文档。然后可以使用 pip 将 Snowflake SQLAlchemy 包安装在 Anaconda 之上。

安装 Snowflake SQLAlchemy

可以使用 pip 从公共 PyPI 存储库安装 Snowflake SQLAlchemy 包:

pip install --upgrade snowflake-sqlalchemy
Copy

pip 自动安装所有必需的模块,包括用于 Snowflake Connector for Python。

请注意,开发者说明与源代码一起托管在 ` GitHub <https://github.com/snowflakedb/snowflake-sqlalchemy (https://github.com/snowflakedb/snowflake-sqlalchemy)>`_ 上。

验证安装

  1. 创建一个包含以下 Python 示例代码的文件(例如 validate.py),该代码连接到 Snowflake 并显示 Snowflake 的版本:

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            user='<user_login_name>',
            password='<password>',
            account_identifier='<account_identifier>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
    Copy
  2. <user_login_name><password><account_identifier> 替换为 Snowflake 账户和用户的相应值。有关更多详细信息,请参阅 ` 连接参数 `_(本主题内容)。

  3. 执行示例代码。例如,如果您创建了一个名为 validate.py 的文件:

    python validate.py
    
    Copy

应显示 Snowflake 版本(例如 1.6.0)。

Snowflake 特定的参数和行为

Snowflake SQLAlchemy 尽可能为 SQLAlchemy 应用程序提供兼容的功能。

但是,Snowflake SQLAlchemy 还提供了 Snowflake 特定的参数和行为,以下各部分对此进行了介绍。

连接参数

必填参数

Snowflake SQLAlchemy 使用以下连接字符串语法连接到 Snowflake 并启动会话:

'snowflake://<user_login_name>:<password>@<account_identifier>'
Copy

其中:

  • <user_login_name> 是您的 Snowflake 用户的登录名。

  • <password> 是 Snowflake 用户的密码。

  • <account_identifier> 是您的账户标识符。请参阅 配置客户端、驱动程序、库或第三方应用程序以连接到 Snowflake

    备注

    snowflakecomputing.cn 域名作为账户标识符的一部分。Snowflake 会自动将域名追加到您的账户标识符,以创建所需连接。

其他连接参数

您可以选择在连接字符串的末尾(<account_name> 之后)包含以下附加信息:

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Copy

其中:

  • <database_name><schema_name> 是 Snowflake 会话的初始数据库和架构,用正斜杠 (/) 分隔。

  • warehouse=<warehouse_name>role=<role_name>' 是会话的初始仓库和角色,指定为参数字符串,用问号 (?) 分隔。

备注

登录后,始终可以为会话更改连接字符串中指定的初始数据库、架构、仓库和角色。

代理服务器配置

不支持代理服务器参数。相反,请使用支持的环境变量来配置代理服务器。有关信息,请参阅 使用代理服务器

连接字符串示例

以下示例使用用户名 testuser1、密码 0123456、账户标识符 myorganization-myaccount、数据库 testdb、架构 public、仓库 testwh 和角色 myrole 调用 create_engine 方法:

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole'
)
Copy

为方便起见,可以使用 snowflake.sqlalchemy.URL 方法来构造连接字符串,并连接到数据库。下面的示例构造与上一示例相同的连接字符串:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))
Copy

打开和关闭连接

通过执行 engine.connect() 打开连接;避免使用 engine.execute()

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()
Copy

备注

请确保通过在 engine.dispose() 之前执行 connection.close() 来关闭连接;否则,Python 垃圾回收器将移除与 Snowflake 通信所需的资源,从而阻止 Python Connector 正确关闭会话。

如果计划使用显式事务,则必须禁用 SQLAlchemy 中的 AUTOCOMMIT 执行选项。

默认情况下,SQLAlchemy 启用此选项。启用此选项后,INSERT、UPDATE 和 DELETE 语句在执行时会自动提交,即使这些语句在显式事务中运行也是如此。

若要禁用 AUTOCOMMIT,请将 autocommit=False 传递给 Connection.execution_options() 方法。例如:

# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:

  try:
    connection.execute("BEGIN")
    connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
    connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
    connection.execute("COMMIT")
  except Exception as e:
    connection.execute("ROLLBACK")
  finally:
    connection.close()

engine.dispose()
Copy

自动递增行为

自动递增一个值需要 Sequence 对象。将 Sequence 对象包含在主键列中,以便在插入每条新记录时自动递增此值。例如:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)
Copy

对象名称大小写处理

Snowflake 将所有不区分大小写的对象名称存储在大写文本中。相反,SQLAlchemy 将所有小写对象名称视为不区分大小写。Snowflake SQLAlchemy 在架构级通信期间(即在表和索引反射期间)转换对象名称大小写。如果使用大写对象名称,SQLAlchemy 则假定它们区分大小写,并将名称放在引号内。此行为将导致与从 Snowflake 接收的数据字典数据不匹配,因此除非标识符名称真正使用引号创建为区分大小写(例如 "TestDb"),否则应在 SQLAlchemy 端使用所有小写名称。

索引支持

索引仅在 Snowflake SqlAlchemy 的混合表中受支持。有关限制和用例的更多详细信息,请参阅 创建索引 文档。您可以使用以下方法创建索引:

  • 单列索引

    您可以通过在列上设置 index=True 参数或通过明确定义 Index 对象来创建单列索引。

    hybrid_test_table_1 = HybridTable(
      "table_name",
      metadata,
      Column("column1", Integer, primary_key=True),
      Column("column2", String, index=True),
      Index("index_1","column1", "column2")
    )
    
    metadata.create_all(engine_testaccount)
    
    Copy
  • 多列索引

    对于多列索引,您需要定义索引对象,指定应被索引的列。

    hybrid_test_table_1 = HybridTable(
      "table_name",
      metadata,
      Column("column1", Integer, primary_key=True),
      Column("column2", String),
      Index("index_1","column1", "column2")
    )
    
    metadata.create_all(engine_testaccount)
    
    Copy

Numpy 数据类型支持

Snowflake SQLAlchemy 支持绑定和提取 NumPy 数据类型。始终支持绑定。若要启用提取 NumPy 数据类型,请将 numpy=True 添加到连接参数。

支持以下 NumPy 数据类型:

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

以下示例显示了 numpy.datetime64 数据的往返行程:

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

with engine.connect() as connection:
    connection.exec_sql_query(
        "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
    connection.exec_sql_query(
        "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
    )
    df = pd.read_sql_query("SELECT * FROM ts_tbl", connection)
    assert df.c1.values[0] == specific_date
Copy

缓存列元数据

SQLAlchemy 提供运行时检查 API,以获取有关各种对象的运行时信息。一个常见的用例是,获取架构中的所有表及其列元数据,以便构建架构目录。

伪代码流程如下:

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...
Copy

在此流程中,一个潜在的问题是,在每个表上运行查询时可能需要相当长的时间。结果是缓存的,但获取列元数据的成本很高。

为了缓解此问题,Snowflake SQLAlchemy 采用一个标志 cache_column_metadata=True,以便在调用 get_table_names 时缓存所有表的所有列元数据,且 get_columns 的其余部分、get_primary_keysget_foreign_keys 也可以利用该缓存。

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))
Copy

备注

内存使用率将大幅上升,因为与 Inspector 对象关联的所有列元数据均被缓存。仅当需要获取所有列元数据时,才使用该标志。

VARIANT、ARRAY 和 OBJECT 支持

Snowflake SQLAlchemy 支持提取 VARIANTARRAYOBJECT 数据类型。所有类型都将在 Python 中转换为 str,以便您可以使用 json.loads 将它们转换为原生数据类型。

此示例展示了如何创建包含 VARIANTARRAYOBJECT 数据类型列的表:

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metadata.create_all(engine)
Copy

为了检索 VARIANTARRAYOBJECT 数据类型列并将它们转换为原生 Python 数据类型,请提取数据并调用 json.loads 方法,如下所示:

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])
Copy

结构化数据类型支持

本模块为 Snowflake 结构化数据(特别是 Iceberg 表)定义了自定义 SQLAlchemy 类型。MAP、OBJECT 和 ARRAY 类型允许在 SQLAlchemy 模型中存储复杂的数据结构。有关详细信息,请参阅 Snowflake 结构化数据类型 文档。

MAP

MAP 类型表示键值对的集合,其中每个键和值可以有不同的类型,如下所示:

  • 密钥类型: 密钥的类型,如 TEXTNUMBER)。

  • 值类型: 值的类型,如 TEXTNUMBER)。

  • 不为 Null: 是否允许 NULL 值(默认值为 False)。

使用示例:

IcebergTable(
  table_name,
  metadata,
  Column("id", Integer, primary_key=True),
  Column("map_col", MAP(NUMBER(10, 0), TEXT(16777216))),
  external_volume="external_volume",
  base_location="base_location",
)
Copy

OBJECT

OBJECT 类型表示具有命名字段的半结构化对象。每个字段可以有特定的类型,您也可以指定每个字段是否可以为空。

  • 项目类型: 字段名称及其类型的字典。该类型可以可选地包括可空标志(True 表示不可空,或 False 表示可空 [默认])。

使用示例:

IcebergTable(
    table_name,
    metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "object_col",
        OBJECT(key1=(TEXT(16777216), False), key2=(NUMBER(10, 0), False)),
        OBJECT(key1=TEXT(16777216), key2=NUMBER(10, 0)), # Without nullable flag
    ),
    external_volume="external_volume",
    base_location="base_location",
)
Copy

ARRAY

ARRAY 类型表示有序的值列表,其中每个元素具有相同的类型。元素类型是在创建数组时定义的。

  • 值类型: 数组中元素的类型,如:TEXTNUMBER)。

  • 不为 Null: 是否允许 NULL 值(默认值为 False)。

使用示例:

IcebergTable(
    table_name,
    metadata,
    Column("id", Integer, primary_key=True),
    Column("array_col", ARRAY(TEXT(16777216))),
    external_volume="external_volume",
    base_location="base_location",
)
Copy

CLUSTER BY 支持

Snowflake SQLAlchemy 支持表的 CLUSTER BY 参数。有关该参数的信息,请参阅 CREATE TABLE

此示例展示了如何将包含两列(idname)的表创建为群集密钥:

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)
Copy

Alembic 支持

Alembic 是 SQLAlchemy 之上的数据库迁移工具。Snowflake SQLAlchemy 的运作方式是将以下代码添加到 alembic/env.py,以便 Alembic 可以识别 Snowflake SQLAlchemy。

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Copy

密钥对身份验证支持

Snowflake SQLAlchemy 通过利用 Snowflake Connector for Python 函数来支持密钥对身份验证。有关创建私钥和公钥的步骤,请参阅 使用密钥对身份验证和密钥对轮换

私钥参数通过 connect_args 来传递,如下所示:

...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

engine = create_engine(URL(
    account='abc123',
    user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
        },
    )
Copy

其中 PRIVATE_KEY_PASSPHRASE 是用于解密私钥文件 (rsa_key.p8) 的密码。

snowflake.sqlalchemy.URL 方法不支持私钥参数。

合并命令支持

Snowflake SQLAlchemy 支持使用其 MergeInto 自定义表达式执行更新或插入。有关完整文档,请参阅 MERGE

按如下方式使用:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Copy

CopyIntoStorage 支持

Snowflake SQLAlchemy 支持使用其自定义 CopyIntoStorage 表达式将表和查询结果保存到不同的 Snowflake 暂存区、Azure 容器和 AWS 桶中。有关完整文档,请参阅 COPY INTO <location>

按如下方式使用:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Copy

Iceberg 表与 Snowflake 目录支持

Snowflake SQLAlchemy 支持带有 Snowflake 目录的 Iceberg 表,以及各种相关参数。有关 Iceberg 表的详细信息,请参阅 Snowflake CREATE ICEBERG 文档。

若要使用 Snowflake SQLAlchemy 创建 Iceberg 表,您可以使用 SQLAlchemy Core 语法定义表,如下所示:

table = IcebergTable(
        "myuser",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("name", String),
        external_volume=external_volume_name,
        base_location="my_iceberg_table",
  as_query="SELECT * FROM table"
    )
Copy

或者,您可以使用声明式方法定义表:

class MyUser(Base):
    __tablename__ = "myuser"

    @classmethod
    def __table_cls__(cls, name, metadata, *arg, **kw):
        return IcebergTable(name, metadata, *arg, **kw)

    __table_args__ = {
        "external_volume": "my_external_volume",
        "base_location": "my_iceberg_table",
  "as_query": "SELECT * FROM table",
    }

    id = Column(Integer, primary_key=True)
    name = Column(String)
Copy

混合表支持

Snowflake SQLAlchemy 支持具有索引的混合表。有关详细信息,请参阅 Snowflake CREATE HYBRID TABLE 文档。

若要创建混合表并添加索引,您可以使用 SQLAlchemy 核心语法,如下所示:

table = HybridTable(
    "myuser",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Index("idx_name", "name")
Copy

此外,您也可以使用声明式方法定义表:

class MyUser(Base):
    __tablename__ = "myuser"

    @classmethod
    def __table_cls__(cls, name, metadata, *arg, **kw):
        return HybridTable(name, metadata, *arg, **kw)

    __table_args__ = (
        Index("idx_name", "name"),
    )

    id = Column(Integer, primary_key=True)
    name = Column(String)
Copy

动态表支持

Snowflake SQLAlchemy 支持动态表。有关详细信息,请参阅 Snowflake CREATE DYNAMIC TABLE 文档。

若要创建动态表,您可以使用 SQLAlchemy 核心语法,如下所示:

 dynamic_test_table_1 = DynamicTable(
       "dynamic_MyUser",
       metadata,
       Column("id", Integer),
       Column("name", String),
       target_lag=(1, TimeUnit.HOURS), # Additionally you can use SnowflakeKeyword.DOWNSTREAM
       warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL
       as_query="SELECT id, name from MyUser;"
   )
Copy

此外,您也可以使用 SqlAlchemy select() 结构定义一个没有列的表:

 dynamic_test_table_1 = DynamicTable(
       "dynamic_MyUser",
       metadata,
       target_lag=(1, TimeUnit.HOURS),
       warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL
       as_query=select(MyUser.id, MyUser.name)
   )
Copy

备注

  • 不支持在动态表中定义主键,这意味着声明式表不支持动态表。

  • 当将 as_query 参数与字符串结合使用时,您必须明确定义列。然而,如果您使用 SQLAlchemy select() 结构,则无需明确定义列。

  • 不支持直接将数据插入动态表。

语言: 中文