ConsumeMQTT 2025.5.31.15

捆绑包

org.apache.nifi | nifi-mqtt-nar

描述

订阅主题并接收来自 MQTT 代理的消息

标签

IOT、MQTT、consume、listen、subscribe

输入要求

FORBIDDEN

支持敏感的动态属性

false

属性

属性

描述

代理 URI

用于连接到 MQTT 代理的 URI(例如 tcp://localhost:1883)。支持“tcp”、“ssl”、“ws”和“wss”架构。要使用“ssl”,必须设置 SSL Context Service 属性。当设置以逗号分隔的 URI 列表(例如,tcp://localhost:1883tcp://localhost:1884)时,处理器将在连接失败时使用循环算法连接到代理。

客户端 ID

要使用的 MQTT 客户端 ID。如果未设置,将生成 UUID。

连接超时(秒)

客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。默认超时时间为 30 秒。值为 0 会禁用超时处理,这表示客户端会一直等到网络连接成功或失败。

组 ID

要使用的 MQTT 使用者群组 ID。如果未设置群组 ID,则客户端将以个人使用者身份连接。

保持活动间隔(秒)

定义消息发送或接收之间的最大时间间隔。这让客户端能检测服务器是否不再可用,而不必等待 TCP/IP 超时。在每个保持活动状态的时间段内,客户端将确保至少有一条消息通过网络传输。如果在这段时间内没有与数据相关的消息,客户端会发送一条非常小的“ping”消息,服务器将确认该消息。如果值为 0,客户端中的保持活动状态处理将会被禁用。

遗愿消息

作为客户端遗愿发送的消息。

遗愿 QoS 级别

发布遗愿消息时使用的 QoS 级别。

遗愿保留

是否保留客户端的遗愿

遗愿主题

要将客户端遗愿发送到的主题。

MQTT 规范版本

与代理连接时的 MQTT 规范版本。有关更多详细信息,请参阅允许的值的说明。

最大队列规模

无论处理器计划的运行频率如何,都会向订阅者发送有关某个主题的 MQTT 消息。如果“Run Schedule”明显落后于消息到达该处理器的速度,则可以在该处理器的内部队列中进行备份。此属性指定了该处理器在内部队列中每次存储在内存中的最大消息数。如果 NiFi 重新启动,这些数据将丢失。

密码

连接代理时使用的密码

服务质量 (QoS)

用于接收消息的服务质量 (QoS)。接受值“0”、“1”或“2”;“0”表示“最多一次”,“1”表示“至少一次”,“2”表示“仅一次”。

SSL Context Service

用于为 TLS/SSL 连接提供客户端证书信息的 SSL Context Service。

会话过期间隔

在此间隔之后,代理会将客户端设为过期并清除会话状态。

会话状态

开始全新的流,还是恢复之前的流。有关更多详细信息,请参阅允许的值的说明。

主题筛选器

用于指定要订阅的主题的 MQTT 主题筛选器。

用户名

连接代理时使用的用户名

add-attributes-as-fields

如果将此属性设置为 True,则将在每条记录中添加默认字段:_topic、_qos、_isDuplicate、_isRetained。

message-demarcator

使用此属性,您可以选择输出包含多条消息的输出 FlowFiles。此属性允许您提供一个字符串(解释为 UTF–8),用于划分多条消息。这是一个可选属性;如果未提供,并且未定义记录读取器/写入器,则收到的每条消息都将生成一个 FlowFile。要输入特殊字符,例如“换行”,请根据 OS 使用 CTRL+Enter 或 Shift+Enter。

record-reader

用于将收到的 MQTT 消息解析为记录的记录读取器。

record-writer

将记录写入到 FlowFile 之前对其进行序列化时要使用的记录写入器。

关系

名称

描述

消息

MQTT 消息输出

parse.failure

如果无法使用配置的记录读取器解析消息,则消息的内容将作为其自身的 FlowFile 单独路由到此关系。

写入属性

名称

描述

record.count

收到的记录数

mqtt.broker

曾经是消息源的 MQTT 代理

mqtt.topic

收到的消息的 MQTT 主题

mqtt.qos

此消息的服务质量。

mqtt.isDuplicate

此消息是否可能与已收到的消息重复。

mqtt.isRetained

此消息是否来自当前发布者,还是由服务器“保留”为主题上发布的最后一条消息。

另请参阅

语言: 中文