ConsumeKafka 2025.5.31.15¶
捆绑包¶
com.snowflake.openflow.runtime | runtime-kafka-nar
描述¶
使用来自 Apache Kafka Consumer API 的消息。对应的发送消息的 NiFi 处理器是 PublishKafka。处理器支持使用 Kafka 消息,并可以选择将其解释为 NiFi 记录。请注意,目前在读取记录模式下,处理器假设从给定分区检索的所有记录具有相同的架构。在此模式下,如果提取了任何 Kafka 消息,但无法使用配置的记录读取器或记录写入器进行解析或写入,则消息的内容将写入一个单独的 FlowFile,并将 FlowFile 将被传送到“parse.failure”关系中。否则,每个 FlowFile 都将发送到“success”关系,并且单个 FlowFile 中可能包含多条单独的消息。添加了“record.count”属性以指示 FlowFile 中包含多少条消息。如果两条 Kafka 消息具有不同的架构,或在 <Headers to Add as Attributes> 属性中包含的消息头值不同,它们不会被置于同一个 FlowFile。
输入要求¶
FORBIDDEN
支持敏感的动态属性¶
false
属性¶
属性 |
描述 |
---|---|
提交偏移量 |
指定处理器在收到消息后是否应将偏移量提交给 Kafka。通常,应将此值设置为 true,这样收到的消息就不会重复。但是,在某些情况下,我们可能希望避免提交偏移量,然后通过 PublishKafka 可以处理和稍后确认数据,从而实现 Exactly Once 语义。 |
内容字段 |
指定将在记录的哪个字段下添加内容。如果未设置,则内容将位于记录的根目录 |
组 ID |
与 Kafka group.id 属性对应的 Kafka 使用者组标识符 |
标头编码 |
读取 Kafka 记录标头值和写入 FlowFile 属性时应用的字符编码 |
标头名称模式 |
应用于 Kafka 记录标头名称的正则表达式模式,用于选择要写入为 FlowFile 属性的标头值 |
标头字段父级 |
指定将在记录的哪个字段下添加标头字段。如果未设置,则标头字段将位于记录的根目录 |
Kafka 连接服务 |
提供与 Kafka Broker 的连接以发布 Kafka 记录 |
键属性编码 |
包含 Kafka Record Key 的已配置 FlowFile 属性值的编码方式。 |
键字段父级 |
指定要在记录的哪个字段下添加键字段。如果未设置,则键字段将位于记录的根目录 |
键格式 |
指定如何在输出 FlowFile 中表示 Kafka 记录键 |
键记录读取器 |
用于将 Kafka 记录键解析到记录的记录读取器 |
最大未提交时间 |
指定处理器在必须通过流程传输 FlowFiles 并将偏移量提交给 Kafka(如果适用)之前可以从 Kafka 使用的最大时间。时间越长,延迟可能越大 |
消息分隔符 |
由于 KafkaConsumer 是批量接收消息的,因此处理器提供了一个选项,可以将给定主题和分区内的所有 Kafka 消息以单个批次输出为 FlowFiles,而此属性允许您提供一个字符串(解释为 UTF-8),用于区分多条 Kafka 消息。这是一个可选属性,如果未提供,则收到的每条 Kafka 消息都会在处理器触发时生成一个单独的 FlowFile。要输入特殊字符,例如“换行符”,请根据 OS 使用 CTRL +Enter 或 Shift+Enter |
元数据字段 |
指定将在记录的哪个字段下添加元数据。如果未设置,则元数据将位于记录的根目录 |
元数据接收的时间戳字段 |
如果指定了该字段,则时间戳将放在输出 FlowFile 中的记录元数据的指定字段下方 |
输出策略 |
用于将 Kafka 记录输出到 FlowFile 记录的格式。 |
处理策略 |
处理 Kafka 记录和将序列化输出写入 FlowFiles 的策略 |
记录读取器 |
用于传入 Kafka 消息的记录读取器 |
记录写入器 |
用于序列化传出 FlowFiles 的记录写入器 |
按键分隔 |
启用此属性后,只有当两个 Kafka 消息具有相同的键时,才会将两条消息添加到同一个 FlowFile 中。 |
主题格式 |
指定提供的主题是以逗号分隔的名称列表还是单个正则表达式 |
主题 |
处理器从中使用 Kafka 记录的 Kafka 主题的名称或模式。如果用逗号分隔,则可提供多个主题。 |
auto.offset.reset |
当未找到与 Kafka auto.offset.reset 属性对应的之前的使用者偏移量时,将应用自动偏移量配置 |
关系¶
名称 |
描述 |
---|---|
success |
包含一个或多个已序列化 Kafka 记录的 FlowFiles |
写入属性¶
名称 |
描述 |
---|---|
record.count |
收到的记录数 |
mime.type |
配置的记录写入器提供的 MIME 类型 |
kafka.count |
写入的消息数量(若有多条) |
kafka.key |
消息的键(如果存在且为单条消息)。键的编码方式取决于“Key Attribute Encoding”属性的值。 |
kafka.offset |
消息在主题分区中的偏移量。 |
kafka.timestamp |
主题分区中消息的时间戳。 |
kafka.partition |
消息或消息捆绑包所在主题的分区 |
kafka.topic |
消息或消息捆绑包来自的主题 |
kafka.tombstone |
如果使用的消息是墓碑消息,则设置为 true |