ConsumeAMQP 2025.10.2.19¶
捆绑包¶
org.apache.nifi | nifi-amqp-nar
描述¶
通过 AMQP 0.9.1 协议使用来自 AMQP 代理的 AMQP 消息。从 AMQP 代理收到的每条消息都将作为其自己的 FlowFile 发送到“success”关系。
输入要求¶
FORBIDDEN
支持敏感的动态属性¶
false
属性¶
属性 |
描述 |
|---|---|
AMQP 版本 |
AMQP 版本。目前仅支持 AMQP v0.9.1。 |
Auto-Acknowledge Messages |
如果为 False(非自动确认),则消息将在 FlowFiles 成功传输并提交 NiFi 会话后由处理器确认。“Non-Auto-Acknowledge”模式提供“at-least-once”的传递语义。如果为 True(自动确认),则发送给“AMQP Client”的消息将在发送后立即由“AMQP 代理”自动确认。这通常可以提供更好的吞吐量,但也会导致“AMQP 代理”、NiFi 或处理器重启/崩溃时消息丢失。“Auto-Acknowledge”模式提供“at-most-once”的传递语义,建议仅在可以接受丢失消息的情况下使用。 |
Batch Size |
单个会话中应处理的最大消息数。一旦收到这么多消息(或者一旦没有更多消息可用),收到的消息将转移到“success”关系中且消息将得到“AMQP 代理”的确认。将此值设置为更大的数值可以提高性能,特别是对于非常小的消息,但也可能导致在 NiFi 突然重启时复制更多消息。 |
代理 |
以逗号分隔的已知 AMQP 代理列表,格式为 <host>:<port>(例如,localhost:5672)。如果设置了此属性,则忽略“Host Name”和“Port”。仅包括来自同一 AMQP 集群的主机。 |
已启用客户端证书身份验证 |
使用 SSL 证书而非用户名/密码进行身份验证。 |
Header Key Prefix |
在将标头键添加到 FlowFile 属性时,所要添加的前缀文本。处理器会将“.”追加到该属性的值中 |
Header Output Format |
定义如何从接收到的消息中输出标头 |
Header Separator |
用于分隔字符串中标头的键值的字符。该值必须只有一个字符。 |
主机名 |
AMQP 代理的网络地址(例如,localhost)。如果设置了代理,则忽略此属性。 |
入站消息最大正文大小 |
入站(接收)消息的最大正文大小。 |
密码 |
用于身份验证与授权的密码。 |
端口 |
标识 AMQP 代理端口的数值(例如,5671)。如果设置了代理,则忽略此属性。 |
Prefetch Count |
The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will no longer send new messages until consumer acknowledges some of the messages already delivered to it. Allowed values: from 0 to 65535.0 means no limit |
队列 |
待使用消息的现有 AMQP Queue 名称。通常由 AMQP 管理员预先定义。 |
Remove Curly Braces |
如果“Remove Curly Braces”为 True,则自动移除标头中的花括号。 |
SSL Context Service |
用于为 TLS/SSL 连接提供客户端证书信息的 SSL Context Service。 |
用户名 |
用于身份验证和授权的用户名。 |
虚拟主机 |
虚拟主机名,用于隔离 AMQP 系统以增强安全性。 |
关系¶
名称 |
描述 |
|---|---|
success |
从 AMQP Queue 接收到的所有 FlowFiles 都将路由到此关系 |
写入属性¶
名称 |
描述 |
|---|---|
amqp$appId |
“AMQP Message”中的“App ID”字段 |
amqp$contentEncoding |
“AMQP Message”报告的内容编码 |
amqp$contentType |
“AMQP Message”报告的内容类型 |
amqp$headers |
“AMQP Message”中显示的标头。仅当处理器配置为输出此属性时才添加。 |
<Header Key Prefix>.<attribute> |
如果处理器配置为将标头作为属性输出,则将使用此属性名称插入每个消息标头 |
amqp$deliveryMode |
消息的传递模式数字指示信号 |
amqp$priority |
消息优先级 |
amqp$correlationId |
消息的“Correlation ID” |
amqp$replyTo |
消息的“Reply-To”字段的值 |
amqp$expiration |
消息有效期 |
amqp$messageId |
消息的唯一 ID |
amqp$timestamp |
消息的时间戳,即自纪元以来的毫秒数 |
amqp$type |
消息的类型 |
amqp$userId |
用户的 ID |
amqp$clusterId |
AMQP 集群的 ID |
amqp$routingKey |
“AMQP Message”的“routingKey” |
amqp$exchange |
接收“AMQP Message”的源交换过程 |