使用 Openflow 创建数据流

本主题介绍在 Openflow 中创建数据流的过程。

先决条件

设置 Openflow

过程

设置好运行时环境后,我们来创建一个简单的数据管道。例如,我们将基于一个指定架构生成记录,基于 SQL 查询筛选这些记录,然后将数据发送到 Snowflake。

有关如何构建数据流的详细描述,请参阅 Apache NiFi 文档 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#building-dataflow)。

  1. 打开 Openflow 应用程序。很可能为空白状态的大型网格区域称为“画布”,在您为实现数据流创建组件时,这里就是这些组件的“大本营”。

  2. 创建流程组。将页面顶部的工具面板中的“Process Group”图标拖放到画布上。松开指针后,将显示一个 Create Process Group 弹出窗口。

  3. 输入数据流的名称,例如“Flow Example”,然后点击 Add

  4. 可选:右键点击您刚刚创建的流程组,然后在上下文菜单中选择 Enter Group`。您也可以双击该流程组。这会创建一个不在画布顶层的视觉抽象。

  5. 添加处理器。要添加处理器,请选中 Processor 工具并将其拖放到画布上,然后松开鼠标。

    随即出现 Add Processor 对话框。

  6. 从列表中选择 GenerateRecord` 处理器,然后点击 Add

    画布中现在会显示您新添加的处理器。

    备注

    您可以添加多个处理器。

  7. 添加以下处理器。我们将在后续步骤中对其进行配置:

    1. QueryRecord

    2. PutDatabaseRecord

  8. 配置处理器。

    双击一个处理器。 随即出现 Edit Processor 对话框。

    修改以下属性:

    1. 设置

    2. Scheduling

    3. 属性

    4. Relationships:

    5. 注释

  9. 在处理器之间创建连接。

    1. 将鼠标悬停在第一个处理器上。处理器中间会显示一个内有箭头的圆圈。

    2. 点击内有箭头的圆圈,然后将指针朝着第二个处理器处拖动。这将创建一条红色虚线,表示尚未准备好进行连接。

    3. 将字画面移到第二个处理器上。

      虚线会变为绿色,目标处理器周围会出现绿色边框。

    4. 松开鼠标按键。此时会出现 Create Connection 弹出窗口。

    5. 记下 From ProcessorTo Processor 的名称。选择 :ui:'Relationships` 部分,确认显示 Success

    6. 点击 Add。新连接已创建。

    连接由一个 FlowFiles 队列提供支持,该队列存储这些 FlowFile,直到下一个处理器被触发并使用它们。

  10. 将 SnowflakeConnectionService 控制器服务添加到流程中。

  11. 编辑控制器服务并填写必填字段。

  12. 登录您的 Snowflake 账户,并创建数据库。

  13. 在数据库的 PUBLIC 架构中,创建一个标准表。

create table SAMPLE_DATA (
    name STRING,
    country STRING
)
Copy
  1. 在 Openflow 上运行流程。

  2. 查询数据。

语言: 中文