将 Snowflake SQLAlchemy 工具包与 Python Connector 结合使用¶
Snowflake SQLAlchemy 在 Snowflake Connector for Python 之上运行,作为桥接 Snowflake 数据库和 SQLAlchemy 应用程序的方言。
本主题内容:
先决条件¶
Snowflake Connector for Python¶
Snowflake SQLAlchemy 的唯一要求是 Snowflake Connector for Python;但是,不需要安装该连接器,因为安装 Snowflake SQLAlchemy 会自动安装连接器。
数据分析和 Web 应用程序框架(可选)¶
Snowflake SQLAlchemy 可以与 pandas、Jupyter 和 Pyramid 一起使用,它们为数据分析和 Web 应用程序提供了更高级别的应用程序框架。但是,从零开始构建工作环境并不是一项简单的任务,特别是对于新手用户来说。安装框架需要 C 编译器和工具,选择正确的工具和版本是一个障碍,可能会阻止用户使用 Python 应用程序。
构建环境的一种更简单的方法是通过 Anaconda,它为所有用户(包括数据分析师和学生等非 Python 专家)提供了完整的预编译技术堆栈。有关 Anaconda 安装说明,请参阅 Anaconda 安装文档。然后可以使用 pip
将 Snowflake SQLAlchemy 包安装在 Anaconda 之上。
安装 Snowflake SQLAlchemy¶
可以使用 pip
从公共 PyPI 存储库安装 Snowflake SQLAlchemy 包:
pip install --upgrade snowflake-sqlalchemy
pip
自动安装所有必需的模块,包括用于 Snowflake Connector for Python。
请注意,开发者说明与源代码一起托管在 ` GitHub <https://github.com/snowflakedb/snowflake-sqlalchemy (https://github.com/snowflakedb/snowflake-sqlalchemy)>`_ 上。
验证安装¶
创建一个包含以下 Python 示例代码的文件(例如
validate.py
),该代码连接到 Snowflake 并显示 Snowflake 的版本:#!/usr/bin/env python from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account_identifier}/'.format( user='<user_login_name>', password='<password>', account_identifier='<account_identifier>', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(results[0]) finally: connection.close() engine.dispose()
将
<user_login_name>
、<password>
和<account_identifier>
替换为 Snowflake 账户和用户的相应值。有关更多详细信息,请参阅 ` 连接参数 `_(本主题内容)。执行示例代码。例如,如果您创建了一个名为
validate.py
的文件:python validate.py
应显示 Snowflake 版本(例如 1.6.0
)。
Snowflake 特定的参数和行为¶
Snowflake SQLAlchemy 尽可能为 SQLAlchemy 应用程序提供兼容的功能。
但是,Snowflake SQLAlchemy 还提供了 Snowflake 特定的参数和行为,以下各部分对此进行了介绍。
连接参数¶
必填参数¶
Snowflake SQLAlchemy 使用以下连接字符串语法连接到 Snowflake 并启动会话:
'snowflake://<user_login_name>:<password>@<account_identifier>'
其中:
<user_login_name>
是您的 Snowflake 用户的登录名。<password>
是 Snowflake 用户的密码。<account_identifier>
是您的账户标识符。请参阅 配置客户端、驱动程序、库或第三方应用程序以连接到 Snowflake。备注
请 勿 将
snowflakecomputing.cn
域名作为账户标识符的一部分。Snowflake 会自动将域名追加到您的账户标识符,以创建所需连接。
其他连接参数¶
您可以选择在连接字符串的末尾(<account_name>
之后)包含以下附加信息:
'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
其中:
<database_name>
和<schema_name>
是 Snowflake 会话的初始数据库和架构,用正斜杠 (/
) 分隔。warehouse=<warehouse_name>
和role=<role_name>'
是会话的初始仓库和角色,指定为参数字符串,用问号 (?
) 分隔。
备注
登录后,始终可以为会话更改连接字符串中指定的初始数据库、架构、仓库和角色。
代理服务器配置¶
不支持代理服务器参数。相反,请使用支持的环境变量来配置代理服务器。有关信息,请参阅 使用代理服务器。
连接字符串示例¶
以下示例使用用户名 testuser1
、密码 0123456
、账户标识符 myorganization-myaccount
、数据库 testdb
、架构 public
、仓库 testwh
和角色 myrole
调用 create_engine
方法:
from sqlalchemy import create_engine engine = create_engine( 'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole' )
为方便起见,可以使用 snowflake.sqlalchemy.URL
方法来构造连接字符串,并连接到数据库。下面的示例构造与上一示例相同的连接字符串:
from snowflake.sqlalchemy import URL from sqlalchemy import create_engine engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = '0123456', database = 'testdb', schema = 'public', warehouse = 'testwh', role='myrole', ))
打开和关闭连接¶
通过执行 engine.connect()
打开连接;避免使用 engine.execute()
。
# Avoid this. engine = create_engine(...) engine.execute(<SQL>) engine.dispose() # Do this. engine = create_engine(...) connection = engine.connect() try: connection.execute(<SQL>) finally: connection.close() engine.dispose()
备注
请确保通过在 engine.dispose()
之前执行 connection.close()
来关闭连接;否则,Python 垃圾回收器将移除与 Snowflake 通信所需的资源,从而阻止 Python Connector 正确关闭会话。
如果计划使用显式事务,则必须禁用 SQLAlchemy 中的 AUTOCOMMIT 执行选项。
默认情况下,SQLAlchemy 启用此选项。启用此选项后,INSERT、UPDATE 和 DELETE 语句在执行时会自动提交,即使这些语句在显式事务中运行也是如此。
若要禁用 AUTOCOMMIT,请将 autocommit=False
传递给 Connection.execution_options()
方法。例如:
# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:
try:
connection.execute("BEGIN")
connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
connection.execute("COMMIT")
except Exception as e:
connection.execute("ROLLBACK")
finally:
connection.close()
engine.dispose()
自动递增行为¶
自动递增一个值需要 Sequence
对象。将 Sequence
对象包含在主键列中,以便在插入每条新记录时自动递增此值。例如:
t = Table('mytable', metadata, Column('id', Integer, Sequence('id_seq'), primary_key=True), Column(...), ... )
对象名称大小写处理¶
Snowflake 将所有不区分大小写的对象名称存储在大写文本中。相反,SQLAlchemy 将所有小写对象名称视为不区分大小写。Snowflake SQLAlchemy 在架构级通信期间(即在表和索引反射期间)转换对象名称大小写。如果使用大写对象名称,SQLAlchemy 则假定它们区分大小写,并将名称放在引号内。此行为将导致与从 Snowflake 接收的数据字典数据不匹配,因此除非标识符名称真正使用引号创建为区分大小写(例如 "TestDb"
),否则应在 SQLAlchemy 端使用所有小写名称。
索引支持¶
索引仅在 Snowflake SqlAlchemy 的混合表中受支持。有关限制和用例的更多详细信息,请参阅 创建索引 文档。您可以使用以下方法创建索引:
单列索引
您可以通过在列上设置
index=True
参数或通过明确定义Index
对象来创建单列索引。hybrid_test_table_1 = HybridTable( "table_name", metadata, Column("column1", Integer, primary_key=True), Column("column2", String, index=True), Index("index_1","column1", "column2") ) metadata.create_all(engine_testaccount)
多列索引
对于多列索引,您需要定义索引对象,指定应被索引的列。
hybrid_test_table_1 = HybridTable( "table_name", metadata, Column("column1", Integer, primary_key=True), Column("column2", String), Index("index_1","column1", "column2") ) metadata.create_all(engine_testaccount)
Numpy 数据类型支持¶
Snowflake SQLAlchemy 支持绑定和提取 NumPy
数据类型。始终支持绑定。若要启用提取 NumPy
数据类型,请将 numpy=True
添加到连接参数。
支持以下 NumPy
数据类型:
numpy.int64
numpy.float64
numpy.datetime64
以下示例显示了 numpy.datetime64
数据的往返行程:
import numpy as np import pandas as pd engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', numpy=True, )) specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z') with engine.connect() as connection: connection.exec_sql_query( "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)") connection.exec_sql_query( "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,) ) df = pd.read_sql_query("SELECT * FROM ts_tbl", connection) assert df.c1.values[0] == specific_date
缓存列元数据¶
SQLAlchemy 提供运行时检查 API,以获取有关各种对象的运行时信息。一个常见的用例是,获取架构中的所有表及其列元数据,以便构建架构目录。
伪代码流程如下:
inspector = inspect(engine) schema = inspector.default_schema_name for table_name in inspector.get_table_names(schema): column_metadata = inspector.get_columns(table_name, schema) primary_keys = inspector.get_primary_keys(table_name, schema) foreign_keys = inspector.get_foreign_keys(table_name, schema) ...
在此流程中,一个潜在的问题是,在每个表上运行查询时可能需要相当长的时间。结果是缓存的,但获取列元数据的成本很高。
为了缓解此问题,Snowflake SQLAlchemy 采用一个标志 cache_column_metadata=True
,以便在调用 get_table_names
时缓存所有表的所有列元数据,且 get_columns
的其余部分、get_primary_keys
和 get_foreign_keys
也可以利用该缓存。
engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', cache_column_metadata=True, ))
备注
内存使用率将大幅上升,因为与 Inspector
对象关联的所有列元数据均被缓存。仅当需要获取所有列元数据时,才使用该标志。
VARIANT、ARRAY 和 OBJECT 支持¶
Snowflake SQLAlchemy 支持提取 VARIANT
、ARRAY
和 OBJECT
数据类型。所有类型都将在 Python 中转换为 str
,以便您可以使用 json.loads
将它们转换为原生数据类型。
此示例展示了如何创建包含 VARIANT
、ARRAY
和 OBJECT
数据类型列的表:
from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT) ... t = Table('my_semi_structured_datatype_table', metadata, Column('va', VARIANT), Column('ob', OBJECT), Column('ar', ARRAY)) metadata.create_all(engine)
为了检索 VARIANT
、 ARRAY
和 OBJECT
数据类型列并将它们转换为原生 Python 数据类型,请提取数据并调用 json.loads
方法,如下所示:
import json connection = engine.connect() results = connection.execute(select([t])) row = results.fetchone() data_variant = json.loads(row[0]) data_object = json.loads(row[1]) data_array = json.loads(row[2])
结构化数据类型支持¶
本模块为 Snowflake 结构化数据(特别是 Iceberg 表)定义了自定义 SQLAlchemy 类型。MAP、OBJECT 和 ARRAY 类型允许在 SQLAlchemy 模型中存储复杂的数据结构。有关详细信息,请参阅 Snowflake 结构化数据类型 文档。
MAP¶
MAP
类型表示键值对的集合,其中每个键和值可以有不同的类型,如下所示:
密钥类型: 密钥的类型,如
TEXT
或NUMBER
)。值类型: 值的类型,如
TEXT
或NUMBER
)。不为 Null: 是否允许 NULL 值(默认值为
False
)。
使用示例:
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("map_col", MAP(NUMBER(10, 0), TEXT(16777216))),
external_volume="external_volume",
base_location="base_location",
)
OBJECT¶
OBJECT
类型表示具有命名字段的半结构化对象。每个字段可以有特定的类型,您也可以指定每个字段是否可以为空。
项目类型: 字段名称及其类型的字典。该类型可以可选地包括可空标志(
True
表示不可空,或False
表示可空 [默认])。
使用示例:
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column(
"object_col",
OBJECT(key1=(TEXT(16777216), False), key2=(NUMBER(10, 0), False)),
OBJECT(key1=TEXT(16777216), key2=NUMBER(10, 0)), # Without nullable flag
),
external_volume="external_volume",
base_location="base_location",
)
ARRAY¶
ARRAY
类型表示有序的值列表,其中每个元素具有相同的类型。元素类型是在创建数组时定义的。
值类型: 数组中元素的类型,如:
TEXT
或NUMBER
)。不为 Null: 是否允许
NULL
值(默认值为False
)。
使用示例:
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("array_col", ARRAY(TEXT(16777216))),
external_volume="external_volume",
base_location="base_location",
)
CLUSTER BY 支持¶
Snowflake SQLAlchemy 支持表的 CLUSTER BY
参数。有关该参数的信息,请参阅 CREATE TABLE。
此示例展示了如何将包含两列(id
和 name
)的表创建为群集密钥:
t = Table('myuser', metadata, Column('id', Integer, primary_key=True), Column('name', String), snowflake_clusterby=['id', 'name'], ... ) metadata.create_all(engine)
Alembic 支持¶
Alembic 是 SQLAlchemy
之上的数据库迁移工具。Snowflake SQLAlchemy 的运作方式是将以下代码添加到 alembic/env.py
,以便 Alembic 可以识别 Snowflake SQLAlchemy。
from alembic.ddl.impl import DefaultImpl class SnowflakeImpl(DefaultImpl): __dialect__ = 'snowflake'
密钥对身份验证支持¶
Snowflake SQLAlchemy 通过利用 Snowflake Connector for Python 函数来支持密钥对身份验证。有关创建私钥和公钥的步骤,请参阅 使用密钥对身份验证和密钥对轮换。
私钥参数通过 connect_args
来传递,如下所示:
... from snowflake.sqlalchemy import URL from sqlalchemy import create_engine from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization with open("rsa_key.p8", "rb") as key: p_key= serialization.load_pem_private_key( key.read(), password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(), backend=default_backend() ) pkb = p_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) engine = create_engine(URL( account='abc123', user='testuser1', ), connect_args={ 'private_key': pkb, }, )
其中 PRIVATE_KEY_PASSPHRASE
是用于解密私钥文件 (rsa_key.p8
) 的密码。
snowflake.sqlalchemy.URL
方法不支持私钥参数。
合并命令支持¶
Snowflake SQLAlchemy 支持使用其 MergeInto
自定义表达式执行更新或插入。有关完整文档,请参阅 MERGE。
按如下方式使用:
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import MergeInto engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) t1 = meta.tables['t1'] t2 = meta.tables['t2'] merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key) merge.when_matched_then_delete().where(t2.c.marked == 1) merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus) merge.when_matched_then_update().values(val=t2.c.newval) merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus) connection.execute(merge)
CopyIntoStorage 支持¶
Snowflake SQLAlchemy 支持使用其自定义 CopyIntoStorage
表达式将表和查询结果保存到不同的 Snowflake 暂存区、Azure 容器和 AWS 桶中。有关完整文档,请参阅 COPY INTO <location>。
按如下方式使用:
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) users = meta.tables['users'] copy_into = CopyIntoStorage(from_=users, into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'), formatter=CSVFormatter().null_if(['null', 'Null'])) connection.execute(copy_into)
Iceberg 表与 Snowflake 目录支持¶
Snowflake SQLAlchemy 支持带有 Snowflake 目录的 Iceberg 表,以及各种相关参数。有关 Iceberg 表的详细信息,请参阅 Snowflake CREATE ICEBERG 文档。
若要使用 Snowflake SQLAlchemy 创建 Iceberg 表,您可以使用 SQLAlchemy Core 语法定义表,如下所示:
table = IcebergTable(
"myuser",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
external_volume=external_volume_name,
base_location="my_iceberg_table",
as_query="SELECT * FROM table"
)
或者,您可以使用声明式方法定义表:
class MyUser(Base):
__tablename__ = "myuser"
@classmethod
def __table_cls__(cls, name, metadata, *arg, **kw):
return IcebergTable(name, metadata, *arg, **kw)
__table_args__ = {
"external_volume": "my_external_volume",
"base_location": "my_iceberg_table",
"as_query": "SELECT * FROM table",
}
id = Column(Integer, primary_key=True)
name = Column(String)
混合表支持¶
Snowflake SQLAlchemy 支持具有索引的混合表。有关详细信息,请参阅 Snowflake CREATE HYBRID TABLE 文档。
若要创建混合表并添加索引,您可以使用 SQLAlchemy 核心语法,如下所示:
table = HybridTable(
"myuser",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Index("idx_name", "name")
此外,您也可以使用声明式方法定义表:
class MyUser(Base):
__tablename__ = "myuser"
@classmethod
def __table_cls__(cls, name, metadata, *arg, **kw):
return HybridTable(name, metadata, *arg, **kw)
__table_args__ = (
Index("idx_name", "name"),
)
id = Column(Integer, primary_key=True)
name = Column(String)
动态表支持¶
Snowflake SQLAlchemy 支持动态表。有关详细信息,请参阅 Snowflake CREATE DYNAMIC TABLE 文档。
若要创建动态表,您可以使用 SQLAlchemy 核心语法,如下所示:
dynamic_test_table_1 = DynamicTable(
"dynamic_MyUser",
metadata,
Column("id", Integer),
Column("name", String),
target_lag=(1, TimeUnit.HOURS), # Additionally you can use SnowflakeKeyword.DOWNSTREAM
warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL
as_query="SELECT id, name from MyUser;"
)
此外,您也可以使用 SqlAlchemy select() 结构定义一个没有列的表:
dynamic_test_table_1 = DynamicTable(
"dynamic_MyUser",
metadata,
target_lag=(1, TimeUnit.HOURS),
warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL
as_query=select(MyUser.id, MyUser.name)
)
备注
不支持在动态表中定义主键,这意味着声明式表不支持动态表。
当将
as_query
参数与字符串结合使用时,您必须明确定义列。然而,如果您使用 SQLAlchemyselect()
结构,则无需明确定义列。不支持直接将数据插入动态表。