连续数据管道示例¶
本主题提供了数据管道用例的实际示例。
本主题内容:
先决条件¶
在这些示例中用于执行 SQL 语句的角色需要以下访问控制权限:
EXECUTE TASK
运行任务的全局 EXECUTE TASK 权限
USAGE
执行 SQL 语句的数据库和模式以及运行这些示例中任何任务的仓库的 USAGE 权限。
CREATE <对象>
在执行 SQL 语句的模式上的各种 CREATE <对象> 权限,用于创建表、流和任务等对象。
有关 Snowflake 中访问控制的详细信息,请参阅 访问控制概述。
按计划转换加载的 JSON 数据¶
下面的示例将原始 JSON 数据加载到名为 raw
的单个着陆表中。两个任务查询在 raw
表上创建的表流,并将行子集插入多个表中。由于每个任务都会消耗表流中的变更数据捕获记录,因此需要多个表流。
-- Landing table for raw JSON documents.
-- Snowpipe could load data into this table.
create or replace table raw (
    var variant
);

-- Two independent streams capture inserts to the landing table.
-- Each task below consumes change records from its own stream,
-- which is why two streams are required.
create or replace stream rawstream1 on table raw;
create or replace stream rawstream2 on table raw;

-- Target table: names of office visitors identified in the raw data.
create or replace table names (
    id         int,
    first_name string,
    last_name  string
);

-- Target table: visitation dates of office visitors identified in the raw data.
create or replace table visits (
    id int,
    dt date
);
-- Task: every minute, when the rawstream1 stream contains records,
-- merge the name fields of the raw JSON into the names table
-- (insert new IDs, update existing ones).
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task raw_to_names
    warehouse = mywh
    schedule = '1 minute'
when
    system$stream_has_data('rawstream1')
as
    merge into names n
    using (
        -- Cast VARIANT fields explicitly so the source typing matches the
        -- target columns; the original relied on implicit conversion in the
        -- INSERT branch while casting with to_number only in the ON clause.
        select
            var:id::number    id,
            var:fname::string fname,
            var:lname::string lname
        from rawstream1
    ) r1
    on n.id = r1.id
    when matched then update set
        n.first_name = r1.fname,
        n.last_name  = r1.lname
    when not matched then insert (id, first_name, last_name)
        values (r1.id, r1.fname, r1.lname);
-- Task: every minute, when the rawstream2 stream contains records,
-- merge visitation records into the visits table.
-- Records with new IDs are inserted; records with existing IDs
-- update the DT column.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task raw_to_visits
    warehouse = mywh
    schedule = '1 minute'
when
    system$stream_has_data('rawstream2')
as
    merge into visits v
    using (
        -- Cast VARIANT fields explicitly so the source typing matches the
        -- target columns instead of relying on implicit conversion.
        select
            var:id::number     id,
            var:visit_dt::date visit_dt
        from rawstream2
    ) r2
    on v.id = r2.id
    when matched then update set v.dt = r2.visit_dt
    when not matched then insert (id, dt) values (r2.id, r2.visit_dt);
-- Start both tasks (tasks are created in a suspended state).
alter task raw_to_names resume;
alter task raw_to_visits resume;

-- Load a first batch of raw JSON records into the landing table.
-- An explicit column list is used so the statement does not break
-- silently if the table definition changes.
insert into raw (var)
select parse_json(column1)
from values
('{"id": "123","fname": "Jane","lname": "Smith","visit_dt": "2019-09-17"}'),
('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-17"}');

-- Inspect the change data capture records waiting in each stream.
select * from rawstream1;
select * from rawstream2;

-- Wait for the tasks to run. A small buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);

-- The streams should now be empty (their records were consumed by the
-- tasks); verify the rows landed in the target tables.
select * from names;
select * from visits;

-- Load a second batch containing both new and existing IDs,
-- exercising both branches of the MERGE statements.
insert into raw (var)
select parse_json(column1)
from values
('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-25"}'),
('{"id": "789","fname": "Ana","lname": "Glass","visit_dt": "2019-09-25"}');

-- Wait for the tasks to run again.
call system$wait(70);

-- The streams should be empty again...
select * from rawstream1;
select * from rawstream2;

-- ...and the target tables should reflect the upserts.
select * from names;
select * from visits;
按计划卸载数据¶
以下示例将表流中的变更数据捕获记录卸载到内部(即 Snowflake 管理的)暂存区。
-- Landing table for the unload example. This replaces the table from
-- the previous example; alternatively, create a fresh landing table.
-- Snowpipe could load data into this table.
create or replace table raw (
    id   int,
    type string
);

-- Stream on the landing table; it feeds the unload (COPY) task below.
create or replace stream rawstream on table raw;
-- Task: every minute, when the RAWSTREAM stream contains records,
-- run a COPY statement that reads from the stream and unloads the
-- change records into the landing table's table stage.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
-- NOTE: a stray standalone ';' that followed the task body was removed;
-- it is an empty statement and raises an error when the script is
-- executed statement by statement.
create or replace task unloadtask
    warehouse = mywh
    schedule = '1 minute'
when
    system$stream_has_data('RAWSTREAM')
as
    copy into @%raw/rawstream from rawstream overwrite = true;
-- Start the task (tasks are created in a suspended state).
alter task unloadtask resume;

-- Insert a raw record into the landing table.
-- An explicit column list is used so the statement does not break
-- silently if the table definition changes.
insert into raw (id, type) values (3, 'processed');

-- Inspect the change data capture record waiting in the stream.
select * from rawstream;

-- Wait for the task to run. A small buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);

-- The stream should now be empty (its record was consumed by the task).
select * from rawstream;

-- Verify the COPY statement unloaded a data file into the table stage.
ls @%raw;
按计划刷新外部表元数据¶
以下示例将按计划刷新名为 mydb.myschema.exttable
的外部表元数据(使用 ALTER EXTERNAL TABLE ... REFRESH)。
备注
创建外部表时,参数 AUTO_REFRESH 默认设置为 TRUE
。对于引用 Amazon S3 或 Microsoft Azure 暂存区中数据文件的外部表,我们建议您接受此默认值。但是,对于引用 Google Cloud Storage 暂存区的外部表,自动刷新选项目前不可用。对于这些外部表,按计划手动刷新元数据非常有用。
-- Task: refresh the metadata of the external table every 5 minutes
-- with ALTER EXTERNAL TABLE ... REFRESH.
-- OR REPLACE is used for consistency with the other examples so the
-- script can be re-run without a "task already exists" error.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
-- Remember that tasks are created suspended; start it with:
--   ALTER TASK exttable_refresh_task RESUME;
CREATE OR REPLACE TASK exttable_refresh_task
  WAREHOUSE = mywh
  SCHEDULE = '5 minutes'
AS
  ALTER EXTERNAL TABLE mydb.myschema.exttable REFRESH;