ConsumeAMQP 2025.5.31.15¶
捆绑包¶
org.apache.nifi | nifi-amqp-nar
描述¶
通过 AMQP 0.9.1 协议使用来自 AMQP 代理的 AMQP 消息。从 AMQP 代理收到的每条消息都将作为其自己的 FlowFile 发送到“success”关系。
输入要求¶
FORBIDDEN
支持敏感的动态属性¶
false
属性¶
属性 |
描述 |
---|---|
AMQP 版本 |
AMQP 版本。目前仅支持 AMQP v0.9.1。 |
代理 |
以逗号分隔的已知 AMQP 代理列表,格式为 <host>:<port>(例如,localhost:5672)。如果设置了此属性,则忽略“Host Name”和“Port”。仅包括来自同一 AMQP 集群的主机。 |
已启用客户端证书身份验证 |
使用 SSL 证书而非用户名/密码进行身份验证。 |
主机名 |
AMQP 代理的网络地址(例如,localhost)。如果设置了代理,则忽略此属性。 |
入站消息最大正文大小 |
入站(接收)消息的最大正文大小。 |
密码 |
用于身份验证与授权的密码。 |
端口 |
标识 AMQP 代理端口的数值(例如,5671)。如果设置了代理,则忽略此属性。 |
队列 |
待使用消息的现有 AMQP Queue 名称。通常由 AMQP 管理员预先定义。 |
SSL Context Service |
用于为 TLS/SSL 连接提供客户端证书信息的 SSL Context Service。 |
用户名 |
用于身份验证和授权的用户名。 |
虚拟主机 |
虚拟主机名,用于隔离 AMQP 系统以增强安全性。 |
auto.acknowledge |
如果为 False(非自动确认),则消息将在 FlowFiles 成功传输并提交 NiFi 会话后由处理器确认。“Non-Auto-Acknowledge”模式提供“at-least-once”的传递语义。如果为 True(自动确认),则发送给“AMQP Client”的消息将在发送后立即由“AMQP 代理”自动确认。这通常可以提供更好的吞吐量,但也会导致“AMQP 代理”、NiFi 或处理器重启/崩溃时消息丢失。“Auto-Acknowledge”模式提供“at-most-once”的传递语义,建议仅在可以接受丢失消息的情况下使用。 |
batch.size |
单个会话中应处理的最大消息数。一旦收到这么多消息(或者一旦没有更多消息可用),收到的消息将转移到“success”关系中且消息将得到“AMQP 代理”的确认。将此值设置为更大的数值可以提高性能,特别是对于非常小的消息,但也可能导致在 NiFi 突然重启时复制更多消息。 |
header.format |
定义如何从接收到的消息中输出标头 |
header.key.prefix |
在将标头键添加到 FlowFile 属性时,所要添加的前缀文本。处理器会将“.”追加到该属性的值中 |
header.separator |
用于分隔字符串中标头的键值的字符。该值必须只有一个字符。 |
prefetch.count |
使用者未确认的消息的最大数量。如果使用者拥有这一数量的未确认消息,则在使用者确认已发送给其的部分消息之前,“AMQP 代理”将不再发送新消息。允许的值:从 0 到 65535。0 表示没有限制 |
remove.curly.braces |
如果“Remove Curly Braces”为 True,则自动移除标头中的花括号。 |
关系¶
名称 |
描述 |
---|---|
success |
从 AMQP Queue 接收到的所有 FlowFiles 都将路由到此关系 |
写入属性¶
名称 |
描述 |
---|---|
amqp$appId |
“AMQP Message”中的“App ID”字段 |
amqp$contentEncoding |
“AMQP Message”报告的内容编码 |
amqp$contentType |
“AMQP Message”报告的内容类型 |
amqp$headers |
“AMQP Message”中显示的标头。仅当处理器配置为输出此属性时才添加。 |
<Header Key Prefix>.<attribute> |
如果处理器配置为将标头作为属性输出,则将使用此属性名称插入每个消息标头 |
amqp$deliveryMode |
消息的传递模式数字指示信号 |
amqp$priority |
消息优先级 |
amqp$correlationId |
消息的“Correlation ID” |
amqp$replyTo |
消息的“Reply-To”字段的值 |
amqp$expiration |
消息有效期 |
amqp$messageId |
消息的唯一 ID |
amqp$timestamp |
消息的时间戳,即自纪元以来的毫秒数 |
amqp$type |
消息的类型 |
amqp$userId |
用户的 ID |
amqp$clusterId |
AMQP 集群的 ID |
amqp$routingKey |
“AMQP Message”的“routingKey” |
amqp$exchange |
接收“AMQP Message”的源交换过程 |