将 Snowflake SQLAlchemy 工具包与 Python Connector 结合使用

Snowflake SQLAlchemy 在 Snowflake Connector for Python 之上运行,作为桥接 Snowflake 数据库和 SQLAlchemy 应用程序的方言。

本主题内容:

先决条件

Snowflake Connector for Python

Snowflake SQLAlchemy 的唯一要求是 Snowflake Connector for Python;但是,不需要安装该连接器,因为安装 Snowflake SQLAlchemy 会自动安装连接器。

数据分析和 Web 应用程序框架(可选)

Snowflake SQLAlchemy 可以与 pandas、Jupyter 和 Pyramid 一起使用,它们为数据分析和 Web 应用程序提供了更高级别的应用程序框架。但是,从零开始构建工作环境并不是一项简单的任务,特别是对于新手用户来说。安装框架需要 C 编译器和工具,选择正确的工具和版本是一个障碍,可能会阻止用户使用 Python 应用程序。

构建环境的一种更简单的方法是通过 Anaconda,它为所有用户(包括数据分析师和学生等非 Python 专家)提供了完整的预编译技术堆栈。有关 Anaconda 安装说明,请参阅 Anaconda 安装文档。然后可以使用 pip 将 Snowflake SQLAlchemy 包安装在 Anaconda 之上。

安装 Snowflake SQLAlchemy

可以使用 pip 从公共 PyPI 存储库安装 Snowflake SQLAlchemy 包:

pip install --upgrade snowflake-sqlalchemy
Copy

pip 自动安装所有必需的模块,包括用于 Snowflake Connector for Python。

请注意,开发者说明与源代码一起托管在 ` GitHub <https://github.com/snowflakedb/snowflake-sqlalchemy (https://github.com/snowflakedb/snowflake-sqlalchemy)>`_ 上。

验证安装

  1. 创建一个包含以下 Python 示例代码的文件(例如 validate.py),该代码连接到 Snowflake 并显示 Snowflake 的版本:

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            user='<user_login_name>',
            password='<password>',
            account_identifier='<account_identifier>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
    Copy
  2. <user_login_name><password><account_identifier> 替换为 Snowflake 账户和用户的相应值。有关更多详细信息,请参阅 ` 连接参数 `_ (本主题内容)。

  3. 执行示例代码。例如,如果您创建了一个名为 validate.py 的文件:

    python validate.py
    
    Copy

应显示 Snowflake 版本(例如 1.48.0)。

Snowflake 特定的参数和行为

Snowflake SQLAlchemy 尽可能为 SQLAlchemy 应用程序提供兼容的功能。

但是,Snowflake SQLAlchemy 还提供了 Snowflake 特定的参数和行为,以下各部分对此进行了介绍。

连接参数

必填参数

Snowflake SQLAlchemy 使用以下连接字符串语法连接到 Snowflake 并启动会话:

'snowflake://<user_login_name>:<password>@<account_identifier>'
Copy

其中:

  • <user_login_name> 是您的 Snowflake 用户的登录名。

  • <password> 是 Snowflake 用户的密码。

  • <account_identifier> 是您的账户标识符。请参阅 账户标识符

    备注

    snowflakecomputing.cn 域名作为账户标识符的一部分。Snowflake 会自动将域名追加到您的账户标识符,以创建所需连接。

其他连接参数

您可以选择在连接字符串的末尾(<account_name> 之后)包含以下附加信息:

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Copy

其中:

  • <database_name><schema_name> 是 Snowflake 会话的初始数据库和架构,用正斜杠 (/) 分隔。

  • warehouse=<warehouse_name>role=<role_name>' 是会话的初始仓库和角色,指定为参数字符串,用问号 (?) 分隔。

备注

登录后,始终可以为会话更改连接字符串中指定的初始数据库、架构、仓库和角色。

代理服务器配置

不支持代理服务器参数。相反,请使用支持的环境变量来配置代理服务器。有关信息,请参阅 使用代理服务器

连接字符串示例

以下示例使用用户名 testuser1、密码 0123456、账户标识符 myorganization-myaccount、数据库 testdb、架构 public、仓库 testwh 和角色 myrole 调用 create_engine 方法:

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole'
)
Copy

为方便起见,可以使用 snowflake.sqlalchemy.URL 方法来构造连接字符串,并连接到数据库。下面的示例构造与上一示例相同的连接字符串:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))
Copy

打开和关闭连接

通过执行 engine.connect() 打开连接;避免使用 engine.execute()

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()
Copy

备注

请确保通过在 engine.dispose() 之前执行 connection.close() 来关闭连接;否则,Python 垃圾回收器将移除与 Snowflake 通信所需的资源,从而阻止 Python Connector 正确关闭会话。

如果计划使用显式事务,则必须禁用 SQLAlchemy 中的 AUTOCOMMIT 执行选项。

默认情况下,SQLAlchemy 启用此选项。启用此选项后,INSERT、UPDATE 和 DELETE 语句在执行时会自动提交,即使这些语句在显式事务中运行也是如此。

若要禁用 AUTOCOMMIT,请将 autocommit=False 传递给 Connection.execution_options() 方法。例如:

# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:

  try:
    connection.execute("BEGIN")
    connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
    connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
    connection.execute("COMMIT")
  except Exception as e:
    connection.execute("ROLLBACK")
  finally:
    connection.close()

engine.dispose()
Copy

自动递增行为

自动递增一个值需要 Sequence 对象。将 Sequence 对象包含在主键列中,以便在插入每条新记录时自动递增此值。例如:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)
Copy

对象名称大小写处理

Snowflake 将所有不区分大小写的对象名称存储在大写文本中。相反, SQLAlchemy 将所有小写对象名称视为不区分大小写。Snowflake SQLAlchemy 在架构级通信期间(即在表和索引反射期间)转换对象名称大小写。如果使用大写对象名称, SQLAlchemy 则假定它们区分大小写,并将名称放在引号内。此行为将导致与从 Snowflake 接收的数据字典数据不匹配,因此除非标识符名称真正使用引号创建为区分大小写(例如 "TestDb"),否则应在 SQLAlchemy 端使用所有小写名称。

索引支持

Snowflake 不使用索引,因此 Snowflake SQLAlchemy 也不使用索引。

Numpy 数据类型支持

Snowflake SQLAlchemy 支持绑定和提取 NumPy 数据类型。始终支持绑定。若要启用提取 NumPy 数据类型,请将 numpy=True 添加到连接参数。

支持以下 NumPy 数据类型:

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

以下示例显示了 numpy.datetime64 数据的往返行程:

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

connection = engine.connect()
connection.execute(
    "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.execute(
    "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", engine)
assert df.c1.values[0] == specific_date
Copy

缓存列元数据

SQLAlchemy 提供运行时检查 API,以获取有关各种对象的运行时信息。一个常见的用例是,获取架构中的所有表及其列元数据,以便构建架构目录。

伪代码流程如下:

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...
Copy

在此流程中,一个潜在的问题是,在每个表上运行查询时可能需要相当长的时间。结果是缓存的,但获取列元数据的成本很高。

为了缓解此问题,Snowflake SQLAlchemy 采用一个标志 cache_column_metadata=True,以便在调用 get_table_names 时缓存所有表的所有列元数据,且 get_columns 的其余部分、get_primary_keysget_foreign_keys 也可以利用该缓存。

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))
Copy

备注

内存使用率将大幅上升,因为与 Inspector 对象关联的所有列元数据均被缓存。仅当需要获取所有列元数据时,才使用该标志。

VARIANT、ARRAY 和 OBJECT 支持

Snowflake SQLAlchemy 支持提取 VARIANTARRAYOBJECT 数据类型。所有类型都将在 Python 中转换为 str,以便您可以使用 json.loads 将它们转换为原生数据类型。

此示例展示了如何创建包含 VARIANTARRAYOBJECT 数据类型列的表:

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)
Copy

为了检索 VARIANTARRAYOBJECT 数据类型列并将它们转换为原生 Python 数据类型,请提取数据并调用 json.loads 方法,如下所示:

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])
Copy

CLUSTER BY 支持

Snowflake SQLAlchemy 支持表的 CLUSTER BY 参数。有关该参数的信息,请参阅 CREATE TABLE

此示例展示了如何将包含两列(idname)的表创建为群集密钥:

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)
Copy

Alembic 支持

Alembic 是 SQLAlchemy 之上的数据库迁移工具。Snowflake SQLAlchemy 的运作方式是将以下代码添加到 alembic/env.py,以便 Alembic 可以识别 Snowflake SQLAlchemy。

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Copy

密钥对身份验证支持

Snowflake SQLAlchemy 通过利用 Snowflake Connector for Python 函数来支持密钥对身份验证。有关创建私钥和公钥的步骤,请参阅 使用密钥对身份验证和密钥对轮换

私钥参数通过 connect_args 来传递,如下所示:

...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

engine = create_engine(URL(
    account='abc123',
    user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
        },
    )
Copy

其中 PRIVATE_KEY_PASSPHRASE 是用于解密私钥文件 ( rsa_key.p8) 的密码。

snowflake.sqlalchemy.URL 方法不支持私钥参数。

合并命令支持

Snowflake SQLAlchemy 支持使用其 MergeInto 自定义表达式执行更新或插入。有关完整文档,请参阅 MERGE

按如下方式使用:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Copy

CopyIntoStorage 支持

Snowflake SQLAlchemy 支持使用其自定义 CopyIntoStorage 表达式将表和查询结果保存到不同的 Snowflake 暂存区、Azure 容器和 AWS 桶中。有关完整文档,请参阅 COPY INTO <location>

按如下方式使用:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Copy
语言: 中文