教程:Snowflake JSON 基础知识¶
简介¶
在本教程中,您将学习将 JSON 与 Snowflake 一起使用的基础知识。
您将学习的内容¶
在本教程中,您将学习如何执行以下操作:
将示例 JSON 数据从公共 S3 桶上传到 Snowflake 表中
variant
类型的列中。测试表中 JSON 数据的简单查询。
探索 FLATTEN 函数,将 JSON 数据展平为关系表示并保存在另一个表中。
探索在展平版本的数据中插入行时确保唯一性的方法。
先决条件¶
本教程假设如下:
您有一个配置为使用Amazon AWS 的 Snowflake 账户,以及具有可以授予创建数据库、表和虚拟仓库对象的必要权限的角色用户。
您已经安装了 SnowSQL(CLI 客户端)。
20 分钟学会使用 Snowflake 教程提供满足这些要求的相关分步说明。
Snowflake 在公共 S3 桶中提供示例数据文件,供本教程使用。但是在开始之前,您需要为本教程创建数据库、表、虚拟仓库和外部暂存区。这些是大多数 Snowflake 活动所需的基本 Snowflake 对象。
关于示例数据文件¶
在本教程中,您将使用公共 S3 桶中提供的以下示例应用事件 JSON 数据。
{
"device_type": "server",
"events": [
{
"f": 83,
"rv": "15219.64,783.63,48674.48,84679.52,27499.78,2178.83,0.42,74900.19",
"t": 1437560931139,
"v": {
"ACHZ": 42869,
"ACV": 709489,
"DCA": 232,
"DCV": 62287,
"ENJR": 2599,
"ERRS": 205,
"MXEC": 487,
"TMPI": 9
},
"vd": 54,
"z": 1437644222811
},
{
"f": 1000083,
"rv": "8070.52,54470.71,85331.27,9.10,70825.85,65191.82,46564.53,29422.22",
"t": 1437036965027,
"v": {
"ACHZ": 6953,
"ACV": 346795,
"DCA": 250,
"DCV": 46066,
"ENJR": 9033,
"ERRS": 615,
"MXEC": 0,
"TMPI": 112
},
"vd": 626,
"z": 1437660796958
}
],
"version": 2.6
}
数据代表应用程序上传到 S3 的示例事件。服务器、手机和浏览器等各种设备和应用程序都会发布事件。在常见的数据收集场景中,可扩展的 Web 端点收集来自不同来源的 POSTed 数据并将其写入队列系统。然后,引入服务/实用程序将数据写入 S3 桶,您可以从中将数据加载到 Snowflake 中。
示例数据说明以下概念:
应用程序可以选择对事件进行批量分组。批次是一个容器,用于保存批次中所有事件共有的标头信息。例如,前面的 JSON 是一批包含共同标题信息的两个事件:
device_type
和version
生成了这些事件。Amazon S3 支持使用文件夹概念来整理桶。应用程序可以利用此功能对事件数据进行分区。分区架构通常会识别详细信息,例如生成事件的应用程序或位置,以及将其写入 S3 时的事件日期。这样的分区架构使您能够使用单个 COPY 命令将分区数据的任何一部分复制到 Snowflake。例如,在最初填充表时,您可以按小时、数据、月或年复制事件数据。
例如:
s3://bucket_name/application_a/2016/07/01/11/
s3://bucket_name/application_b/location_c/2016/07/01/14/
注意,
application_a
、application_b
、location_c
等会识别路径中所有数据源的详细信息。数据可按编写日期整理。可选的 24 小时目录减少了每个目录中的数据量。备注
S3 传输 Snowflake 使用的每个 COPY 语句的目录列表,因此减少每个目录中的文件数量可以提高 COPY 语句的表现。您甚至可以考虑每小时创建 10-15 分钟的增量文件夹。
S3 桶中提供的示例数据使用类似的分区架构。在 COPY 命令中,您将指定用于复制事件数据的特定文件夹路径。
创建数据库、表、仓库和外部暂存区¶
执行以下语句来创建本教程所需的数据库、表、虚拟仓库和外部暂存区。完成本教程后,您可以删除这些对象。
CREATE OR REPLACE DATABASE mydatabase; USE SCHEMA mydatabase.public; CREATE OR REPLACE TABLE raw_source ( SRC VARIANT); CREATE OR REPLACE WAREHOUSE mywarehouse WITH WAREHOUSE_SIZE='X-SMALL' AUTO_SUSPEND = 120 AUTO_RESUME = TRUE INITIALLY_SUSPENDED=TRUE; USE WAREHOUSE mywarehouse; CREATE OR REPLACE STAGE my_stage URL = 's3://snowflake-docs/tutorials/json';
请注意以下事项:
CREATE DATABASE
语句创建一个数据库。数据库自动包含一个名为“public”的架构。USE SCHEMA
语句为当前用户会话指定活动数据库和架构。现在,您可以通过指定数据库在此数据库中执行工作,而不必在每次请求时都提供名称。CREATE TABLE
语句为 JSON 数据创建目标表。CREATE WAREHOUSE
语句创建一个最初暂停的仓库。该语句还将 AUTO_RESUME 设置为 true,当您执行需要计算资源的 SQL 语句时,它会自动启动仓库。USE WAREHOUSE
语句会将您创建的仓库指定为当前用户会话的活动仓库。CREATE STAGE
语句创建了一个外部暂存区,该暂存区指向包含本教程示例文件的 S3 桶。
第 1 步:将数据复制到目标表¶
执行 COPY INTO <table> 可将暂存数据加载到目标 RAW_SOURCE
表中。
COPY INTO raw_source
FROM @my_stage/server/2.6/2016/07/15/15
FILE_FORMAT = (TYPE = JSON);
该命令将所有新数据从外部暂存区的指定路径复制到目标 RAW_SOURCE
表。在此示例中,指定路径的目标是 2016 年 7 月 15 日 15 时 (3 PM) 写入的数据。请注意,Snowflake 会检查每个文件的 S3 ETag 值,以确保该值仅复制一次。
执行 SELECT 查询可验证数据是否已成功复制。
SELECT * FROM raw_source;
查询返回以下结果:
+-----------------------------------------------------------------------------------+
| SRC |
|-----------------------------------------------------------------------------------|
| { |
| "device_type": "server", |
| "events": [ |
| { |
| "f": 83, |
| "rv": "15219.64,783.63,48674.48,84679.52,27499.78,2178.83,0.42,74900.19", |
| "t": 1437560931139, |
| "v": { |
| "ACHZ": 42869, |
| "ACV": 709489, |
| "DCA": 232, |
| "DCV": 62287, |
| "ENJR": 2599, |
| "ERRS": 205, |
| "MXEC": 487, |
| "TMPI": 9 |
| }, |
| "vd": 54, |
| "z": 1437644222811 |
| }, |
| { |
| "f": 1000083, |
| "rv": "8070.52,54470.71,85331.27,9.10,70825.85,65191.82,46564.53,29422.22", |
| "t": 1437036965027, |
| "v": { |
| "ACHZ": 6953, |
| "ACV": 346795, |
| "DCA": 250, |
| "DCV": 46066, |
| "ENJR": 9033, |
| "ERRS": 615, |
| "MXEC": 0, |
| "TMPI": 112 |
| }, |
| "vd": 626, |
| "z": 1437660796958 |
| } |
| ], |
| "version": 2.6 |
| } |
+-----------------------------------------------------------------------------------+
在此示例 JSON 数据中,有两个事件。 device_type
和 version
键值标识来自特定设备的事件的数据源和版本。
第 2 步:查询数据¶
在本部分中,您将了解用于查询 JSON 数据的 SELECT 语句。
检索
device_type
。SELECT src:device_type FROM raw_source;
查询返回以下结果:
+-----------------+ | SRC:DEVICE_TYPE | |-----------------| | "server" | +-----------------+
查询使用
src:device_type
表示法来指定要检索的列名和 JSON 元素名称。此表示法类似于大家熟悉的 SQLtable.column
表示法。Snowflake 允许您在父列中指定子列,Snowflak 从 JSON 数据中嵌入的架构定义中动态派生子列。有关更多信息,请参阅 查询半结构化数据。备注
列名不区分大小写,但 JSON 元素名称区分大小写。
检索不带引号的
device_type
值。上述查询返回引号中的 JSON 数据值。您可以通过将数据类型转换为特定数据类型(在本例中为字符串)来删除引号。
此查询还可以选择使用别名为列分配名称。
SELECT src:device_type::string AS device_type FROM raw_source;
此查询将返回以下结果:
+-------------+ | DEVICE_TYPE | |-------------| | server | +-------------+
检索嵌套在数组事件对象中的重复
f
键。示例 JSON 数据包括
events
数组。数组中的每个事件对象都具有如下所示的f
字段。{ "device_type": "server", "events": [ { "f": 83, .. } { "f": 1000083, .. } ]}
要检索这些嵌套键,可以使用 FLATTEN 函数。该函数将事件展平为单独的行。
SELECT value:f::number FROM raw_source , LATERAL FLATTEN( INPUT => SRC:events );
查询返回以下结果:
+-----------------+ | VALUE:F::NUMBER | |-----------------| | 83 | | 1000083 | +-----------------+
请注意,
value
是 FLATTEN 函数返回的列之一。下一步骤将提供有关使用 FLATTEN 函数的更多详细信息。
第 3 步:展平数据¶
FLATTEN 是一个表函数,用于生成 VARIANT、OBJECT 或者 ARRAY 列的横向视图。在此步骤中,您将使用此函数来了解不同级别的展平。
将变体列中的数组对象展平¶
您可以使用 FLATTEN
函数,将 events
数组中的事件对象展平为单独的行。函数输出包含一个存储这些单个事件的 VALUE 列。
然后,使用 LATERAL 修饰符将 FLATTEN
函数的输出与对象外部的信息联接起来(在本例中是 device_type
和 version
)。
查询每个事件的数据:
SELECT src:device_type::string, src:version::String, VALUE FROM raw_source, LATERAL FLATTEN( INPUT => SRC:events );
查询返回以下结果:
+-------------------------+---------------------+-------------------------------------------------------------------------------+ | SRC:DEVICE_TYPE::STRING | SRC:VERSION::STRING | VALUE | |-------------------------+---------------------+-------------------------------------------------------------------------------| | server | 2.6 | { | | | | "f": 83, | | | | "rv": "15219.64,783.63,48674.48,84679.52,27499.78,2178.83,0.42,74900.19", | | | | "t": 1437560931139, | | | | "v": { | | | | "ACHZ": 42869, | | | | "ACV": 709489, | | | | "DCA": 232, | | | | "DCV": 62287, | | | | "ENJR": 2599, | | | | "ERRS": 205, | | | | "MXEC": 487, | | | | "TMPI": 9 | | | | }, | | | | "vd": 54, | | | | "z": 1437644222811 | | | | } | | server | 2.6 | { | | | | "f": 1000083, | | | | "rv": "8070.52,54470.71,85331.27,9.10,70825.85,65191.82,46564.53,29422.22", | | | | "t": 1437036965027, | | | | "v": { | | | | "ACHZ": 6953, | | | | "ACV": 346795, | | | | "DCA": 250, | | | | "DCV": 46066, | | | | "ENJR": 9033, | | | | "ERRS": 615, | | | | "MXEC": 0, | | | | "TMPI": 112 | | | | }, | | | | "vd": 626, | | | | "z": 1437660796958 | | | | } | +-------------------------+---------------------+-------------------------------------------------------------------------------+
使用 CREATE TABLE AS SELECT 语句将上述查询结果存储在表中:
CREATE OR REPLACE TABLE flattened_source AS SELECT src:device_type::string AS device_type, src:version::string AS version, VALUE AS src FROM raw_source, LATERAL FLATTEN( INPUT => SRC:events );
查询生成的表。
SELECT * FROM flattened_source;
查询返回以下结果:
+-------------+---------+-------------------------------------------------------------------------------+ | DEVICE_TYPE | VERSION | SRC | |-------------+---------+-------------------------------------------------------------------------------| | server | 2.6 | { | | | | "f": 83, | | | | "rv": "15219.64,783.63,48674.48,84679.52,27499.78,2178.83,0.42,74900.19", | | | | "t": 1437560931139, | | | | "v": { | | | | "ACHZ": 42869, | | | | "ACV": 709489, | | | | "DCA": 232, | | | | "DCV": 62287, | | | | "ENJR": 2599, | | | | "ERRS": 205, | | | | "MXEC": 487, | | | | "TMPI": 9 | | | | }, | | | | "vd": 54, | | | | "z": 1437644222811 | | | | } | | server | 2.6 | { | | | | "f": 1000083, | | | | "rv": "8070.52,54470.71,85331.27,9.10,70825.85,65191.82,46564.53,29422.22", | | | | "t": 1437036965027, | | | | "v": { | | | | "ACHZ": 6953, | | | | "ACV": 346795, | | | | "DCA": 250, | | | | "DCV": 46066, | | | | "ENJR": 9033, | | | | "ERRS": 615, | | | | "MXEC": 0, | | | | "TMPI": 112 | | | | }, | | | | "vd": 626, | | | | "z": 1437660796958 | | | | } | +-------------+---------+-------------------------------------------------------------------------------+
在单独的列中展平对象键¶
在前面的示例中,您将 events
数组中的事件对象展平为单独的行。生成的 flattened_source
表在 VARIANT 类型的 src
列中保留了事件结构。
在 VARIANT 类型的 src
列中保留事件对象的一个优势是:当事件格式发生更改时,您不必重新创建和重新填充此类表。但是,您也可以选择将事件对象中的各个键复制到单独的类型化列中,如以下查询所示。
以下 CREATE TABLE AS SELECT 语句创建一个名为 events
的新表,并将事件对象键存储在单独的列中。每个值都转换为适合该值的数据类型,并使用后跟类型的双冒号 (::)。如果省略类型转换,则列将假定为 VARIANT 数据类型,该数据类型可以保存任何值:
create or replace table events as
select
src:device_type::string as device_type
, src:version::string as version
, value:f::number as f
, value:rv::variant as rv
, value:t::number as t
, value:v.ACHZ::number as achz
, value:v.ACV::number as acv
, value:v.DCA::number as dca
, value:v.DCV::number as dcv
, value:v.ENJR::number as enjr
, value:v.ERRS::number as errs
, value:v.MXEC::number as mxec
, value:v.TMPI::number as tmpi
, value:vd::number as vd
, value:z::number as z
from
raw_source
, lateral flatten ( input => SRC:events );
该语句将 EVENTS.SRC:V 中的嵌套数据展平,并为每个值单独添加一列。该语句为每个键/值对输出一行。以下输出显示新的 events
表中的前两条记录:
SELECT * FROM events;
+-------------+---------+---------+----------------------------------------------------------------------+---------------+-------+--------+-----+-------+------+------+------+------+-----+---------------+
| DEVICE_TYPE | VERSION | F | RV | T | ACHZ | ACV | DCA | DCV | ENJR | ERRS | MXEC | TMPI | VD | Z |
|-------------+---------+---------+----------------------------------------------------------------------+---------------+-------+--------+-----+-------+------+------+------+------+-----+---------------|
| server | 2.6 | 83 | "15219.64,783.63,48674.48,84679.52,27499.78,2178.83,0.42,74900.19" | 1437560931139 | 42869 | 709489 | 232 | 62287 | 2599 | 205 | 487 | 9 | 54 | 1437644222811 |
| server | 2.6 | 1000083 | "8070.52,54470.71,85331.27,9.10,70825.85,65191.82,46564.53,29422.22" | 1437036965027 | 6953 | 346795 | 250 | 46066 | 9033 | 615 | 0 | 112 | 626 | 1437660796958 |
+-------------+---------+---------+----------------------------------------------------------------------+---------------+-------+--------+-----+-------+------+------+------+------+-----+---------------+
第 4 步:更新数据¶
到目前为止,在本教程中,您执行了以下操作:
将样本 JSON 事件数据从 S3 桶复制到
RAW_SOURCE
表中,并探索了简单的查询。您还探索了 FLATTEN 函数来展平 JSON 数据并获得数据的关系表示。例如,您提取了事件键并将这些键存储在另一个 EVENTS 表的单独列中。
本教程首先介绍了多个源生成事件并由 Web 端点将其保存到 S3 桶的应用程序方案。随着新事件被添加到 S3 桶中,您可能会使用脚本将新数据连续复制到 RAW_SOURCE
表中。但是如何在 EVENTS
表中只插入新的事件数据。
有许多方法可以保持数据一致性。本节介绍两个选项。
使用主键列进行比较¶
在本节中,您将向 EVENTS
表添加一个主键。然后,主键保证唯一性。
检查 JSON 数据,看是否有任何值是唯一的,并且是主键的良好候选值。例如,假设
src:device_type
和value:rv
的组合可以是主键。这两个 JSON 键对应于EVENTS
表中的DEVICE_TYPE
和RV
列。备注
Snowflake 不强制执行主键约束。相反,约束用作标识 Information Schema 中的自然键的元数据。
将主键约束添加到
EVENTS
表中:ALTER TABLE events ADD CONSTRAINT pk_DeviceType PRIMARY KEY (device_type, rv);
在
RAW_SOURCE
表中插入新的 JSON 事件记录:insert into raw_source select PARSE_JSON ('{ "device_type": "cell_phone", "events": [ { "f": 79, "rv": "786954.67,492.68,3577.48,40.11,343.00,345.8,0.22,8765.22", "t": 5769784730576, "v": { "ACHZ": 75846, "ACV": 098355, "DCA": 789, "DCV": 62287, "ENJR": 2234, "ERRS": 578, "MXEC": 999, "TMPI": 9 }, "vd": 54, "z": 1437644222811 } ], "version": 3.2 }');
根据主键值的比较,将添加到
RAW_SOURCE
表中的新记录插入到EVENTS
表中:insert into events select src:device_type::string , src:version::string , value:f::number , value:rv::variant , value:t::number , value:v.ACHZ::number , value:v.ACV::number , value:v.DCA::number , value:v.DCV::number , value:v.ENJR::number , value:v.ERRS::number , value:v.MXEC::number , value:v.TMPI::number , value:vd::number , value:z::number from raw_source , lateral flatten( input => src:events ) where not exists (select 'x' from events where events.device_type = src:device_type and events.rv = value:rv);
查询
EVENTS
表将显示添加的行:select * from EVENTS;
查询返回以下结果:
+-------------+---------+---------+----------------------------------------------------------------------+---------------+-------+--------+-----+-------+------+------+------+------+-----+---------------+ | DEVICE_TYPE | VERSION | F | RV | T | ACHZ | ACV | DCA | DCV | ENJR | ERRS | MXEC | TMPI | VD | Z | |-------------+---------+---------+----------------------------------------------------------------------+---------------+-------+--------+-----+-------+------+------+------+------+-----+---------------| | server | 2.6 | 83 | "15219.64,783.63,48674.48,84679.52,27499.78,2178.83,0.42,74900.19" | 1437560931139 | 42869 | 709489 | 232 | 62287 | 2599 | 205 | 487 | 9 | 54 | 1437644222811 | | server | 2.6 | 1000083 | "8070.52,54470.71,85331.27,9.10,70825.85,65191.82,46564.53,29422.22" | 1437036965027 | 6953 | 346795 | 250 | 46066 | 9033 | 615 | 0 | 112 | 626 | 1437660796958 | | cell_phone | 3.2 | 79 | "786954.67,492.68,3577.48,40.11,343.00,345.8,0.22,8765.22" | 5769784730576 | 75846 | 98355 | 789 | 62287 | 2234 | 578 | 999 | 9 | 54 | 1437644222811 | +-------------+---------+---------+----------------------------------------------------------------------+---------------+-------+--------+-----+-------+------+------+------+------+-----+---------------+
使用所有列进行比较¶
如果 JSON 数据没有可以作为主键候选项的字段,您可以将 RAW_SOURCE
表中所有重复的 JSON 键与 EVENTS
表中相应的列值进行比较。
无需对现有 EVENTS
表进行任何更改。
在
RAW_SOURCE
表中插入新的 JSON 事件记录:insert into raw_source select parse_json ('{ "device_type": "web_browser", "events": [ { "f": 79, "rv": "122375.99,744.89,386.99,12.45,78.08,43.7,9.22,8765.43", "t": 5769784730576, "v": { "ACHZ": 768436, "ACV": 9475, "DCA": 94835, "DCV": 88845, "ENJR": 8754, "ERRS": 567, "MXEC": 823, "TMPI": 0 }, "vd": 55, "z": 8745598047355 } ], "version": 8.7 }');
根据对所有重复键值的比较,将
RAW_SOURCE
表中的新记录插入到EVENTS
表中:insert into events select src:device_type::string , src:version::string , value:f::number , value:rv::variant , value:t::number , value:v.ACHZ::number , value:v.ACV::number , value:v.DCA::number , value:v.DCV::number , value:v.ENJR::number , value:v.ERRS::number , value:v.MXEC::number , value:v.TMPI::number , value:vd::number , value:z::number from raw_source , lateral flatten( input => src:events ) where not exists (select 'x' from events where events.device_type = src:device_type and events.version = src:version and events.f = value:f and events.rv = value:rv and events.t = value:t and events.achz = value:v.ACHZ and events.acv = value:v.ACV and events.dca = value:v.DCA and events.dcv = value:v.DCV and events.enjr = value:v.ENJR and events.errs = value:v.ERRS and events.mxec = value:v.MXEC and events.tmpi = value:v.TMPI and events.vd = value:vd and events.z = value:z);
查询
EVENTS
表将显示添加的行:select * from EVENTS;
查询返回以下结果:
+-------------+---------+---------+----------------------------------------------------------------------+---------------+--------+--------+-------+-------+------+------+------+------+-----+---------------+ | DEVICE_TYPE | VERSION | F | RV | T | ACHZ | ACV | DCA | DCV | ENJR | ERRS | MXEC | TMPI | VD | Z | |-------------+---------+---------+----------------------------------------------------------------------+---------------+--------+--------+-------+-------+------+------+------+------+-----+---------------| | server | 2.6 | 83 | "15219.64,783.63,48674.48,84679.52,27499.78,2178.83,0.42,74900.19" | 1437560931139 | 42869 | 709489 | 232 | 62287 | 2599 | 205 | 487 | 9 | 54 | 1437644222811 | | server | 2.6 | 1000083 | "8070.52,54470.71,85331.27,9.10,70825.85,65191.82,46564.53,29422.22" | 1437036965027 | 6953 | 346795 | 250 | 46066 | 9033 | 615 | 0 | 112 | 626 | 1437660796958 | | cell_phone | 3.2 | 79 | "786954.67,492.68,3577.48,40.11,343.00,345.8,0.22,8765.22" | 5769784730576 | 75846 | 98355 | 789 | 62287 | 2234 | 578 | 999 | 9 | 54 | 1437644222811 | | web_browser | 8.7 | 79 | "122375.99,744.89,386.99,12.45,78.08,43.7,9.22,8765.43" | 5769784730576 | 768436 | 9475 | 94835 | 88845 | 8754 | 567 | 823 | 0 | 55 | 8745598047355 | +-------------+---------+---------+----------------------------------------------------------------------+---------------+--------+--------+-------+-------+------+------+------+------+-----+---------------+
第 5 步:恭喜¶
恭喜,您已成功完成本教程。
教程关键点¶
通过使用逻辑粒度路径对 S3 桶中的事件数据进行分区,您可以使用单个命令将分区数据的子集复制到 Snowflake 中。
Snowflake 的
column:key
表示法与为人熟悉的 SQLtable.column
表示法很相似,允许您有效地查询列中的列(即子列),该子列是根据 JSON 数据中嵌入的架构定义动态派生的。FLATTEN 函数允许您将 JSON 数据解析为不同的列。
可以使用多个选项根据与暂存数据文件的比较结果来更新表数据。
教程清理(可选)¶
执行以下 DROP <object> 命令,将系统恢复到教程开始前的状态:
DROP DATABASE IF EXISTS mydatabase; DROP WAREHOUSE IF EXISTS mywarehouse;
删除数据库会自动移除所有子数据库对象,例如表。