将 Snowflake SQLAlchemy 工具包与 Python Connector 结合使用¶
Snowflake SQLAlchemy 在 Snowflake Connector for Python 之上运行,作为桥接 Snowflake 数据库和 SQLAlchemy 应用程序的方言。
本主题内容:
先决条件¶
Snowflake Connector for Python¶
Snowflake SQLAlchemy 的唯一要求是 Snowflake Connector for Python;但是,不需要安装该连接器,因为安装 Snowflake SQLAlchemy 会自动安装连接器。
数据分析和 Web 应用程序框架(可选)¶
Snowflake SQLAlchemy 可以与 pandas、Jupyter 和 Pyramid 一起使用,它们为数据分析和 Web 应用程序提供了更高级别的应用程序框架。但是,从零开始构建工作环境并不是一项简单的任务,特别是对于新手用户来说。安装框架需要 C 编译器和工具,选择正确的工具和版本是一个障碍,可能会阻止用户使用 Python 应用程序。
构建环境的一种更简单的方法是通过 Anaconda,它为所有用户(包括数据分析师和学生等非 Python 专家)提供了完整的预编译技术堆栈。有关 Anaconda 安装说明,请参阅 Anaconda 安装文档。然后可以使用 pip
将 Snowflake SQLAlchemy 包安装在 Anaconda 之上。
安装 Snowflake SQLAlchemy¶
可以使用 pip
从公共 PyPI 存储库安装 Snowflake SQLAlchemy 包:
pip install --upgrade snowflake-sqlalchemy
pip
自动安装所有必需的模块,包括用于 Snowflake Connector for Python。
请注意,开发者说明与源代码一起托管在 ` GitHub <https://github.com/snowflakedb/snowflake-sqlalchemy (https://github.com/snowflakedb/snowflake-sqlalchemy)>`_ 上。
验证安装¶
创建一个包含以下 Python 示例代码的文件(例如
validate.py
),该代码连接到 Snowflake 并显示 Snowflake 的版本:#!/usr/bin/env python from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account_identifier}/'.format( user='<user_login_name>', password='<password>', account_identifier='<account_identifier>', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(results[0]) finally: connection.close() engine.dispose()
将
<user_login_name>
、<password>
和<account_identifier>
替换为 Snowflake 账户和用户的相应值。有关更多详细信息,请参阅 ` 连接参数 `_ (本主题内容)。执行示例代码。例如,如果您创建了一个名为
validate.py
的文件:python validate.py
应显示 Snowflake 版本(例如 1.48.0
)。
Snowflake 特定的参数和行为¶
Snowflake SQLAlchemy 尽可能为 SQLAlchemy 应用程序提供兼容的功能。
但是,Snowflake SQLAlchemy 还提供了 Snowflake 特定的参数和行为,以下各部分对此进行了介绍。
连接参数¶
必填参数¶
Snowflake SQLAlchemy 使用以下连接字符串语法连接到 Snowflake 并启动会话:
'snowflake://<user_login_name>:<password>@<account_identifier>'
其中:
<user_login_name>
是您的 Snowflake 用户的登录名。<password>
是 Snowflake 用户的密码。<account_identifier>
是您的账户标识符。请参阅 账户标识符。备注
请 勿 将
snowflakecomputing.cn
域名作为账户标识符的一部分。Snowflake 会自动将域名追加到您的账户标识符,以创建所需连接。
其他连接参数¶
您可以选择在连接字符串的末尾(<account_name>
之后)包含以下附加信息:
'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
其中:
<database_name>
和<schema_name>
是 Snowflake 会话的初始数据库和架构,用正斜杠 (/
) 分隔。warehouse=<warehouse_name>
和role=<role_name>'
是会话的初始仓库和角色,指定为参数字符串,用问号 (?
) 分隔。
备注
登录后,始终可以为会话更改连接字符串中指定的初始数据库、架构、仓库和角色。
代理服务器配置¶
不支持代理服务器参数。相反,请使用支持的环境变量来配置代理服务器。有关信息,请参阅 使用代理服务器。
连接字符串示例¶
以下示例使用用户名 testuser1
、密码 0123456
、账户标识符 myorganization-myaccount
、数据库 testdb
、架构 public
、仓库 testwh
和角色 myrole
调用 create_engine
方法:
from sqlalchemy import create_engine engine = create_engine( 'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole' )
为方便起见,可以使用 snowflake.sqlalchemy.URL
方法来构造连接字符串,并连接到数据库。下面的示例构造与上一示例相同的连接字符串:
from snowflake.sqlalchemy import URL from sqlalchemy import create_engine engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = '0123456', database = 'testdb', schema = 'public', warehouse = 'testwh', role='myrole', ))
打开和关闭连接¶
通过执行 engine.connect()
打开连接;避免使用 engine.execute()
。
# Avoid this. engine = create_engine(...) engine.execute(<SQL>) engine.dispose() # Do this. engine = create_engine(...) connection = engine.connect() try: connection.execute(<SQL>) finally: connection.close() engine.dispose()
备注
请确保通过在 engine.dispose()
之前执行 connection.close()
来关闭连接;否则,Python 垃圾回收器将移除与 Snowflake 通信所需的资源,从而阻止 Python Connector 正确关闭会话。
如果计划使用显式事务,则必须禁用 SQLAlchemy 中的 AUTOCOMMIT 执行选项。
默认情况下,SQLAlchemy 启用此选项。启用此选项后,INSERT、UPDATE 和 DELETE 语句在执行时会自动提交,即使这些语句在显式事务中运行也是如此。
若要禁用 AUTOCOMMIT,请将 autocommit=False
传递给 Connection.execution_options()
方法。例如:
# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:
try:
connection.execute("BEGIN")
connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
connection.execute("COMMIT")
except Exception as e:
connection.execute("ROLLBACK")
finally:
connection.close()
engine.dispose()
自动递增行为¶
自动递增一个值需要 Sequence
对象。将 Sequence
对象包含在主键列中,以便在插入每条新记录时自动递增此值。例如:
t = Table('mytable', metadata, Column('id', Integer, Sequence('id_seq'), primary_key=True), Column(...), ... )
对象名称大小写处理¶
Snowflake 将所有不区分大小写的对象名称存储在大写文本中。相反, SQLAlchemy 将所有小写对象名称视为不区分大小写。Snowflake SQLAlchemy 在架构级通信期间(即在表和索引反射期间)转换对象名称大小写。如果使用大写对象名称, SQLAlchemy 则假定它们区分大小写,并将名称放在引号内。此行为将导致与从 Snowflake 接收的数据字典数据不匹配,因此除非标识符名称真正使用引号创建为区分大小写(例如 "TestDb"
),否则应在 SQLAlchemy 端使用所有小写名称。
索引支持¶
Snowflake 不使用索引,因此 Snowflake SQLAlchemy 也不使用索引。
Numpy 数据类型支持¶
Snowflake SQLAlchemy 支持绑定和提取 NumPy
数据类型。始终支持绑定。若要启用提取 NumPy
数据类型,请将 numpy=True
添加到连接参数。
支持以下 NumPy
数据类型:
numpy.int64
numpy.float64
numpy.datetime64
以下示例显示了 numpy.datetime64
数据的往返行程:
import numpy as np import pandas as pd engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', numpy=True, )) specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z') connection = engine.connect() connection.execute( "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)") connection.execute( "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,) ) df = pd.read_sql_query("SELECT * FROM ts_tbl", engine) assert df.c1.values[0] == specific_date
缓存列元数据¶
SQLAlchemy 提供运行时检查 API,以获取有关各种对象的运行时信息。一个常见的用例是,获取架构中的所有表及其列元数据,以便构建架构目录。
伪代码流程如下:
inspector = inspect(engine) schema = inspector.default_schema_name for table_name in inspector.get_table_names(schema): column_metadata = inspector.get_columns(table_name, schema) primary_keys = inspector.get_primary_keys(table_name, schema) foreign_keys = inspector.get_foreign_keys(table_name, schema) ...
在此流程中,一个潜在的问题是,在每个表上运行查询时可能需要相当长的时间。结果是缓存的,但获取列元数据的成本很高。
为了缓解此问题,Snowflake SQLAlchemy 采用一个标志 cache_column_metadata=True
,以便在调用 get_table_names
时缓存所有表的所有列元数据,且 get_columns
的其余部分、get_primary_keys
和 get_foreign_keys
也可以利用该缓存。
engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', cache_column_metadata=True, ))
备注
内存使用率将大幅上升,因为与 Inspector
对象关联的所有列元数据均被缓存。仅当需要获取所有列元数据时,才使用该标志。
VARIANT、ARRAY 和 OBJECT 支持¶
Snowflake SQLAlchemy 支持提取 VARIANT
、ARRAY
和 OBJECT
数据类型。所有类型都将在 Python 中转换为 str
,以便您可以使用 json.loads
将它们转换为原生数据类型。
此示例展示了如何创建包含 VARIANT
、ARRAY
和 OBJECT
数据类型列的表:
from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT) ... t = Table('my_semi_structured_datatype_table', metadata, Column('va', VARIANT), Column('ob', OBJECT), Column('ar', ARRAY)) metdata.create_all(engine)
为了检索 VARIANT
、ARRAY
和 OBJECT
数据类型列并将它们转换为原生 Python 数据类型,请提取数据并调用 json.loads
方法,如下所示:
import json connection = engine.connect() results = connection.execute(select([t])) row = results.fetchone() data_variant = json.loads(row[0]) data_object = json.loads(row[1]) data_array = json.loads(row[2])
CLUSTER BY 支持¶
Snowflake SQLAlchemy 支持表的 CLUSTER BY
参数。有关该参数的信息,请参阅 CREATE TABLE。
此示例展示了如何将包含两列(id
和 name
)的表创建为群集密钥:
t = Table('myuser', metadata, Column('id', Integer, primary_key=True), Column('name', String), snowflake_clusterby=['id', 'name'], ... ) metadata.create_all(engine)
Alembic 支持¶
Alembic 是 SQLAlchemy
之上的数据库迁移工具。Snowflake SQLAlchemy 的运作方式是将以下代码添加到 alembic/env.py
,以便 Alembic 可以识别 Snowflake SQLAlchemy。
from alembic.ddl.impl import DefaultImpl class SnowflakeImpl(DefaultImpl): __dialect__ = 'snowflake'
密钥对身份验证支持¶
Snowflake SQLAlchemy 通过利用 Snowflake Connector for Python 函数来支持密钥对身份验证。有关创建私钥和公钥的步骤,请参阅 使用密钥对身份验证和密钥对轮换。
私钥参数通过 connect_args
来传递,如下所示:
... from snowflake.sqlalchemy import URL from sqlalchemy import create_engine from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import dsa from cryptography.hazmat.primitives import serialization with open("rsa_key.p8", "rb") as key: p_key= serialization.load_pem_private_key( key.read(), password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(), backend=default_backend() ) pkb = p_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) engine = create_engine(URL( account='abc123', user='testuser1', ), connect_args={ 'private_key': pkb, }, )
其中 PRIVATE_KEY_PASSPHRASE
是用于解密私钥文件 ( rsa_key.p8
) 的密码。
snowflake.sqlalchemy.URL
方法不支持私钥参数。
合并命令支持¶
Snowflake SQLAlchemy 支持使用其 MergeInto
自定义表达式执行更新或插入。有关完整文档,请参阅 MERGE。
按如下方式使用:
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import MergeInto engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) t1 = meta.tables['t1'] t2 = meta.tables['t2'] merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key) merge.when_matched_then_delete().where(t2.c.marked == 1) merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus) merge.when_matched_then_update().values(val=t2.c.newval) merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus) connection.execute(merge)
CopyIntoStorage 支持¶
Snowflake SQLAlchemy 支持使用其自定义 CopyIntoStorage
表达式将表和查询结果保存到不同的 Snowflake 暂存区、Azure 容器和 AWS 桶中。有关完整文档,请参阅 COPY INTO <location>。
按如下方式使用:
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) users = meta.tables['users'] copy_into = CopyIntoStorage(from_=users, into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'), formatter=CSVFormatter().null_if(['null', 'Null'])) connection.execute(copy_into)