ConsumeAMQP 2025.5.31.15

捆绑包

org.apache.nifi | nifi-amqp-nar

描述

通过 AMQP 0.9.1 协议使用来自 AMQP 代理的 AMQP 消息。从 AMQP 代理收到的每条消息都将作为其自己的 FlowFile 发送到“success”关系。

标签

amqp、consume、get、message、rabbit、receive

输入要求

FORBIDDEN

支持敏感的动态属性

false

属性

属性

描述

AMQP 版本

AMQP 版本。目前仅支持 AMQP v0.9.1。

代理

以逗号分隔的已知 AMQP 代理列表,格式为 <host>:<port>(例如,localhost:5672)。如果设置了此属性,则忽略“Host Name”和“Port”。仅包括来自同一 AMQP 集群的主机。

已启用客户端证书身份验证

使用 SSL 证书而非用户名/密码进行身份验证。

主机名

AMQP 代理的网络地址(例如,localhost)。如果设置了代理,则忽略此属性。

入站消息最大正文大小

入站(接收)消息的最大正文大小。

密码

用于身份验证与授权的密码。

端口

标识 AMQP 代理端口的数值(例如,5671)。如果设置了代理,则忽略此属性。

队列

待使用消息的现有 AMQP Queue 名称。通常由 AMQP 管理员预先定义。

SSL Context Service

用于为 TLS/SSL 连接提供客户端证书信息的 SSL Context Service。

用户名

用于身份验证和授权的用户名。

虚拟主机

虚拟主机名,用于隔离 AMQP 系统以增强安全性。

auto.acknowledge

如果为 False(非自动确认),则消息将在 FlowFiles 成功传输并提交 NiFi 会话后由处理器确认。“Non-Auto-Acknowledge”模式提供“at-least-once”的传递语义。如果为 True(自动确认),则发送给“AMQP Client”的消息将在发送后立即由“AMQP 代理”自动确认。这通常可以提供更好的吞吐量,但也会导致“AMQP 代理”、NiFi 或处理器重启/崩溃时消息丢失。“Auto-Acknowledge”模式提供“at-most-once”的传递语义,建议仅在可以接受丢失消息的情况下使用。

batch.size

单个会话中应处理的最大消息数。一旦收到这么多消息(或者一旦没有更多消息可用),收到的消息将转移到“success”关系中且消息将得到“AMQP 代理”的确认。将此值设置为更大的数值可以提高性能,特别是对于非常小的消息,但也可能导致在 NiFi 突然重启时复制更多消息。

header.format

定义如何从接收到的消息中输出标头

header.key.prefix

在将标头键添加到 FlowFile 属性时,所要添加的前缀文本。处理器会将“.”追加到该属性的值中

header.separator

用于分隔字符串中标头的键值的字符。该值必须只有一个字符。

prefetch.count

使用者未确认的消息的最大数量。如果使用者拥有这一数量的未确认消息,则在使用者确认已发送给其的部分消息之前,“AMQP 代理”将不再发送新消息。允许的值:从 0 到 65535。0 表示没有限制

remove.curly.braces

如果“Remove Curly Braces”为 True,则自动移除标头中的花括号。

关系

名称

描述

success

从 AMQP Queue 接收到的所有 FlowFiles 都将路由到此关系

写入属性

名称

描述

amqp$appId

“AMQP Message”中的“App ID”字段

amqp$contentEncoding

“AMQP Message”报告的内容编码

amqp$contentType

“AMQP Message”报告的内容类型

amqp$headers

“AMQP Message”中显示的标头。仅当处理器配置为输出此属性时才添加。

<Header Key Prefix>.<attribute>

如果处理器配置为将标头作为属性输出,则将使用此属性名称插入每个消息标头

amqp$deliveryMode

消息的传递模式数字指示信号

amqp$priority

消息优先级

amqp$correlationId

消息的“Correlation ID”

amqp$replyTo

消息的“Reply-To”字段的值

amqp$expiration

消息有效期

amqp$messageId

消息的唯一 ID

amqp$timestamp

消息的时间戳,即自纪元以来的毫秒数

amqp$type

消息的类型

amqp$userId

用户的 ID

amqp$clusterId

AMQP 集群的 ID

amqp$routingKey

“AMQP Message”的“routingKey”

amqp$exchange

接收“AMQP Message”的源交换过程

语言: 中文