使用 Openflow 创建数据流¶
本主题介绍在 Openflow 中创建数据流的过程。
先决条件¶
过程¶
设置好运行时环境后,我们来创建一个简单的数据管道。例如,我们将基于一个指定架构生成记录,基于 SQL 查询筛选这些记录,然后将数据发送到 Snowflake。
有关如何构建数据流的详细描述,请参阅 Apache NiFi 文档 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#building-dataflow)。
打开 Openflow 应用程序。很可能为空白状态的大型网格区域称为“画布”,在您为实现数据流创建组件时,这里就是这些组件的“大本营”。
创建流程组。将页面顶部的工具面板中的“Process Group”图标拖放到画布上。松开指针后,将显示一个 Create Process Group 弹出窗口。
输入数据流的名称,例如“Flow Example”,然后点击 Add。
可选:右键点击您刚刚创建的流程组,然后在上下文菜单中选择 Enter Group`。您也可以双击该流程组。这会创建一个不在画布顶层的视觉抽象。
添加处理器。要添加处理器,请选中 Processor 工具并将其拖放到画布上,然后松开鼠标。
随即出现 Add Processor 对话框。
从列表中选择 GenerateRecord` 处理器,然后点击 Add。
画布中现在会显示您新添加的处理器。
备注
您可以添加多个处理器。
添加以下处理器。我们将在后续步骤中对其进行配置:
QueryRecord
PutDatabaseRecord
配置处理器。
双击一个处理器。 随即出现 Edit Processor 对话框。
修改以下属性:
设置
Scheduling
属性
Relationships:
注释
在处理器之间创建连接。
将鼠标悬停在第一个处理器上。处理器中间会显示一个内有箭头的圆圈。
点击内有箭头的圆圈,然后将指针朝着第二个处理器处拖动。这将创建一条红色虚线,表示尚未准备好进行连接。
将字画面移到第二个处理器上。
虚线会变为绿色,目标处理器周围会出现绿色边框。
松开鼠标按键。此时会出现 Create Connection 弹出窗口。
记下 From Processor 和 To Processor 的名称。选择 :ui:'Relationships` 部分,确认显示 Success。
点击 Add。新连接已创建。
连接由一个 FlowFiles 队列提供支持,该队列存储这些 FlowFile,直到下一个处理器被触发并使用它们。
将 SnowflakeConnectionService 控制器服务添加到流程中。
编辑控制器服务并填写必填字段。
登录您的 Snowflake 账户,并创建数据库。
在数据库的 PUBLIC 架构中,创建一个标准表。
create table SAMPLE_DATA (
name STRING,
country STRING
)
在 Openflow 上运行流程。
查询数据。