使用 Snowflake Connector for Python 分发提取结果的工作负载¶
如果使用分布式环境并行处理工作负载,则可以使用 Snowflake Connector for Python 对提取和处理结果的工作进行分发。
本主题内容:
简介¶
使用 Cursor
对象执行查询后,可以使用结果批次来分发提取结果的工作。结果批次 封装了一个函数,用于检索结果的子集。您可以分配不同的工作进程使用不同的结果批次,从而并行提取和处理结果。
检索结果批次列表¶
执行查询后,可以采用以下格式之一检索结果:
ResultBatch 对象。
要执行此操作,请对 Cursor 对象调用
get_result_batches()
方法。这将返回一个ResultBatch
对象列表,您可以将这些对象分配给不同的工作进程来处理。例如:with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Get the list of result batches result_batch_list = cur.get_result_batches() # Get the number of result batches in the list. num_result_batches = len(result_batch_list) # Split the list of result batches into two # to distribute the work of fetching results # between two workers. result_batch_list_1 = result_batch_list[:: 2] result_batch_list_2 = result_batch_list[1 :: 2]
PyArrow 表。
您可以使用以下方法,以 PyArrow 表的形式检索结果批次:
fetch_arrow_all()
:调用此方法可返回包含所有结果的 PyArrow 表。fetch_arrow_batches()
:调用此方法可返回一个迭代器,您可以使用该迭代器为每个结果批次返回一个 PyArrow 表。
例如:
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a PyArrow table containing all of the results. table = cur.fetch_arrow_all() # Iterate over a list of PyArrow tables for result batches. for table_for_batch in cur.fetch_arrow_batches(): my_pyarrow_table_processing_function(table_for_batch)
pandas DataFrame 对象。
如果 安装了兼容 pandas 的 Snowflake Connector for Python 版本,则可以使用以下方法,以 pandas DataFrame 对象的形式检索结果批次:
fetch_pandas_all()
:调用此方法可返回包含所有结果的 pandas DataFrame。fetch_pandas_batches()
:调用此方法可返回一个迭代器,您可以使用该迭代器为每个结果批次返回一个 pandas DataFrame。
例如:
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a pandas DataFrame containing all of the results. table = cur.fetch_pandas_all() # Iterate over a list of pandas DataFrames for result batches. for dataframe_for_batch in cur.fetch_pandas_batches(): my_dataframe_processing_function(dataframe_for_batch)
序列化结果批次¶
若要将结果批次移动到其他工作进程或节点,可以对结果批次进行序列化和反序列化。例如:
import pickle
# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])
# At this point, you can move the serialized data to
# another worker/node.
...
# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
使用结果批次¶
接下来的几个部分将介绍如何使用 ResultBatch 对象:
迭代结果批次中的行¶
可以使用 ResultBatch
对象来迭代该批次中的行。例如:
# Iterate over the list of result batches.
for batch in result_batch_list_1:
# Iterate over the subset of rows in a result batch.
for row in batch:
print(row)
创建 ResultBatch
对象的迭代器时,该对象会提取并转换该批次的行子集。
具体化结果批次中的行¶
通过将该 ResultBatch
对象传递给 list()
函数,来将结果批次中的行子集具体化。例如:
# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
获取结果批次的行数和大小¶
如果需要确定结果批次中的行数和数据大小,可以使用 ResultBatch
对象的 rowcount、compressed_size 和 uncompressed_size 属性。例如:
# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount
# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
请注意,这些属性可在迭代结果批次之前获得。您无需提取批次的行子集即可获取这些属性的值。
将 Arrow 结果批次转换为 PyArrow 表或 pandas DataFrame¶
要将 ArrowResultBatch
转换为 PyArrow 表或 pandas DataFrame,请使用以下方法:
to_pandas()
:如果 安装了兼容 pandas 的 Snowflake Connector for Python 版本,则调用此方法可返回一个 pandas DataFrame,其中包含ArrowResultBatch
中的行。to_arrow()
:调用此方法可返回一个 PyArrow 表,其中包含ResultBatch
中的行。
例如:
with conn_cnx as con:
with con.cursor() as cur:
cur.execute("select col1 from table")
batches = cur.get_result_batches()
# Get the row from the ResultBatch as a pandas DataFrame.
dataframe = batches[0].to_pandas()
# Get the row from the ResultBatch as a PyArrow table.
table = batches[0].to_arrow()