在加载期间转换数据¶
Snowflake 支持在使用 COPY INTO <table> 命令将数据加载到表中时转换数据,从而极大地简化了基本转换的 ETL 管道。此功能可帮助您在数据加载期间对列进行重新排序时,避免使用临时表来存储预转换的数据。此功能适用于批量加载和 Snowpipe。
COPY 命令支持:
使用 SELECT 语句进行列重新排序、列省略和数据类型转换。您的数据文件不需要与目标表具有相同的列数和顺序。
ENFORCE_LENGTH | TRUNCATECOLUMNS 选项,它可以截断超过目标列长度的文本字符串。
有关查询暂存数据文件的一般信息,请参阅 查询暂存文件中的数据。
本主题内容:
使用说明¶
本部分提供在加载期间转换暂存数据文件的使用信息。
支持的文件格式¶
COPY 转换支持以下文件格式类型:
CSV
JSON
Avro
ORC
Parquet
XML
要解析暂存数据文件,需要描述其文件格式:
- CSV:
默认格式是字符分隔的 UTF-8 文本。默认字段分隔符是逗号字符 (
,
)。默认记录分隔符是换行符。如果源数据采用其他格式,请指定文件格式类型和选项。查询暂存数据文件时,
ERROR_ON_COLUMN_COUNT_MISMATCH
选项被忽略。您的数据文件不需要与目标表具有相同的列数和顺序。- 所有其他文件格式类型:
指定与您的数据文件匹配的格式类型和选项。
要显式指定文件格式选项,请通过以下方式之一进行设置:
使用 SELECT 语句查询暂存数据文件: |
|
使用 COPY INTO <table> 语句从暂存数据文件加载列: |
|
支持的函数¶
Snowflake 目前支持 COPY 转换的以下函数子集:
-
注意,当使用此函数显式强制转换值时,既不应用 DATE_FORMAT 文件格式选项,也不应用 DATE_INPUT_FORMAT 参数。
-
注意,当使用此函数显式强制转换值时,既不应用 TIME_FORMAT 文件格式选项,也不应用 TIME_INPUT_FORMAT 参数。
-
注意,当使用此函数显式强制转换值时,既不应用 TIMESTAMP_FORMAT 文件格式选项,也不应用 TIMESTAMP_INPUT_FORMAT 参数。
-
请注意,COPY INTO <table> 命令不支持此函数的可选
format
实参。 -
请注意,COPY INTO <table> 命令不支持此函数的可选
format
实参。 TRY_TO_TIMESTAMP / TRY_TO_TIMESTAMP_*
请注意,COPY INTO <table> 命令不支持此函数的可选
format
实参。
请特别注意,VALIDATE 函数会忽略 COPY INTO <table> 语句中的 SELECT 列表。该函数解析语句中引用的文件,并返回任何解析错误。如果您期望函数在 COPY INTO <table> 表达式的上下文中评估文件,这种行为可能令人惊讶。
请注意,COPY 转换 不 支持 FLATTEN 函数、JOIN 或者 GROUP BY (汇总)语法。
支持的函数列表可能会随着时间的推移而扩展。
还支持以下类别的函数:
标量 SQL UDFs。
备注
对于标量 SQL UDFs,Snowflake 对转换错误处理的支持有限,您可能会遇到不一致或意外的 ON_ERROR 复制选项行为。
筛选结果¶
不支持使用 WHERE 子句筛选 FROM 子句的结果。也不支持 SELECT 语句中的 ORDER BY、 LIMIT、FETCH、TOP 关键字。
不完全支持 SELECT 语句中的 DISTINCT 关键字。指定关键字可能会导致不一致或意外的 ON_ERROR 复制选项行为。
VALIDATION_MODE 参数¶
VALIDATION_MODE 参数不支持在加载期间转换数据的 COPY 语句。
CURRENT_TIME、CURRENT_TIMESTAMP 默认列值¶
我们建议您查询 METADATA$START_SCAN_TIME 以获得记录加载的准确时间值,而不是使用 CURRENT_TIME、CURRENT_TIMESTAMP 默认列值来获取加载时间。有关更多信息,请参阅 查询暂存文件的元数据。
MATCH_BY_COLUMN_NAME 复制选项¶
在所有情况下,都不得同时使用 MATCH_BY_COLUMN_NAME 复制选项和 SELECT 语句来在加载期间转换数据。这两个选项仍然可以单独使用,但不能一起使用。如果尝试一起使用,则会导致以下错误:SQL compilation error: match_by_column_name is not supported with copy transform
。
转换 CSV 数据¶
加载表数据的子集¶
将数据子集加载到表中。对于任何缺失的列,Snowflake 都会插入默认值。以下示例从暂存 CSV 文件的第 1、2、6 和 7 列加载数据:
copy into home_sales(city, zip, sale_date, price) from (select t.$1, t.$2, t.$6, t.$7 from @mystage/sales.csv.gz t) FILE_FORMAT = (FORMAT_NAME = mycsvformat);
在加载期间重新排序 CSV 列¶
以下示例在将暂存 CSV 文件加载到表中之前对列数据重新排序。此外,COPY 语句使用 SUBSTR、SUBSTRING 函数在插入字符串之前删除该字符串的前几个字符:
copy into home_sales(city, zip, sale_date, price) from (select SUBSTR(t.$2,4), t.$1, t.$5, t.$4 from @mystage t) FILE_FORMAT = (FORMAT_NAME = mycsvformat);
在加载期间转换数据类型¶
在数据加载期间将暂存数据转换为其他数据类型。支持所有 转换函数。
例如,分别使用 TO_BINARY、TO_DECIMAL、TO_NUMBER、TO_NUMERIC 和 TO_TIMESTAMP / TO_TIMESTAMP_* 函数将字符串转换为二进制值、小数或时间戳。
样本 CSV 文件:
snowflake,2.8,2016-10-5 warehouse,-12.3,2017-01-23
SQL 语句:
-- Stage a data file in the internal user stage PUT file:///tmp/datafile.csv @~; -- Query the staged data file select t.$1,t.$2,t.$3 from @~/datafile.csv.gz t; -- Create the target table create or replace table casttb ( col1 binary, col2 decimal, col3 timestamp_ntz ); -- Convert the staged CSV column data to the specified data types before loading it into the destination table copy into casttb(col1, col2, col3) from ( select to_binary(t.$1, 'utf-8'),to_decimal(t.$2, '99.9', 9, 5),to_timestamp_ntz(t.$3) from @~/datafile.csv.gz t ) file_format = (type = csv); -- Query the target table select * from casttb; +--------------------+------+-------------------------+ | COL1 | COL2 | COL3 | |--------------------+------+-------------------------| | 736E6F77666C616B65 | 3 | 2016-10-05 00:00:00.000 | | 77617265686F757365 | -12 | 2017-01-23 00:00:00.000 | +--------------------+------+-------------------------+
在加载的数据中包含序列列¶
使用 CREATE SEQUENCE 创建序列对象。当使用 COPY 命令将数据加载到表中时,使用 NEXTVAL
表达式访问对象,以便对目标数字列中的数据进行排序。有关在查询中使用序列的更多信息,请参阅 使用序列。
-- Create a sequence create sequence seq1; -- Create the target table create or replace table mytable ( col1 number default seq1.nextval, col2 varchar, col3 varchar ); -- Stage a data file in the internal user stage PUT file:///tmp/myfile.csv @~; -- Query the staged data file select $1, $2 from @~/myfile.csv.gz t; +-----+-----+ | $1 | $2 | |-----+-----| | abc | def | | ghi | jkl | | mno | pqr | | stu | vwx | +-----+-----+ -- Include the sequence nextval expression in the COPY statement copy into mytable (col1, col2, col3) from ( select seq1.nextval, $1, $2 from @~/myfile.csv.gz t ) ; select * from mytable; +------+------+------+ | COL1 | COL2 | COL3 | |------+------+------| | 1 | abc | def | | 2 | ghi | jkl | | 3 | mno | pqr | | 4 | stu | vwx | +------+------+------+
在加载的数据中包含 AUTOINCREMENT 列或 IDENTITY 列¶
设置数字列的 AUTOINCREMENT 或 IDENTITY 默认值。使用 COPY 命令将数据加载到表中时,会省略 SELECT 语句中的列。语句会自动填充该列。
-- Create the target table create or replace table mytable ( col1 number autoincrement start 1 increment 1, col2 varchar, col3 varchar ); -- Stage a data file in the internal user stage PUT file:///tmp/myfile.csv @~; -- Query the staged data file select $1, $2 from @~/myfile.csv.gz t; +-----+-----+ | $1 | $2 | |-----+-----| | abc | def | | ghi | jkl | | mno | pqr | | stu | vwx | +-----+-----+ -- Omit the sequence column in the COPY statement copy into mytable (col2, col3) from ( select $1, $2 from @~/myfile.csv.gz t ) ; select * from mytable; +------+------+------+ | COL1 | COL2 | COL3 | |------+------+------| | 1 | abc | def | | 2 | ghi | jkl | | 3 | mno | pqr | | 4 | stu | vwx | +------+------+------+
转换半结构化数据¶
本部分中的示例适用于任何半结构化数据类型,除非另有说明。
将半结构化数据加载到单独的列中¶
以下示例将重复元素从暂存半结构化文件加载到具有不同数据类型的单独表列中。
此示例将以下半结构化数据加载到关系表中的单独列中,location
对象值加载到 VARIANT 列中,其余值加载到关系列中:
-- Sample data:
{"location": {"city": "Lexington","zip": "40503"},"dimensions": {"sq_ft": "1000"},"type": "Residential","sale_date": "4-25-16","price": "75836"},
{"location": {"city": "Belmont","zip": "02478"},"dimensions": {"sq_ft": "1103"},"type": "Residential","sale_date": "6-18-16","price": "92567"},
{"location": {"city": "Winchester","zip": "01890"},"dimensions": {"sq_ft": "1122"},"type": "Condo","sale_date": "1-31-16","price": "89921"}
以下 SQL 语句从内部暂存区 mystage
加载文件 sales.json
:
备注
此示例加载 JSON 数据,但在加载其他类型(例如 Avro、 ORC 等)的半结构化数据时,SQL 语句是相似的。
有关使用 Parquet 数据的其他示例,请参阅 将 Parquet 数据加载到单独的列中 (本主题内容)。
-- Create an internal stage with the file type set as JSON.
CREATE OR REPLACE STAGE mystage
FILE_FORMAT = (TYPE = 'json');
-- Stage a JSON data file in the internal stage.
PUT file:///tmp/sales.json @mystage;
-- Query the staged data. The data file comprises three objects in NDJSON format.
SELECT t.$1 FROM @mystage/sales.json.gz t;
+------------------------------+
| $1 |
|------------------------------|
| { |
| "dimensions": { |
| "sq_ft": "1000" |
| }, |
| "location": { |
| "city": "Lexington", |
| "zip": "40503" |
| }, |
| "price": "75836", |
| "sale_date": "2022-08-25", |
| "type": "Residential" |
| } |
| { |
| "dimensions": { |
| "sq_ft": "1103" |
| }, |
| "location": { |
| "city": "Belmont", |
| "zip": "02478" |
| }, |
| "price": "92567", |
| "sale_date": "2022-09-18", |
| "type": "Residential" |
| } |
| { |
| "dimensions": { |
| "sq_ft": "1122" |
| }, |
| "location": { |
| "city": "Winchester", |
| "zip": "01890" |
| }, |
| "price": "89921", |
| "sale_date": "2022-09-23", |
| "type": "Condo" |
| } |
+------------------------------+
-- Create a target table for the data.
CREATE OR REPLACE TABLE home_sales (
CITY VARCHAR,
POSTAL_CODE VARCHAR,
SQ_FT NUMBER,
SALE_DATE DATE,
PRICE NUMBER
);
-- Copy elements from the staged file into the target table.
COPY INTO home_sales(city, postal_code, sq_ft, sale_date, price)
FROM (select
$1:location.city::varchar,
$1:location.zip::varchar,
$1:dimensions.sq_ft::number,
$1:sale_date::date,
$1:price::number
FROM @mystage/sales.json.gz t);
-- Query the target table.
SELECT * from home_sales;
+------------+-------------+-------+------------+-------+
| CITY | POSTAL_CODE | SQ_FT | SALE_DATE | PRICE |
|------------+-------------+-------+------------+-------|
| Lexington | 40503 | 1000 | 2022-08-25 | 75836 |
| Belmont | 02478 | 1103 | 2022-09-18 | 92567 |
| Winchester | 01890 | 1122 | 2022-09-23 | 89921 |
+------------+-------------+-------+------------+-------+
将 Parquet 数据加载到单独的列中¶
与前面的示例类似,但从 Parquet 格式的文件中加载半结构化数据。此示例是为熟悉 Apache Parquet 的用户提供的:
-- Create a file format object that sets the file format type. Accept the default options. create or replace file format my_parquet_format type = 'parquet'; -- Create an internal stage and specify the new file format create or replace temporary stage mystage file_format = my_parquet_format; -- Create a target table for the data. create or replace table parquet_col ( custKey number default NULL, orderDate date default NULL, orderStatus varchar(100) default NULL, price varchar(255) ); -- Stage a data file in the internal stage put file:///tmp/mydata.parquet @mystage; -- Copy data from elements in the staged Parquet file into separate columns -- in the target table. -- Note that all Parquet data is stored in a single column ($1) -- SELECT list items correspond to element names in the Parquet file -- Cast element values to the target column data type copy into parquet_col from (select $1:o_custkey::number, $1:o_orderdate::date, $1:o_orderstatus::varchar, $1:o_totalprice::varchar from @mystage/mydata.parquet); -- Query the target table SELECT * from parquet_col; +---------+------------+-------------+-----------+ | CUSTKEY | ORDERDATE | ORDERSTATUS | PRICE | |---------+------------+-------------+-----------| | 27676 | 1996-09-04 | O | 83243.94 | | 140252 | 1994-01-09 | F | 198402.97 | ... +---------+------------+-------------+-----------+
展平半结构化数据¶
FLATTEN 是一个表函数,用于生成 VARIANT、OBJECT 或者 ARRAY 列的横向视图。使用 将半结构化数据加载到单独的列中 中的示例数据,为对象中的每个元素创建一个具有单独行的表。
-- Create an internal stage with the file delimiter set as none and the record delimiter set as the new line character
create or replace stage mystage
file_format = (type = 'json');
-- Stage a JSON data file in the internal stage with the default values
put file:///tmp/sales.json @mystage;
-- Create a table composed of the output from the FLATTEN function
create or replace table flattened_source
(seq string, key string, path string, index string, value variant, element variant)
as
select
seq::string
, key::string
, path::string
, index::string
, value::variant
, this::variant
from @mystage/sales.json.gz
, table(flatten(input => parse_json($1)));
select * from flattened_source;
+-----+-----------+-----------+-------+-------------------------+-----------------------------+
| SEQ | KEY | PATH | INDEX | VALUE | ELEMENT |
|-----+-----------+-----------+-------+-------------------------+-----------------------------|
| 1 | location | location | NULL | { | { |
| | | | | "city": "Lexington", | "location": { |
| | | | | "zip": "40503" | "city": "Lexington", |
| | | | | } | "zip": "40503" |
| | | | | | }, |
| | | | | | "price": "75836", |
| | | | | | "sale_date": "2017-3-5", |
| | | | | | "sq__ft": "1000", |
| | | | | | "type": "Residential" |
| | | | | | } |
...
| 3 | type | type | NULL | "Condo" | { |
| | | | | | "location": { |
| | | | | | "city": "Winchester", |
| | | | | | "zip": "01890" |
| | | | | | }, |
| | | | | | "price": "89921", |
| | | | | | "sale_date": "2017-3-21", |
| | | | | | "sq__ft": "1122", |
| | | | | | "type": "Condo" |
| | | | | | } |
+-----+-----------+-----------+-------+-------------------------+-----------------------------+
将半结构化元素拆分并作为 VARIANT 值加载到单独的列中¶
按照 将半结构化数据加载到单独的列中 的说明操作,您可以将半结构化数据中的各个元素加载到目标表的不同列中。此外,您可以使用 SPLIT 函数拆分包含分隔符的元素值并将它们作为数组加载。
例如,将重复元素中的 IP 地址按点分隔符进行拆分。将 IP 地址作为数组加载到单独的列中:
-- Create an internal stage with the file delimiter set as none and the record delimiter set as the new line character create or replace stage mystage file_format = (type = 'json'); -- Stage a semi-structured data file in the internal stage put file:///tmp/ipaddress.json @mystage auto_compress=true; -- Query the staged data select t.$1 from @mystage/ipaddress.json.gz t; +----------------------------------------------------------------------+ | $1 | |----------------------------------------------------------------------| | {"ip_address": {"router1": "192.168.1.1","router2": "192.168.0.1"}}, | | {"ip_address": {"router1": "192.168.2.1","router2": "192.168.3.1"}} | +----------------------------------------------------------------------+ -- Create a target table for the semi-structured data create or replace table splitjson ( col1 array, col2 array ); -- Split the elements into individual arrays using the SPLIT function and load them into separate columns -- Note that all JSON data is stored in a single column ($1) copy into splitjson(col1, col2) from ( select split($1:ip_address.router1, '.'),split($1:ip_address.router2, '.') from @mystage/ipaddress.json.gz t ); -- Query the target table select * from splitjson; +----------+----------+ | COL1 | COL2 | |----------+----------| | [ | [ | | "192", | "192", | | "168", | "168", | | "1", | "0", | | "1" | "1" | | ] | ] | | [ | [ | | "192", | "192", | | "168", | "168", | | "2", | "3", | | "1" | "1" | | ] | ] | +----------+----------+