将数据加载到 Apache Iceberg™ 表中¶
Snowflake 支持通过以下选项将数据加载到 Snowflake 管理的Iceberg 表中:
文件格式¶
您可以将数据从支持加载到标准 Snowflake 表中的任何格式的文件加载到 Iceberg 表中。
对于 CSV、JSON、Avro 和 ORC,Snowflake 将非 Parquet 文件格式的数据转换为 Iceberg Parquet 文件,并将数据存储在 Iceberg 表的基础位置。对于这些需要类型转换的文件格式加载场景,仅支持默认 LOAD_MODE = FULL_INGEST
选项。
对于 Apache Parquet 文件,Snowflake 将数据直接加载到表列中,并允许您从以下 LOAD_MODE
选项中进行选择:
FULL_INGEST
:扫描文件并重写 Iceberg 表基础位置下的 Parquet 数据。ADD_FILES_COPY
:二进制将未在 Iceberg 目录中注册的与 Iceberg 兼容的 Apache Parquet 文件复制到 Iceberg 表的基本位置,然后将这些文件注册到 Iceberg 表。
有关更多信息,请参阅 COPY INTO <table>。
示例:加载与 Iceberg 兼容的 Parquet 文件¶
此示例介绍如何创建 Iceberg 表,然后在外部暂存区从与 Iceberg 兼容的 Parquet 数据文件将数据加载到其中。
出于演示目的,此示例使用以下资源:
名为
iceberg_ingest_vol
的外部卷。要创建外部卷,请参阅 配置外部卷。名为
my_parquet_stage
的外部暂存区,其中包含与 Iceberg 兼容的 Parquet 文件。要创建外部暂存区,请参阅 CREATE STAGE。
使用复制与 Iceberg 兼容的 Parquet 数据所需的配置,创建一个描述暂存 Parquet 文件的文件格式对象 (
TYPE = PARQUET USE_VECTORIZED_SCANNER = TRUE
):CREATE OR REPLACE FILE FORMAT my_parquet_format TYPE = PARQUET USE_VECTORIZED_SCANNER = TRUE;
创建 Snowflake 管理的Iceberg 表,并定义包含与源 Parquet 文件数据类型兼容的数据类型的列:
CREATE OR REPLACE ICEBERG TABLE customer_iceberg_ingest ( c_custkey INTEGER, c_name STRING, c_address STRING, c_nationkey INTEGER, c_phone STRING, c_acctbal INTEGER, c_mktsegment STRING, c_comment STRING ) CATALOG = 'SNOWFLAKE' EXTERNAL_VOLUME = 'iceberg_ingest_vol' BASE_LOCATION = 'customer_iceberg_ingest/';
备注
示例语句指定映射到 Snowflake 数据类型的 Iceberg 数据类型。有关更多信息,请参阅 Apache Iceberg™ 表的数据类型。
使用 COPY INTO 语句将暂存的 Parquet 文件(直接位于暂存区 URL 路径下)中的数据加载到 Iceberg 表中:
COPY INTO customer_iceberg_ingest FROM @my_parquet_stage FILE_FORMAT = 'my_parquet_format' LOAD_MODE = ADD_FILES_COPY PURGE = TRUE MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;
备注
该示例指定
LOAD_MODE = ADD_FILES_COPY
,它告诉 Snowflake 将文件复制到外部卷位置,然后将文件注册到表中。此选项可避免文件费用,因为 Snowflake 不会扫描源 Parquet 文件并将数据重写到新的 Parquet 文件中。
输出:
+---------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+ | file | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name | |---------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------| | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_008.parquet | LOADED | 15000 | 15000 | 0 | 0 | NULL | NULL | NULL | NULL | | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_006.parquet | LOADED | 15000 | 15000 | 0 | 0 | NULL | NULL | NULL | NULL | | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_005.parquet | LOADED | 15000 | 15000 | 0 | 0 | NULL | NULL | NULL | NULL | | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_002.parquet | LOADED | 5 | 5 | 0 | 0 | NULL | NULL | NULL | NULL | | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_010.parquet | LOADED | 15000 | 15000 | 0 | 0 | NULL | NULL | NULL | NULL | +---------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
查询表:
SELECT c_custkey, c_name, c_mktsegment FROM customer_iceberg_ingest LIMIT 10;
输出:
+-----------+--------------------+--------------+ | C_CUSTKEY | C_NAME | C_MKTSEGMENT | |-----------+--------------------+--------------| | 75001 | Customer#000075001 | FURNITURE | | 75002 | Customer#000075002 | FURNITURE | | 75003 | Customer#000075003 | MACHINERY | | 75004 | Customer#000075004 | AUTOMOBILE | | 75005 | Customer#000075005 | FURNITURE | | 1 | Customer#000000001 | BUILDING | | 2 | Customer#000000002 | AUTOMOBILE | | 3 | Customer#000000003 | AUTOMOBILE | | 4 | Customer#000000004 | MACHINERY | | 5 | Customer#000000005 | HOUSEHOLD | +-----------+--------------------+--------------+
示例:将与 Iceberg 兼容的 Parquet 文件加载到使用 INFER_SCHEMA 函数创建的表中¶
此示例介绍如何执行以下操作:
使用 INFER_SCHEMA 函数创建 Apache Iceberg™ 表。
从外部暂存区上与 Iceberg 兼容的 Parquet 数据文件中加载数据。
出于演示目的,此示例使用以下资源:
名为
iceberg_ingest_vol
的外部卷。要创建外部卷,请参阅 配置外部卷。名为
my_parquet_stage
的外部暂存区,其中包含与 Iceberg 兼容的 Parquet 文件。要创建外部暂存区,请参阅 CREATE STAGE。
使用复制与 Iceberg 兼容的 Parquet 数据所需的配置,创建一个描述暂存 Parquet 文件的文件格式对象 (
TYPE = PARQUET USE_VECTORIZED_SCANNER = TRUE
):CREATE OR REPLACE FILE FORMAT my_parquet_format TYPE = PARQUET USE_VECTORIZED_SCANNER = TRUE;
检索
my_parquet_stage
暂存区中 Parquet 文件的列定义:SELECT * FROM TABLE( INFER_SCHEMA( LOCATION=>'@my_parquet_stage/customer_iceberg/files-to-ingest/' , FILE_FORMAT=>'my_parquet_format' , KIND => 'ICEBERG' ) );
输出:
+-------------+---------+----------+---------------------+------------------------------------------------------+----------+ | COLUMN_NAME | TYPE | NULLABLE | EXPRESSION | FILENAMES | ORDER_ID | |-------------+---------+----------+---------------------+------------------------------------------------------|----------+ | id | INT | False | $1:id::INT | customer_iceberg/files-to-ingest/customers.parquet | 0 | | custnum | INT | False | $1:custnum::INT | customer_iceberg/files-to-ingest/customers.parquet | 1 | +-------------+---------+----------+---------------------+------------------------------------------------------+----------+
使用检测到的架构创建 Iceberg 表。
CREATE ICEBERG TABLE myicebergtable USING TEMPLATE ( SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) WITHIN GROUP (ORDER BY order_id) FROM TABLE( INFER_SCHEMA( LOCATION=>'@my_parquet_stage/customer_iceberg/files-to-ingest/', FILE_FORMAT=>'my_parquet_format', KIND => 'ICEBERG' ) )) ... {rest of the ICEBERG options} ;
备注
如果返回的结果大于 16MB,则在
ARRAY_AGG(OBJECT_CONSTRUCT())
中使用*
可能会导致错误。对于较大的结果集,我们建议避免使用*
,只使用必需的列、COLUMN NAME
、TYPE
和:code:NULLABLE
进行查询。使用ORDER_ID
时可以包含可选列WITHIN GROUP (ORDER BY order_id)
。使用 COPY INTO 语句将暂存的 Parquet 文件中的数据加载到 Iceberg 表中:
COPY INTO myicebergtable FROM @my_parquet_stage/customer_iceberg/files-to-ingest/ FILE_FORMAT = 'my_parquet_format' LOAD_MODE = ADD_FILES_COPY MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;
备注
该示例指定
LOAD_MODE = ADD_FILES_COPY
,它告诉 Snowflake 将文件复制到外部卷位置,然后将文件注册到表中。此选项可避免文件费用,因为 Snowflake 不会扫描源 Parquet 文件并将数据重写到新的 Parquet 文件中。
输出:
+---------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+ | file | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name | |---------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------| | my_parquet_stage/customer_iceberg/files-to-ingest/customers.parquet | LOADED | 15000 | 15000 | 0 | 0 | NULL | NULL | NULL | NULL | +---------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
加载数据后,查询表:
SELECT id, custnum FROM myicebergtable LIMIT 10;
输出:
+-----------+---------+ | id | custnum | |-----------+---------+ | 1 | 75001 | | 2 | 75002 | | 3 | 75003 | | 4 | 75004 | | 5 | 75005 | | 6 | 75006 | | 7 | 75007 | | 8 | 75008 | | 9 | 75009 | | 10 | 75010 | +-----------+---------+