使用 Python Connector 连接到 Snowflake

本主题介绍了使用 Python Connector 连接到 Snowflake 的多种方式。

使用 SnowCD 验证与 Snowflake 的网络连接

配置驱动程序后,可以使用 SnowCD 评估与 Snowflake 的网络连接并进行故障排除。

可以在初始配置过程中使用 SnowCD,也可以根据需要使用,以评估与 Snowflake 的网络连接并进行故障排除。

导入 snowflake.connector 模块

要导入 snowflake.connector 模块,请执行以下命令:

import snowflake.connector
Copy

您可以从环境变量、命令行、配置文件或其他相应来源获取登录信息。例如:

PASSWORD = os.getenv('SNOWSQL_PWD')
WAREHOUSE = os.getenv('WAREHOUSE')
...
Copy

对于 ACCOUNT 参数,请使用 账户标识符。请注意,账户标识符 包含 snowflakecomputing.cn 后缀。

有关详细信息和示例,请参阅 account 参数的用法说明(适用于 connect 方法)

备注

有关可用连接器参数的描述,请参阅 snowflake.connector 方法

如果您从自己的 Amazon S3 桶中复制数据,则需要 AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY。

import os

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
Copy

备注

如果数据存储在 Microsoft Azure 容器中,请直接在 COPY 语句中提供凭据。

读取连接信息后,可使用默认身份验证器或联合身份验证(如果已启用)进行连接。

设置会话参数

使用 Python Connector 时,您可以通过多种方式设置会话参数(例如 QUERY_TAG):

  • 在连接到 Snowflake 时,可以通过传递名为 session_parameters 的可选连接参数来设置会话级参数,如下所示:

    con = snowflake.connector.connect(
        user='XXXX',
        password='XXXX',
        account='XXXX',
        session_parameters={
            'QUERY_TAG': 'EndOfMonthFinancials',
        }
    )
    
    Copy

    传递给 snowflake.connector.connect 方法的 session_parameters 字典可包含一个或多个会话级参数。

    备注

    不能在 Python 连接中设置 SEARCH_PATH 参数。在设置搜索路径之前必须建立会话。

  • 您还可以在连接后执行 ALTER SESSION SET SQL 语句来设置会话参数:

    con.cursor().execute("ALTER SESSION SET QUERY_TAG = 'EndOfMonthFinancials'")
    
    Copy

有关会话参数的更多信息,请参阅通用 参数 页面上对各个参数的描述。

使用默认身份验证器进行连接

可使用登录参数连接到 Snowflake:

conn = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
    )
Copy

您可能需要使用 snowflake.connector.connect 方法中提供的其他信息对此进行扩展。

使用 connections.toml 文件进行连接

Python Connector 使您可以将连接定义添加到 connections.toml 配置文件中。连接定义是指与连接相关的参数的集合。 Snowflake Python 库目前支持 TOML 1.0.0 版本。

Python Connector 按顺序在以下位置查找 connections.toml 文件:

  • 如果计算机中存在 ~/.snowflake 目录,则 Snowflake CLI 将使用 ~/.snowflake/connections.toml 文件。您可以通过在 SNOWFLAKE_HOME 环境变量中设置位置来替换默认的 ~/.snowflake 目录。

  • 否则,Snowflake CLI 将根据操作系统使用以下某个位置中的 connections.toml 文件:

    • Linux:~/.config/snowflake/connections.toml,但您可以使用 XDG 变量对其进行更新

    • Windows:%USERPROFILE%\AppData\Local\snowflake\connections.toml

    • Mac:~/Library/Application Support/snowflake/connections.toml

要在连接配置文件中添加凭据,请执行以下操作:

  1. 在文本编辑器中,打开 connections.toml 文件进行编辑。例如,要在 Linux vi 编辑器中打开该文件,请执行以下命令:

    $ vi connections.toml
    
    Copy
  2. 添加新的 Snowflake 连接定义。

    例如,要添加一个名为 myconnection 的 Snowflake 连接,并在其中包含账户 myaccount、用户 johndoe 和密码凭据以及数据库信息,请在配置文件中添加以下行:

    [myconnection]
    account = "myaccount"
    user = "jdoe"
    password = "******"
    warehouse = "my-wh"
    database = "my_db"
    schema = "my_schema"
    
    Copy

    连接定义支持的配置选项与 snowflake.connector.connect 方法中提供的配置选项相同。

  3. 可选:添加更多连接,如下所示:

    [myconnection_test]
    account = "myaccount"
    user = "jdoe-test"
    password = "******"
    warehouse = "my-test_wh"
    database = "my_test_db"
    schema = "my_schema"
    
    Copy
  4. 保存对文件所做的更改。

  5. 在 Python 代码中,将连接名称提供给 snowflake.connector.connect,如下所示:

    with snowflake.connector.connect(
          connection_name="myconnection",
    ) as conn:
    
    Copy

    您也可以替换 connections.toml 文件中为连接定义的值,如下所示:

    with snowflake.connector.connect(
          connection_name="myconnection",
          warehouse="test_xl_wh",
          database="testdb_2"
    ) as conn:
    
    Copy

设置默认连接

您可以将连接设置为默认连接,这样就不必在每次调用 snowflake.connector.connect() 连接到 Snowflake 时都指定一个连接。您可以通过以下任一方法定义默认连接,这些方法按优先级递增顺序列出:

  • 创建名为 default 的连接定义。

    1. connections.toml 文件中,创建连接定义并为其指定名称 default,如下所示:

      [default]
      account = "myaccount"
      user = "jdoe-test"
      password = "******"
      warehouse = "my-test_wh"
      database = "my_test_db"
      schema = "my_schema"
      
      Copy
    2. 保存文件。

  • 在与 connections.toml 文件位于同一目录下的 Snowflake config.toml 文件中指定一个命名连接作为默认连接。

    1. 打开 config.toml 文件进行编辑;然后:

    2. 设置类似于以下内容的 default_connection_name 参数:

      default_connection_name = "myaccount"
      
      Copy
    3. 保存文件。

  • 设置 SNOWFLAKE_DEFAULT_CONNECTION_NAME 环境变量。

    有时您可能想暂时替换默认连接,例如尝试一个测试连接,而无需更改正常的默认连接。您可以按以下方式设置 SNOWFLAKE_DEFAULT_CONNECTION_NAME 环境变量,替换 connections.tomlconfig.toml 文件中指定的默认连接:

    SNOWFLAKE_DEFAULT_CONNECTION_NAME = myconnection_test
    
    Copy

要使用默认连接,请执行类似于以下内容的 Python 代码:

with snowflake.connector.connect() as conn:
    with conn.cursor() as cur:
        print(cur.execute("SELECT 1;").fetchall())
Copy

备注

如果您选择使用默认连接,则无法替换连接参数(例如 usernamedatabaseschema)。

使用单点登录 (SSO) 进行身份验证

如果 已将 Snowflake 配置为使用单点登录 (SSO),则可将客户端应用程序配置为使用 SSO 进行身份验证。有关详细信息,请参阅 为连接到 Snowflake 的客户端应用程序使用 SSO

使用多重身份验证 (MFA)

Snowflake 支持缓存 MFA 令牌,包括将 MFA 令牌缓存与 SSO 相结合。

有关更多信息,请参阅 使用 MFA 令牌缓存,最大限度地减少身份验证过程中的提示次数 – 可选

使用密钥对身份验证和密钥对轮换

Python Connector 支持密钥对身份验证和密钥轮换。

有关如何配置密钥对身份验证和密钥轮换的更多信息,请参阅 密钥对身份验证和密钥对轮换

  1. 完成密钥对身份验证配置后,将 connect 函数中的 private_key 参数设置为私钥文件的路径。

  2. 修改并执行下面的示例代码。该代码可解密私钥文件并将其传递给 Snowflake 驱动程序以创建连接:

示例代码

import snowflake.connector
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("<path>/rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

ctx = snowflake.connector.connect(
    user='<user>',
    account='<account_identifier>',
    private_key=pkb,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
    )

cs = ctx.cursor()
Copy

使用代理服务器

要使用代理服务器,请配置以下环境变量:

  • HTTP_PROXY

  • HTTPS_PROXY

  • NO_PROXY

例如:

Linux 或 macOS:
export HTTP_PROXY='http://username:password@proxyserver.company.com:80'
export HTTPS_PROXY='http://username:password@proxyserver.company.com:80'
Copy
Windows:
set HTTP_PROXY=http://username:password@proxyserver.company.com:80
set HTTPS_PROXY=http://username:password@proxyserver.company.com:80
Copy

小技巧

Snowflake 的安全模型不允许安全套接层 (SSL) 代理(使用 HTTPS 证书)。代理服务器必须使用公开可用的证书颁发机构 (CA),从而降低潜在的安全风险,例如通过受感染的代理发起的 MITM(中间人)攻击。

如果 必须 使用 SSL 代理,我们强烈建议更新服务器策略以通过 Snowflake 证书,这样证书就不会在通信过程中被更改。

(可选)可以使用 NO_PROXY 绕过代理进行特定通信。例如,通过指定 NO_PROXY=".amazonaws.com",可以绕过代理服务器访问 Amazon S3。

NO_PROXY 不支持通配符。指定的每个值都应为以下内容之一:

  • 主机名的末尾(或完整主机名),例如:

    • .amazonaws.com

    • myorganization-myaccount.snowflakecomputing.cn

  • IP 地址,例如:

    • 192.196.1.15

如果指定了多个值,则应使用逗号分隔这些值,例如:

localhost,.my_company.com,.snowflakecomputing.cn,192.168.1.15,192.168.1.16
Copy

使用 OAuth 连接

要使用 OAuth 进行连接,连接字符串必须包括 authenticator 参数(设置为 oauth )和 token 参数(设置为 oauth_access_token)。有关更多信息,请参阅 客户端、驱动程序和连接器

ctx = snowflake.connector.connect(
    user="<username>",
    host="<hostname>",
    account="<account_identifier>",
    authenticator="oauth",
    token="<oauth_access_token>",
    warehouse="test_warehouse",
    database="test_db",
    schema="test_schema"
)
Copy

管理连接超时

调用 snowflake.connector.connect 会提交登录请求。如果登录请求失败,连接器可以重新发送连接请求。以下参数设置了时间限制,超过此时间限制后,连接器将停止重试请求:

  • login_timeout:指定了持续重新发送连接请求的时长(以秒为单位)。如果在这段时间内连接不成功,则连接器在完成当前尝试后因超时错误而失败,而不会继续重试登录请求。超时后,系统将阻止后续的重试。但是,当前正在进行的尝试将自然终止。

  • network_timeout:指定了等待网络问题得到解决以处理其他请求(例如来自 cursor.execute 的查询请求)所需的时间。当 network_timeout 秒过去后,如果当前尝试失败,则会发生超时,相关请求不会重试。超过 network_timeout 秒后,系统仍允许当前尝试完成(自行失败),之后会发生超时。

  • socket_timeout:指定了套接字级的连接和请求超时。

以下示例替换了 SNOWFLAKE_JWT 身份验证器的 socket_timeout

# this request itself stops retrying after 60 seconds as it is a login request
conn = snowflake.connector.connect(
login_timeout=60,
network_timeout=30,
socket_timeout=10
)

# this request stops retrying after 30 seconds
conn.cursor.execute("SELECT * FROM table")
Copy

以下示例演示了将 socket_timeout 设置为较大值的效果:

# even though login_timeout is 1, connect will take up to n*300 seconds before failing
# (n depends on possible socket addresses)
# this issue arises because socket operations cannot be cancelled once started
conn = snowflake.connector.connect(
login_timeout=1,
socket_timeout=300
)
Copy

以下示例展示了如何替换 SNOWFLAKE_JWT 身份验证器的套接字超时:

# socket timeout for login request overriden by env variable JWT_CNXN_WAIT_TIME
conn = snowflake.connector.connect(
authenticator="SNOWFLAKE_JWT",
socket_timeout=300
)

# socket timeout for this request is still 300 seconds
conn.cursor.execute("SELECT * FROM table")
Copy

请注意, MAX_CON_RETRY_ATTEMPTS 环境变量限制了登录请求的最大重试次数。如果请求未超时,但已达到最大重试次数,则该请求将立即失败。默认值为 1,表示连接器仅尝试一次重试。

管理重试的连接退避策略

在某些情况下,您可能需要更改连接器对因超时而失败的请求进行重试时使用的速率或频率。例如,如果您注意到大量尝试同时发生,则可以通过定义重试退避策略来分散这些请求。退避策略指定了两次重试之间的等待时间。

Snowflake Connector for Python 使用 backoff_policy 连接参数来实现退避策略,该连接参数指定了 Python 生成器函数。生成器函数可让您指定在发送下一次重试请求前等待(退避)的时间。

Snowflake 提供以下帮助器,供您使用所需参数创建预定义的生成器函数。如果您不想自行创建生成器函数,可以使用这些帮助器:

  • linear_backoff,每次迭代时将退避持续时间增加一个常数。

  • exponential_backoff,每次迭代时将退避持续时间乘以一个常数。

  • mixed_backoff,每次迭代时,在使用 exponential_backoff 递增退避持续时间和保持退避持续时间不变之间随机选择。

这些预定义的生成器函数使用以下参数来指定其行为:

  • base:初始退避时间,以秒为单位(默认值 = 1)。

  • factor:退避时间的递增系数。效果取决于实现方式(默认值 = 2); linear_backup 会增加该值,而 exponential_backup 会乘以该值。

  • cap:最大退避时间,以秒为单位(默认值 = 16)。

  • enable_jitter:是否对计算持续时间启用抖动(默认值 = True)。

例如,您可以在 exponential_backoff 策略中使用默认值或自定义值,如下所示:

from snowflake.connector.backoff_policies import exponential_backoff

# correct, no required arguments
snowflake.connector.connect(
backoff_policy=exponential_backoff()
)

# correct, parameters are customizeable
snowflake.connector.connect(
backoff_policy=exponential_backoff(
    factor=5,
    base=10,
    cap=60,
    enable_jitter=False
  )
)
Copy

您也可以创建自己的退避策略生成器函数,类似于以下示例,该示例定义了 my_backoff_policy 生成器函数:

def my_backoff_policy() -> int:
  while True:
    # yield the desired backoff duration
Copy

然后,可以将 backoff_policy 连接参数设置为生成器函数的名称,如下所示:

snowflake.connector.connect(
  backoff_policy=constant_backoff
)
Copy

OCSP

当驱动程序发起连接时,Snowflake 会发送一个证书,确认其要连接的是 Snowflake 而不是冒充 Snowflake 的主机。驱动程序将该证书发送到 OCSP(在线证书状态协议)服务器,验证该证书是否被撤消。

如果驱动程序无法访问 OCSP 服务器来验证证书,则驱动程序可以 “故障打开”或“故障关闭”

选择故障打开或故障关闭模式

Snowflake Connector for Python 1.8.0 之前的版本默认为故障关闭模式。1.8.0 及以后的版本默认为故障打开模式。您可以在调用 connect () 方法时设置可选的连接参数 ocsp_fail_open 来替换默认行为。例如:

con = snowflake.connector.connect(
    account=<account_identifier>,
    user=<user>,
    ...,
    ocsp_fail_open=False,
    ...);
Copy

验证 OCSP 连接器或驱动程序版本

驱动程序或连接器版本及其配置都决定了 OCSP 行为。有关驱动程序或连接器版本及其配置和 OCSP 行为的更多信息,请参阅 OCSP 配置

缓存 OCSP 响应

为了确保所有通信的安全,Snowflake Connector for Python 使用 HTTPS 协议连接到 Snowflake 以及所有其他服务(例如用于暂存数据文件的 Amazon S3 和用于进行联合身份验证的 Okta)。除了常规 HTTPS 协议外,连接器还通过 OCSP(在线证书状态协议)检查每个连接的 TLS/SSL 证书撤消状态,如果发现证书被撤消或 OCSP 状态不可靠,则中止连接。

由于每个 Snowflake 连接最多会触发三次与 OCSP 服务器的往来,因此,系统为 OCSP 响应引入了多级缓存,以减少连接中增加的网络开销:

  • 内存缓存,在进程的生命周期中保留该状态。

  • 文件缓存,持续到缓存目录(例如 ~/.cache/snowflake)被清除为止。

  • OCSP 响应服务器缓存。

缓存还解决了 OCSP 服务器的可用性问题(即,在实际 OCSP 服务器出现故障的情况下)。只要缓存有效,连接器仍然可以验证证书的撤消状态。

如果没有一个缓存层包含 OCSP 响应,则客户端会尝试直接从 CA 的 OCSP 服务器提取验证状态。

修改 OCSP 响应文件缓存位置

默认情况下,文件缓存已在以下位置启用,因此无需执行其他配置任务:

Linux:

~/.cache/snowflake/ocsp_response_cache.json

macOS:

~/Library/Caches/Snowflake/ocsp_response_cache.json

Windows:

%USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache.json

但是,如果要为 OCSP 响应缓存文件指定不同的位置和/或文件名,则 connect 方法将接受 ocsp_response_cache_filename 参数,该参数以 URI 的形式指定 OCSP 缓存文件的路径和名称。

OCSP 响应缓存服务器

备注

Snowflake Connector for Python 1.6.0 及更高版本目前支持 OCSP 响应缓存服务器。

OCSP 内存缓存和文件缓存适用于使用 Snowflake 提供的某个客户端连接到 Snowflake 且具有持久性主机的应用程序。但是,它们不适用于动态预置的环境(例如 AWS Lambda 或 Docker)。

为了解决这种情况,Snowflake 提供了第三级缓存:OCSP 响应缓存服务器。OCSP 响应缓存服务器每小时从 CA 的 OCSP 服务器提取 OCSP 响应,并将这些响应存储 24 小时。然后,客户端可以从此服务器缓存中请求给定 Snowflake 证书的验证状态。

重要

如果服务器策略拒绝对大多数或所有外部 IP 地址和网站的访问,则 必须 允许访问缓存服务器地址,以保障正常的服务运行。缓存服务器 URL 为 ocsp*.snowflakecomputing.cn:80

如果因故需要禁用缓存服务器,请将 SF_OCSP_RESPONSE_CACHE_SERVER_ENABLED 环境变量设置为 false。请注意,该值区分大小写,并且必须为小写。

语言: 中文