使用 Python Connector¶
本主题提供一系列示例,说明如何使用 Snowflake Connector 执行标准的 Snowflake 操作,例如用户登录、数据库和表创建、仓库创建、数据插入/加载和查询。
本主题末尾的示例代码将这些示例合并为一个可运行的 Python 程序。
备注
Snowflake 现在提供一流的 Python APIs 来管理核心 Snowflake 资源,包括数据库、架构、表、任务和仓库,而无需使用 SQL。有关更多信息,请参阅 Snowflake Python API:使用 Python 管理 Snowflake 对象。
本主题内容:
创建数据库、架构和仓库¶
在登录后,如果数据库、架构和仓库尚不存在,请使用 CREATE DATABASE、CREATE SCHEMA 和 CREATE WAREHOUSE 命令创建。
以下示例显示如何创建名为 tiny_warehouse
的仓库、名为 testdb
的数据库和名为 testschema
的架构。请注意,在创建架构时,您必须指定要在其中创建架构的数据库的名称,或者您必须已经连接到要在其中创建架构的数据库。以下示例在 CREATE SCHEMA
命令之前执行 USE DATABASE
命令,以确保在正确的数据库中创建架构。
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg") conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
使用数据库、架构和仓库¶
指定要在其中创建表的数据库和架构。此外,还要指定仓库,用于提供执行 DML 语句和查询所需的资源。
例如,要使用数据库 testdb
、架构 testschema
和仓库 tiny_warehouse
(之前创建),请执行以下操作:
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
创建表并插入数据¶
使用 CREATE TABLE 命令创建表,并使用 INSERT 命令在表中填充数据。
例如,创建名为 testtable
的表并在表中插入两行:
conn.cursor().execute( "CREATE OR REPLACE TABLE " "test_table(col1 integer, col2 string)") conn.cursor().execute( "INSERT INTO test_table(col1, col2) VALUES " + " (123, 'test string1'), " + " (456, 'test string2')")
加载数据¶
您可以批量加载在内部或外部位置暂存的文件中的数据,而不必使用单个 INSERT 命令将数据插入表中。
从内部位置复制数据¶
要将主机上文件中的数据加载到表中,请先使用 PUT 命令将文件暂存在内部位置,然后使用 COPY INTO <table> 命令将文件中的数据复制到表中。
例如:
# Putting Data con.cursor().execute("PUT file:///tmp/data/file* @%testtable") con.cursor().execute("COPY INTO testtable")其中,CSV 数据存储在 Linux 或 macOS 环境中名为
/tmp/data
的本地目录中,并且该目录包含以下名称的文件:file0
、file1
...file100
。
从外部位置复制数据¶
要将在外部位置(即 S3 桶)暂存的文件中的数据加载到表中,请使用 COPY INTO <table> 命令。
例如:
# Copying Data con.cursor().execute(""" COPY INTO testtable FROM s3://<s3_bucket>/data/ STORAGE_INTEGRATION = myint FILE_FORMAT=(field_delimiter=',') """.format( aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY))其中:
s3://<s3_bucket>/data/
指定 S3 桶的名称桶中的文件具有前缀
data
。桶使用存储集成访问,该集成由账户管理员(即具有 ACCOUNTADMIN 角色的用户)或具有全局 CREATE INTEGRATION 权限的角色使用 CREATE STORAGE INTEGRATION 创建。通过存储集成,用户不必提供凭据即可访问私有存储位置。
备注
此示例使用 format() 函数来编写语句。如果您的环境存在 SQL 注入攻击的风险,您可能最好绑定值而不是使用 format()。
查询数据¶
借助 Snowflake Connector for Python,您可以提交:
在查询完成后,您可以使用 Cursor
对象来 提取结果中的值。默认情况下,Snowflake Connector for Python 将值从 Snowflake 数据类型 转换为原生 Python 数据类型。(请注意,您可以选择将值作为字符串返回,并在应用程序中执行类型转换。请参阅 通过绕过数据转换来提高查询性能。)
备注
默认情况下, NUMBER 列中的值以双精度浮点值 (float64
) 的形式返回。要在 fetch_pandas_all()
和 fetch_pandas_batches()
方法中以十进制值 (decimal.Decimal
) 形式返回这些值,请将 connect()
方法中的 arrow_number_to_decimal
参数设置为 True
。
执行同步查询¶
要执行同步查询,请在 Cursor
对象中调用 execute()
方法。例如:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
使用 Cursor
对象提取结果中的值,如 使用 cursor 提取值 中所述。
执行异步查询¶
Snowflake Connector for Python 支持异步查询(即,在查询完成前将控制权交还给用户的查询)。您可以提交异步查询,并使用轮询来确定查询何时完成。查询完成后,您可以获得结果。
备注
要执行异步查询,您必须确保 ABORT_DETACHED_QUERY
配置参数为 FALSE
(默认值)。
Snowflake 在一段时间(默认:5 分钟)后会自动关闭连接,这会使所有活跃的查询处于孤立状态。如果值为 TRUE
,Snowflake 将终止这些孤立查询,这可能会影响异步查询。
使用此功能,您可以并行提交多个查询,而无需等待每个查询完成。您还可以在同一个会话中组合运行同步查询和异步查询。
最后,您可以从一个连接提交异步查询,然后从另一个连接检查结果。例如,用户可以从应用程序启动长时间运行的查询,然后退出应用程序,稍后重新启动应用程序以检查结果。
提交异步查询¶
要提交异步查询,请在 Cursor
对象中调用 execute_async()
方法。例如:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
提交查询后:
要确定查询是否仍在运行,请参阅 检查查询的状态。
要检索查询结果,请参阅 使用查询 ID 检索查询结果。
有关执行异步查询的示例,请参阅 异步查询示例。
异步查询的最佳实践¶
在提交异步查询时,请遵循以下最佳实践:
在并行运行任何查询之前,请确保知道哪些查询依赖于其他查询。有些查询相互依赖且对顺序敏感,因此不适合并行化。例如,很显然, INSERT 语句应该在相应的 CREATE TABLE 语句完成后开始。
确保运行的查询数量不要超过可用内存承受能力。并行运行多个查询通常会消耗更多内存,尤其是当多组结果同时存储在内存中时。
轮询时,处理查询不成功的罕见情况。
确保事务控制语句(BEGIN、COMMIT 和 ROLLBACK)不与其他语句并行执行。
检索 Snowflake 查询 ID¶
查询 ID 用于标识 Snowflake 执行的每个查询。当您使用 Snowflake Connector for Python 执行查询时,您可以通过 Cursor
对象中的 sfqid
属性访问查询 ID :
# Retrieving a Snowflake Query ID cur = con.cursor() cur.execute("SELECT * FROM testtable") print(cur.sfqid)
您可以使用查询 ID 来执行以下操作:
在 Web 界面中检查查询的状态。
在 Classic Console 中,查询 IDs 在 History 页面中显示。请参阅 使用历史记录页面监控查询。
以编程方式检查查询的状态(例如,确定异步查询是否已完成)。
请参阅 检查查询的状态。
检索异步查询或之前提交的同步查询的结果。
请参阅 使用查询 ID 检索查询结果。
取消正在运行的查询。
请参阅 按查询 ID 取消查询。
检查查询的状态¶
要检查查询的状态,请执行以下操作:
从
Cursor
对象中的sfqid
字段获取查询 ID。将查询 ID 传递到
Connection
对象的get_query_status()
方法,以返回代表查询状态的QueryStatus
枚举常量。默认情况下,如果查询导致错误,则
get_query_status()
不会引发错误。如果您想引发错误,请改为调用get_query_status_throw_if_error()
。使用
QueryStatus
枚举常量检查查询的状态。要确定查询是否仍在运行(例如,如果此查询是异步查询),请将常量传递到
Connection
对象的is_still_running()
方法。要确定是否发生了错误,请将该常量传递到
is_an_error()
方法。
有关枚举常量的完整列表,请参阅
QueryStatus
。
以下示例执行异步查询并检查查询的状态:
import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
time.sleep(1)
如果查询导致错误,则以下示例会引发错误:
from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
time.sleep(1)
except ProgrammingError as err:
print('Programming Error: {0}'.format(err))
使用查询 ID 检索查询结果¶
备注
如果您通过调用 Cursor
对象上的 execute()
方法来 执行同步查询,您无需使用查询 ID 即可检索结果。您可以只从结果中提取值,如 使用 cursor 提取值 中所述。
如果您要检索异步查询或之前提交的同步查询的结果,请执行以下步骤:
获取查询的查询 ID。请参阅 检索 Snowflake 查询 ID。
调用
Cursor
对象中的get_results_from_sfqid()
方法以检索结果。使用
Cursor
对象提取结果中的值,如 使用 cursor 提取值 中所述。
请注意,如果查询仍在运行,则提取方法(fetchone()
、fetchmany()
、fetchall()
等)将等待查询完成。
例如:
# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
使用 cursor
提取值¶
使用游标对象迭代器方法从表中提取值。
例如,要从之前(在 创建表并插入数据 中)创建的名为 testtable
的表中,提取名为“col1”和“col2”的列,请使用类似如下的代码:
cur = conn.cursor() try: cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1") for (col1, col2) in cur: print('{0}, {1}'.format(col1, col2)) finally: cur.close()
或者,使用 Snowflake Connector for Python 提供的便捷快捷方式:
for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"): print('{0}, {1}'.format(col1, col2))
如果您需要获取单个结果(即单行),请使用 fetchone
方法:
col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone() print('{0}, {1}'.format(col1, col2))
如果您需要一次获取指定的行数,请使用具有行数的 fetchmany
方法:
cur = con.cursor().execute("SELECT col1, col2 FROM testtable") ret = cur.fetchmany(3) print(ret) while len(ret) > 0: ret = cur.fetchmany(3) print(ret)备注
如果结果集太大而无法放入内存,请使用
fetchone
或fetchmany
。
如果您需要一次获得所有结果,请执行:
results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall() for rec in results: print('%s, %s' % (rec[0], rec[1]))
要为查询设置超时,请执行“begin”命令并在查询中加入超时参数。如果查询超过参数值的长度,则会产生错误并进行回滚。
在以下代码中,错误 604 表示查询已取消。如果查询未在指定时间内完成,则超时参数将启动 Timer()
并取消。
conn.cursor().execute("create or replace table testtbl(a int, b string)") conn.cursor().execute("begin") try: conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query except ProgrammingError as e: if e.errno == 604: print("timeout") conn.cursor().execute("rollback") else: raise e else: conn.cursor().execute("commit")
使用 DictCursor
按列名提取值¶
如果您要按列名提取值,请创建类型为 DictCursor
的 cursor
对象。
例如:
# Querying data by DictCursor from snowflake.connector import DictCursor cur = con.cursor(DictCursor) try: cur.execute("SELECT col1, col2 FROM testtable") for rec in cur: print('{0}, {1}'.format(rec['COL1'], rec['COL2'])) finally: cur.close()
异步查询示例¶
以下是异步查询的简单示例:
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
下一个示例从一个连接提交异步查询,并从另一个连接检索结果:
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Get the query ID for the asynchronous query.
query_id = cur.sfqid
# Close the cursor and the connection.
cur.close()
conn.close()
# Open a new connection.
new_conn = snowflake.connector.connect( ... )
# Create a new cursor.
new_cur = new_conn.cursor()
# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
按查询 ID 取消查询¶
按 查询 ID 取消查询:
cur = cn.cursor() try: cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')") result = cur.fetchall() print(len(result)) print(result[0]) finally: cur.close()
将字符串“queryID”替换为实际查询 ID。要获取查询 ID,请参阅 检索 Snowflake 查询 ID。
通过绕过数据转换来提高查询性能¶
为提高查询性能,请使用 snowflake.connector.converter_null
模块中的 SnowflakeNoConverterToPython
类,绕过从 Snowflake 内部数据类型到原生 Python 数据类型的数据转换,例如:
from snowflake.connector.converter_null import SnowflakeNoConverterToPython con = snowflake.connector.connect( ... converter_class=SnowflakeNoConverterToPython ) for rec in con.cursor().execute("SELECT * FROM large_table"): # rec includes raw Snowflake data
因此,所有数据都以字符串形式表示,以至于应用程序负责将其转换为原生 Python 数据类型。例如, TIMESTAMP_NTZ
和 TIMESTAMP_LTZ
数据是以字符串形式表示的纪元时间,而 TIMESTAMP_TZ
数据是纪元时间,后跟一个空格,然后是以字符串形式表示的以分钟为单位的 UTC 偏移。
对绑定数据没有影响;仍然可以绑定 Python 原生数据以进行更新。
绑定数据¶
要指定在 SQL 语句中使用的值,您可以在语句中包含字面量,也可以 绑定变量。绑定变量时,在 SQL 语句文本中放置一个或多个占位符,然后为每个占位符指定变量(要使用的值)。
以下示例对比了字面量和绑定的使用:
字面量:
con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")绑定:
con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
备注
可以绑定的数据大小或可以批量合并的数据大小存在上限。有关详细信息,请参阅 查询文本大小的限制。
Snowflake 支持以下类型的绑定:
下文将逐一解释。
pyformat
或 format
绑定¶
pyformat
绑定和 format
绑定都在客户端绑定数据,而不是在服务器端绑定数据。
默认情况下,Snowflake Connector for Python 支持 pyformat
和 format
,因此您可以使用 %(name)s
或 %s
作为占位符。例如:
使用
%(name)s
作为占位符:conn.cursor().execute( "INSERT INTO test_table(col1, col2) " "VALUES(%(col1)s, %(col2)s)", { 'col1': 789, 'col2': 'test string3', })
使用
%s
作为占位符:con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
借助 pyformat
和 format
,您还可以使用列表对象为 IN 运算符绑定数据:
# Binding data for IN operator con.cursor().execute( "SELECT col1, col2 FROM testtable" " WHERE col2 IN (%s)", ( ['test string1', 'test string3'], ))
百分号字符(“%”)既是 SQL LIKE 的通配符,也是 Python 的格式绑定字符。如果您使用格式绑定,并且 SQL 命令包含百分号字符,您可能需要对百分号字符进行转义。例如,如果 SQL 语句是:
SELECT col1, col2 FROM test_table WHERE col2 ILIKE '%York' LIMIT 1; -- Find York, New York, etc.
那么 Python 代码应该如下所示(注意额外的百分号,对原始百分号进行转义):
sql_command = "select col1, col2 from test_table " sql_command += " where col2 like '%%York' limit %(lim)s" parameter_dictionary = {'lim': 1 } cur.execute(sql_command, parameter_dictionary)
qmark
或 numeric
绑定¶
qmark
绑定和 numeric
绑定都在服务器端绑定数据,而不是在客户端绑定数据:
要进行
qmark
绑定,请使用问号字符 (?
),表示您要将变量值插入字符串的哪个位置。要进行
numeric
绑定,请使用冒号 (:
) 后跟数字,表示要在该位置替换的变量的位置。例如,:2
指定第二个变量。使用 numeric 绑定,可在同一个查询中多次绑定相同的值。例如,如果您要多次使用长 VARCHAR 或 BINARY 或者 半结构化 值,则
numeric
绑定允许您将该值发送到服务器一次,然后多次使用。
下一部分说明如何使用 qmark
和 numeric
绑定:
使用 qmark
或 numeric
绑定¶
要使用 qmark
或 numeric
样式绑定,请执行以下任一语句:
snowflake.connector.paramstyle='qmark'
snowflake.connector.paramstyle='numeric'
重要
在调用 connect()
方法 之前,您必须设置 paramstyle
属性。
如果您将 paramstyle
设置为 qmark
或 numeric
,您必须分别使用 ?
或 :N
(其中 N
被替换为数字)作为占位符。
例如:
使用
?
作为占位符:import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(?, ?)", ( 789, 'test string3' ))
使用
:N
作为占位符:import snowflake.connector snowflake.connector.paramstyle='numeric' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(:1, :2)", ( 789, 'test string3' ))
以下查询显示如何使用
numeric
绑定来重用变量:con.cursor().execute( "INSERT INTO testtable(complete_video, short_sample_of_video) " "VALUES(:1, SUBSTRING(:1, :2, :3))", ( binary_value_that_stores_video, # variable :1 starting_offset_in_bytes_of_video_clip, # variable :2 length_in_bytes_of_video_clip # variable :3 ))
将 qmark
或 numeric
绑定与 datetime
对象结合使用¶
使用 qmark
或 numeric
绑定将数据绑定到 Snowflake TIMESTAMP 数据类型时,请将绑定变量设置为元组,指定 Snowflake 时间戳数据类型(TIMESTAMP_LTZ
或 TIMESTAMP_TZ
)和值。例如:
import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "CREATE OR REPLACE TABLE testtable2 (" " col1 int, " " col2 string, " " col3 timestamp_ltz" ")" ) con.cursor().execute( "INSERT INTO testtable2(col1,col2,col3) " "VALUES(?,?,?)", ( 987, 'test string4', ("TIMESTAMP_LTZ", datetime.now()) ) )
与客户端绑定不同,服务器端绑定要求列使用 Snowflake 数据类型。最常见的 Python 数据类型已经隐式映射到 Snowflake 数据类型(例如, int
映射到 FIXED
)。但是,由于 Python datetime
数据可以绑定到多种 Snowflake 数据类型(TIMESTAMP_NTZ
、TIMESTAMP_LTZ
或 TIMESTAMP_TZ
)中的一种,并且默认映射为 TIMESTAMP_NTZ
,因此您必须指定要使用的 Snowflake 数据类型。
将绑定变量与 IN 运算符结合使用¶
qmark
和 numeric
(服务器端绑定)不支持将绑定变量与 IN 运算符结合使用。
如果您需要将绑定变量与 IN 运算符结合使用,请使用 :ref:` 客户端绑定 <label-python_connector_binding_data_client>` (pyformat
或 format
)。
将参数绑定到变量以进行批量插入¶
在应用程序代码中,您可以单批次插入多行。为此,请在 INSERT 语句中为值使用参数。例如,以下语句使用占位符在 INSERT 语句中进行 qmark
绑定:
insert into grocery (item, quantity) values (?, ?)
然后,要指定应插入的数据,请定义表示序列顺序的变量(例如,元组列表):
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
如上例所示,列表中的每个项目都是元组,其中包含要插入的行的列值。
要执行绑定,请调用 executemany()
方法,将变量作为第二个实参传递。例如:
conn = snowflake.connector.connect( ... ) rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)] conn.cursor().executemany( "insert into grocery (item, quantity) values (?, ?)", rows_to_insert)
如果您 :ref:` 在服务器上绑定数据 <label-python_connector_binding_data_server>` (即使用 qmark
或 numeric
绑定),则连接器可以通过绑定来优化批量插入的性能。
使用此技术插入大量值时,驱动程序可以通过将数据流传输(而无需在本地计算机上创建文件)到临时暂存区进行引入来提高性能。当值数超过阈值时,驱动程序会自动执行此操作。
此外,您必须设置会话的当前数据库和架构。如果未设置这些值,则驱动程序执行的 CREATE TEMPORARY STAGE 命令可能会失败,并出现以下错误:
CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
备注
有关将数据加载到 Snowflake 数据库的其他方法(包括使用 COPY 命令进行批量加载),请参阅 将数据载入 Snowflake。
避免 SQL 注入攻击¶
避免使用 Python 的格式化函数绑定数据,因为这样存在 SQL 注入风险。例如:
# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%(col1)d, '%(col2)s')" % { 'col1': 789, 'col2': 'test string3' })# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%d, '%s')" % ( 789, 'test string3' ))# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES({col1}, '{col2}')".format( col1=789, col2='test string3') )
改为以变量存储值,然后使用 qmark 或 numeric 绑定样式来绑定这些变量。
检索列元数据¶
要检索结果集中每列的元数据(例如每列的名称、类型、精度、标度等),请使用以下其中一种方法:
要在调用
execute()
方法以执行查询之后访问元数据,请使用Cursor
对象的description
属性。要在 无需 执行查询的情况下访问元数据,请调用
describe()
方法。describe
方法适用于 Snowflake Connector for Python 2.4.6 和最近的版本。
description
属性设置为以下值:
版本 2.4.5 及更早版本: 元组列表。
版本 2.4.6 及更高版本: ResultMetadata 对象列表。(
describe
方法也会返回此列表。)
每个元组和 ResultMetadata
对象都包含列的元数据(列名、数据类型等)。您可以 按索引访问元数据,或者在 2.4.6 及更高版本中,按 ResultMetadata
属性访问元数据。
以下示例演示如何访问返回元组和 ResultMetadata
对象中的元数据。
示例:按索引获取列名元数据(版本 2.4.5 及更早版本):
以下示例在执行查询后使用 description
属性检索列名列表。该属性是一个元组列表,该示例从每个元组的第一个值访问列名。
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col[0] for col in cur.description]))
示例:按属性获取列名元数据(版本 2.4.6 及更高版本):
以下示例在执行查询后使用 description
属性检索列名列表。该属性是一个 ResultMetaData 对象列表,该示例从每个 ResultMetadata
对象的 name
属性中访问列名。
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col.name for col in cur.description]))
示例:在不执行查询的情况下获取列名元数据(版本 2.4.6 及更高版本):
以下示例使用 describe
方法检索列名列表,无需执行查询。describe()
方法返回 ResultMetaData 对象列表,该示例从每个 ResultMetadata
对象的 name
属性中访问列名。
cur = conn.cursor() result_metadata_list = cur.describe("SELECT * FROM test_table") print(','.join([col.name for col in result_metadata_list]))
处理错误¶
应用程序必须正确处理由 Snowflake Connector 引发的异常,并决定继续或停止运行代码。
# Catching the syntax error cur = con.cursor() try: cur.execute("SELECT * FROM testtable") except snowflake.connector.errors.ProgrammingError as e: # default error message print(e) # customer error message print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid)) finally: cur.close()
使用 execute_stream
来执行 SQL 脚本¶
execute_stream
函数支持您在一个流中运行一个或多个 SQL 脚本:
from codecs import open with open(sqlfile, 'r', encoding='utf-8') as f: for cur in con.execute_stream(f): for ret in cur: print(ret)
关闭连接¶
最佳实践是通过调用 close
方法来关闭连接:
connection.close()
这样可以确保将收集的客户端指标提交到服务器并删除会话。此外,即使中间出现异常, try-finally
块也有助于确保连接关闭:
# Connecting to Snowflake con = snowflake.connector.connect(...) try: # Running queries con.cursor().execute(...) ... finally: # Closing the connection con.close()
使用上下文管理器来连接和控制事务¶
Snowflake Connector for Python 支持上下文管理器,可根据需要分配和释放资源。禁用 autocommit
时,上下文管理器有助于根据语句状态提交或回滚事务。
# Connecting to Snowflake using the context manager with snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False, ) as con: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
在上面的示例中,当第三条语句失败时,上下文管理器会回滚事务中的变更并关闭连接。如果所有语句都成功,则上下文管理器将提交变更并关闭连接。
使用 try
和 except
块的等效代码如下所示:
# Connecting to Snowflake using try and except blocks con = snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False) try: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail con.commit() except Exception as e: con.rollback() raise e finally: con.close()
使用 VECTOR 数据类型¶
Snowflake Python Connector 3.6.0 版本引入了对 VECTOR 数据类型 的支持。您可以使用 VECTOR 数据类型与 向量相似度函数,来实现基于向量搜索或检索增强生成 (RAG) 的应用程序。
下面的代码示例展示了如何使用 Python Connector 创建具有 VECTOR 列的表,并调用 VECTOR_INNER_PRODUCT 函数:
import snowflake.connector
conn = ... # Set up connection
cur = conn.cursor()
# Create a table and insert some vectors
cur.execute("CREATE OR REPLACE TABLE vectors (a VECTOR(FLOAT, 3), b VECTOR(FLOAT, 3))")
values = [([1.1, 2.2, 3], [1, 1, 1]), ([1, 2.2, 3], [4, 6, 8])]
for row in values:
cur.execute(f"""
INSERT INTO vectors(a, b)
SELECT {row[0]}::VECTOR(FLOAT,3), {row[1]}::VECTOR(FLOAT,3)
""")
# Compute the pairwise inner product between columns a and b
cur.execute("SELECT VECTOR_INNER_PRODUCT(a, b) FROM vectors")
print(cur.fetchall())
[(6.30...,), (41.2...,)]
下面的代码示例展示了如何使用 Python Connector 调用 VECTOR_COSINE_SIMILARITY 函数,来查找最接近 [1,2,3]
的向量:
cur.execute(f"""
SELECT a, VECTOR_COSINE_SIMILARITY(a, {[1,2,3]}::VECTOR(FLOAT, 3))
AS similarity
FROM vectors
ORDER BY similarity DESC
LIMIT 1;
""")
print(cur.fetchall())
[([1.0, 2.2..., 3.0], 0.9990...)]
备注
VECTOR 数据类型不支持变量绑定。
日志记录¶
Snowflake Connector for Python 利用标准 Python logging
模块定期记录状态,以便应用程序可以跟踪其在幕后工作的活动。要启用日志记录,最简单方法是在应用程序的开头调用 logging.basicConfig()
。
例如,将日志记录级别设置为 INFO
,并将日志存储在名为 /tmp/snowflake_python_connector.log
的文件中:
logging.basicConfig( filename=file_name, level=logging.INFO)
将日志记录级别设置为 DEBUG
,可以启用更全面的日志记录,如下所示:
# Logging including the timestamp, thread and the source code location import logging for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)SecretDetector 格式化程序类虽为可选,但推荐使用,可确保特定的一组已知敏感信息在写入 Snowflake Python Connector 日志文件之前经过掩码处理。要使用 SecretDetector,请使用类似如下的代码:
# Logging including the timestamp, thread and the source code location import logging from snowflake.connector.secret_detector import SecretDetector for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)备注
botocore
和boto3
适用于 AWS (Amazon Web Services) SDK for Python。
日志记录配置文件¶
或者,您也可以在 config.toml
配置文件中轻松指定日志级别和保存日志文件的目录。有关此文件的详细信息,请参阅 使用 connections.toml 文件进行连接。
备注
此日志记录配置功能支持 Python 日志记录文档中定义的日志级别。
此日志记录配置文件使用 toml 定义 save_logs
、level
和 path
日志记录参数,如下所示:
[log]
save_logs = true
level = "INFO"
path = "<directory to store logs>"
其中:
save_logs
决定是否保存日志。level
指定日志记录级别。如果未定义,驱动程序默认为INFO
。path
用于标识保存日志文件的目录。如果未定义,驱动程序会将日志保存在默认$SNOWFLAKE_HOME/logs/
目录中。
备注
如果您的 config.toml
文件不包含 [log]
部分,则不会保存日志消息。
一天的日志消息将附加到 python-connector.log
文件中,该文件后续重命名为 python-connector.log.YYYY-MM-DD
。
示例程序¶
以下示例代码将前面各部分中描述的许多示例合并到一个有效的 Python 程序中。此示例包含两部分:
父类(“python_veritas_base”)包含许多常见操作(例如连接到服务器)的代码。
子类(“python_connector_example”)代表特定客户端的自定义部分,例如查询表。
此示例代码直接从我们的一项测试中导入,以帮助确保它已在产品的最新内部版本上执行。
由于此代码是从测试中提取,因此它包含用于设置某些测试中使用的备用端口和协议的少量代码。用户 不应 设置协议或端口号;而是省略这些协议或端口号并使用默认值。
此代码还包含一些部分标记(有时称为“片段标签”),用于识别可独立导入到文档中的代码。部分标记通常看起来类似如下:
# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
用户代码中不需要这些部分标记。
代码示例的第一部分包含以下常用子程序:
读取包含连接信息的命令行实参(例如,“--warehouse MyWarehouse”)。
连接到服务器。
创建并使用仓库、数据库和架构。
使用完架构、数据库和仓库后,将其删除。
import logging
import os
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
class python_veritas_base:
"""
PURPOSE:
This is the Base/Parent class for programs that use the Snowflake
Connector for Python.
This class is intended primarily for:
* Sample programs, e.g. in the documentation.
* Tests.
"""
def __init__(self, p_log_file_name = None):
"""
PURPOSE:
This does any required initialization steps, which in this class is
basically just turning on logging.
"""
file_name = p_log_file_name
if file_name is None:
file_name = '/tmp/snowflake_python_connector.log'
# -- (> ---------- SECTION=begin_logging -----------------------------
logging.basicConfig(
filename=file_name,
level=logging.INFO)
# -- <) ---------- END_SECTION ---------------------------------------
# -- (> ---------------------------- SECTION=main ------------------------
def main(self, argv):
"""
PURPOSE:
Most tests follow the same basic pattern in this main() method:
* Create a connection.
* Set up, e.g. use (or create and use) the warehouse, database,
and schema.
* Run the queries (or do the other tasks, e.g. load data).
* Clean up. In this test/demo, we drop the warehouse, database,
and schema. In a customer scenario, you'd typically clean up
temporary tables, etc., but wouldn't drop your database.
* Close the connection.
"""
# Read the connection parameters (e.g. user ID) from the command line
# and environment variables, then connect to Snowflake.
connection = self.create_connection(argv)
# Set up anything we need (e.g. a separate schema for the test/demo).
self.set_up(connection)
# Do the "real work", for example, create a table, insert rows, SELECT
# from the table, etc.
self.do_the_real_work(connection)
# Clean up. In this case, we drop the temporary warehouse, database, and
# schema.
self.clean_up(connection)
print("\nClosing connection...")
# -- (> ------------------- SECTION=close_connection -----------------
connection.close()
# -- <) ---------------------------- END_SECTION ---------------------
# -- <) ---------------------------- END_SECTION=main --------------------
def args_to_properties(self, args):
"""
PURPOSE:
Read the command-line arguments and store them in a dictionary.
Command-line arguments should come in pairs, e.g.:
"--user MyUser"
INPUTS:
The command line arguments (sys.argv).
RETURNS:
Returns the dictionary.
DESIRABLE ENHANCEMENTS:
Improve error detection and handling.
"""
connection_parameters = {}
i = 1
while i < len(args) - 1:
property_name = args[i]
# Strip off the leading "--" from the tag, e.g. from "--user".
property_name = property_name[2:]
property_value = args[i + 1]
connection_parameters[property_name] = property_value
i += 2
return connection_parameters
def create_connection(self, argv):
"""
PURPOSE:
This gets account identifier and login information from the
environment variables and command-line parameters, connects to the
server, and returns the connection object.
INPUTS:
argv: This is usually sys.argv, which contains the command-line
parameters. It could be an equivalent substitute if you get
the parameter information from another source.
RETURNS:
A connection.
"""
# Get account identifier and login information from environment variables and command-line parameters.
# For information about account identifiers, see
# https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
# -- (> ----------------------- SECTION=set_login_info ---------------
# Get the password from an appropriate environment variable, if
# available.
PASSWORD = os.getenv('SNOWSQL_PWD')
# Get the other login info etc. from the command line.
if len(argv) < 11:
msg = "ERROR: Please pass the following command-line parameters:\n"
msg += "--warehouse <warehouse> --database <db> --schema <schema> "
msg += "--user <user> --account <account_identifier> "
print(msg)
sys.exit(-1)
else:
connection_parameters = self.args_to_properties(argv)
USER = connection_parameters["user"]
ACCOUNT = connection_parameters["account"]
WAREHOUSE = connection_parameters["warehouse"]
DATABASE = connection_parameters["database"]
SCHEMA = connection_parameters["schema"]
# Optional: for internal testing only.
try:
PORT = connection_parameters["port"]
except:
PORT = ""
try:
PROTOCOL = connection_parameters["protocol"]
except:
PROTOCOL = ""
# If the password is set by both command line and env var, the
# command-line value takes precedence over (is written over) the
# env var value.
# If the password wasn't set either in the environment var or on
# the command line...
if PASSWORD is None or PASSWORD == '':
print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
sys.exit(-2)
# -- <) ---------------------------- END_SECTION ---------------------
# Optional diagnostic:
#print("USER:", USER)
#print("ACCOUNT:", ACCOUNT)
#print("WAREHOUSE:", WAREHOUSE)
#print("DATABASE:", DATABASE)
#print("SCHEMA:", SCHEMA)
#print("PASSWORD:", PASSWORD)
#print("PROTOCOL:" "'" + PROTOCOL + "'")
#print("PORT:" + "'" + PORT + "'")
print("Connecting...")
# If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
# -- (> ------------------- SECTION=connect_to_snowflake ---------
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA
)
# -- <) ---------------------------- END_SECTION -----------------
else:
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA,
# Optional: for internal testing only.
protocol=PROTOCOL,
port=PORT
)
return conn
def set_up(self, connection):
"""
PURPOSE:
Set up to run a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.create_warehouse_database_and_schema(connection)
def do_the_real_work(self, conn):
"""
PURPOSE:
Your sub-class should override this to include the code required for
your documentation sample or your test case.
This default method does a very simple self-test that shows that the
connection was successful.
"""
# Create a cursor for this connection.
cursor1 = conn.cursor()
# This is an example of an SQL statement we might want to run.
command = "SELECT PI()"
# Run the statement.
cursor1.execute(command)
# Get the results (should be only one):
for row in cursor1:
print(row[0])
# Close this cursor.
cursor1.close()
def clean_up(self, connection):
"""
PURPOSE:
Clean up after a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.drop_warehouse_database_and_schema(connection)
def create_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Create the temporary schema, database, and warehouse that we use
for most tests/demos.
"""
# Create a database, schema, and warehouse if they don't already exist.
print("\nCreating warehouse, database, schema...")
# -- (> ------------- SECTION=create_warehouse_database_schema -------
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# -- (> --------------- SECTION=use_warehouse_database_schema --------
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
def drop_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Drop the temporary schema, database, and warehouse that we create
for most tests/demos.
"""
# -- (> ------------- SECTION=drop_warehouse_database_schema ---------
conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# ----------------------------------------------------------------------------
if __name__ == '__main__':
pvb = python_veritas_base()
pvb.main(sys.argv)
代码示例的第二部分会创建一个表,在其中插入行等:
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
# Import the base class that contains methods used in many tests and code
# examples.
from python_veritas_base import python_veritas_base
class python_connector_example (python_veritas_base):
"""
PURPOSE:
This is a simple example program that shows how to use the Snowflake
Python Connector to create and query a table.
"""
def __init__(self):
pass
def do_the_real_work(self, conn):
"""
INPUTS:
conn is a Connection object returned from snowflake.connector.connect().
"""
print("\nCreating table test_table...")
# -- (> ----------------------- SECTION=create_table ---------------------
conn.cursor().execute(
"CREATE OR REPLACE TABLE "
"test_table(col1 integer, col2 string)")
conn.cursor().execute(
"INSERT INTO test_table(col1, col2) VALUES " +
" (123, 'test string1'), " +
" (456, 'test string2')")
# -- <) ---------------------------- END_SECTION -------------------------
print("\nSelecting from test_table...")
# -- (> ----------------------- SECTION=querying_data --------------------
cur = conn.cursor()
try:
cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# -- <) ---------------------------- END_SECTION -------------------------
# ============================================================================
if __name__ == '__main__':
test_case = python_connector_example()
test_case.main(sys.argv)
要运行此示例,请执行以下操作:
将第一段代码复制到名为“python_veritas_base.py”的文件中。
将第二段代码复制到名为“python_connector_example.py”的文件中。
将 SNOWSQL_PWD 环境变量设置为您的密码,例如:
export SNOWSQL_PWD='MyPassword'使用类似如下的命令行执行程序(当然,将用户和账户信息替换为您自己的用户和账户信息)。
警告
这样会在程序结束时删除仓库、数据库和架构!不要 使用现有数据库的名称,以免丢失!
python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
以下是输出:
Connecting...
Creating warehouse, database, schema...
Creating table test_table...
Selecting from test_table...
123, test string1
456, test string2
Closing connection...
此处是更长的示例:
备注
在设置账户和登录信息的部分中,确保根据需要替换变量,以匹配您的 Snowflake 登录信息(名称、密码等)。
此示例使用 format() 函数来编写语句。如果您的环境存在 SQL 注入攻击的风险,您可能最好绑定值而不是使用 format()。
#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#
# Logging
import logging
logging.basicConfig(
filename='/tmp/snowflake_python_connector.log',
level=logging.INFO)
import snowflake.connector
# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'
import os
# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# Connecting to Snowflake
con = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
)
# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")
# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")
# Creating a table and inserting data
con.cursor().execute(
"CREATE OR REPLACE TABLE "
"testtable(col1 integer, col2 string)")
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(123, 'test string1'),(456, 'test string2')")
# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
STORAGE_INTEGRATION = myint
FILE_FORMAT=(field_delimiter=',')
""".format(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
# Querying data
cur = con.cursor()
try:
cur.execute("SELECT col1, col2 FROM testtable")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# Binding data
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(%(col1)s, %(col2)s)", {
'col1': 789,
'col2': 'test string3',
})
# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))
# Catching syntax errors
cur = con.cursor()
try:
cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
# default error message
print(e)
# user error message
print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
cur.close()
# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
# Closing the connection
con.close()