ConsumeKafka 2025.5.31.15

捆绑包

com.snowflake.openflow.runtime | runtime-kafka-nar

描述

使用来自 Apache Kafka Consumer API 的消息。对应的发送消息的 NiFi 处理器是 PublishKafka。处理器支持使用 Kafka 消息,并可以选择将其解释为 NiFi 记录。请注意,目前在读取记录模式下,处理器假设从给定分区检索的所有记录具有相同的架构。在此模式下,如果提取了任何 Kafka 消息,但无法使用配置的记录读取器或记录写入器进行解析或写入,则消息的内容将写入一个单独的 FlowFile,并将 FlowFile 将被传送到“parse.failure”关系中。否则,每个 FlowFile 都将发送到“success”关系,并且单个 FlowFile 中可能包含多条单独的消息。添加了“record.count”属性以指示 FlowFile 中包含多少条消息。如果两条 Kafka 消息具有不同的架构,或在 <Headers to Add as Attributes> 属性中包含的消息头值不同,它们不会被置于同一个 FlowFile。

标签

avro、consume、csv、get、ingest、ingress、json、kafka、openflow、pubsub、record、topic

输入要求

FORBIDDEN

支持敏感的动态属性

false

属性

属性

描述

提交偏移量

指定处理器在收到消息后是否应将偏移量提交给 Kafka。通常,应将此值设置为 true,这样收到的消息就不会重复。但是,在某些情况下,我们可能希望避免提交偏移量,然后通过 PublishKafka 可以处理和稍后确认数据,从而实现 Exactly Once 语义。

内容字段

指定将在记录的哪个字段下添加内容。如果未设置,则内容将位于记录的根目录

组 ID

与 Kafka group.id 属性对应的 Kafka 使用者组标识符

标头编码

读取 Kafka 记录标头值和写入 FlowFile 属性时应用的字符编码

标头名称模式

应用于 Kafka 记录标头名称的正则表达式模式,用于选择要写入为 FlowFile 属性的标头值

标头字段父级

指定将在记录的哪个字段下添加标头字段。如果未设置,则标头字段将位于记录的根目录

Kafka 连接服务

提供与 Kafka Broker 的连接以发布 Kafka 记录

键属性编码

包含 Kafka Record Key 的已配置 FlowFile 属性值的编码方式。

键字段父级

指定要在记录的哪个字段下添加键字段。如果未设置,则键字段将位于记录的根目录

键格式

指定如何在输出 FlowFile 中表示 Kafka 记录键

键记录读取器

用于将 Kafka 记录键解析到记录的记录读取器

最大未提交时间

指定处理器在必须通过流程传输 FlowFiles 并将偏移量提交给 Kafka(如果适用)之前可以从 Kafka 使用的最大时间。时间越长,延迟可能越大

消息分隔符

由于 KafkaConsumer 是批量接收消息的,因此处理器提供了一个选项,可以将给定主题和分区内的所有 Kafka 消息以单个批次输出为 FlowFiles,而此属性允许您提供一个字符串(解释为 UTF-8),用于区分多条 Kafka 消息。这是一个可选属性,如果未提供,则收到的每条 Kafka 消息都会在处理器触发时生成一个单独的 FlowFile。要输入特殊字符,例如“换行符”,请根据 OS 使用 CTRL +Enter 或 Shift+Enter

元数据字段

指定将在记录的哪个字段下添加元数据。如果未设置,则元数据将位于记录的根目录

元数据接收的时间戳字段

如果指定了该字段,则时间戳将放在输出 FlowFile 中的记录元数据的指定字段下方

输出策略

用于将 Kafka 记录输出到 FlowFile 记录的格式。

处理策略

处理 Kafka 记录和将序列化输出写入 FlowFiles 的策略

记录读取器

用于传入 Kafka 消息的记录读取器

记录写入器

用于序列化传出 FlowFiles 的记录写入器

按键分隔

启用此属性后,只有当两个 Kafka 消息具有相同的键时,才会将两条消息添加到同一个 FlowFile 中。

主题格式

指定提供的主题是以逗号分隔的名称列表还是单个正则表达式

主题

处理器从中使用 Kafka 记录的 Kafka 主题的名称或模式。如果用逗号分隔,则可提供多个主题。

auto.offset.reset

当未找到与 Kafka auto.offset.reset 属性对应的之前的使用者偏移量时,将应用自动偏移量配置

关系

名称

描述

success

包含一个或多个已序列化 Kafka 记录的 FlowFiles

写入属性

名称

描述

record.count

收到的记录数

mime.type

配置的记录写入器提供的 MIME 类型

kafka.count

写入的消息数量(若有多条)

kafka.key

消息的键(如果存在且为单条消息)。键的编码方式取决于“Key Attribute Encoding”属性的值。

kafka.offset

消息在主题分区中的偏移量。

kafka.timestamp

主题分区中消息的时间戳。

kafka.partition

消息或消息捆绑包所在主题的分区

kafka.topic

消息或消息捆绑包来自的主题

kafka.tombstone

如果使用的消息是墓碑消息,则设置为 true

另请参阅

语言: 中文