使用 Snowflake Connector for Python 分发提取结果的工作负载

如果使用分布式环境并行处理工作负载,则可以使用 Snowflake Connector for Python 对提取和处理结果的工作进行分发。

本主题内容:

简介

使用 Cursor 对象执行查询后,可以使用结果批次来分发提取结果的工作。结果批次 封装了一个函数,用于检索结果的子集。您可以分配不同的工作进程使用不同的结果批次,从而并行提取和处理结果。

检索结果批次列表

执行查询后,可以采用以下格式之一检索结果:

  • ResultBatch 对象。

    要执行此操作,请对 Cursor 对象调用 get_result_batches() 方法。这将返回一个 ResultBatch 对象列表,您可以将这些对象分配给不同的工作进程来处理。例如:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[:: 2]
            result_batch_list_2 = result_batch_list[1 :: 2]
    
    Copy
  • PyArrow 表。

    您可以使用以下方法,以 PyArrow 表的形式检索结果批次:

    • fetch_arrow_all():调用此方法可返回包含所有结果的 PyArrow 表。

    • fetch_arrow_batches():调用此方法可返回一个迭代器,您可以使用该迭代器为每个结果批次返回一个 PyArrow 表。

    例如:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
    Copy
  • pandas DataFrame 对象。

    如果 安装了兼容 pandas 的 Snowflake Connector for Python 版本,则可以使用以下方法,以 pandas DataFrame 对象的形式检索结果批次:

    • fetch_pandas_all():调用此方法可返回包含所有结果的 pandas DataFrame。

    • fetch_pandas_batches():调用此方法可返回一个迭代器,您可以使用该迭代器为每个结果批次返回一个 pandas DataFrame。

    例如:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a pandas DataFrame containing all of the results.
            table = cur.fetch_pandas_all()
    
            # Iterate over a list of pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    
    Copy

序列化结果批次

若要将结果批次移动到其他工作进程或节点,可以对结果批次进行序列化和反序列化。例如:

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
Copy

使用结果批次

接下来的几个部分将介绍如何使用 ResultBatch 对象:

迭代结果批次中的行

可以使用 ResultBatch 对象来迭代该批次中的行。例如:

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)
Copy

创建 ResultBatch 对象的迭代器时,该对象会提取并转换该批次的行子集。

具体化结果批次中的行

通过将该 ResultBatch 对象传递给 list() 函数,来将结果批次中的行子集具体化。例如:

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
Copy

获取结果批次的行数和大小

如果需要确定结果批次中的行数和数据大小,可以使用 ResultBatch 对象的 rowcountcompressed_sizeuncompressed_size 属性。例如:

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
Copy

请注意,这些属性可在迭代结果批次之前获得。您无需提取批次的行子集即可获取这些属性的值。

将 Arrow 结果批次转换为 PyArrow 表或 pandas DataFrame

要将 ArrowResultBatch 转换为 PyArrow 表或 pandas DataFrame,请使用以下方法:

例如:

with conn_cnx as con:
  with con.cursor() as cur:
    cur.execute("select col1 from table")
    batches = cur.get_result_batches()

    # Get the row from the ResultBatch as a pandas DataFrame.
    dataframe = batches[0].to_pandas()

    # Get the row from the ResultBatch as a PyArrow table.
    table = batches[0].to_arrow()
Copy
语言: 中文