EnforceOrder 2025.5.31.15¶
捆绑包¶
org.apache.nifi | nifi-standard-nar
描述¶
强制执行单个节点内属于同一数据组的 FlowFiles 预期排序。 尽管 PriorityAttributePrioritizer 可以在连接上使用以确保通过该连接的 FlowFile 按优先顺序排列,但 FlowFiles 可能会出现乱序,具体取决于错误处理、分支和其他流设计。EnforceOrder 可用于强制执行这些 FlowFiles 的原排序。[IMPORTANT] 为使 EnforceOrder 生效,必须在 EVERY 的下游关系 UNTIL 中按 FlowFiles 的顺序使用 FirstInFirstOutPrioritizer,通过 MergeContent 等操作实际获取 FIXED,或将其存储至最终目标位置。
输入要求¶
REQUIRED
支持敏感的动态属性¶
false
属性¶
属性 |
描述 |
---|---|
batch-count |
一次执行时 EnforceOrder 可以处理的最大 FlowFiles 数量。 |
group-id |
EnforceOrder 可进行多组排序。“组标识符”用于确定 FlowFile 所属的组。将在每次传入 FlowFile 时评估此属性。如果评估结果为空,则 FlowFile 会路由至“failure”。 |
inactive-timeout |
表示不活动组的状态将在多长时间后从托管状态中清除。如果在指定持续时间内未出现任何新传入 FlowFile,则该组被确定为不活动状态。“因不活动超时时间”必须长于“等待超时时间”。若 FlowFile 在其所属组已被清除后延迟到达,系统将视其为全新组,但由于预期的前置 FlowFiles 已不存在,其顺序永远无法匹配。最终 FlowFile 会等待超时并路由到“overtook”。为避免这种情况,组状态应保持足够长的时间,但是,较短的持续时间将有助于再次重复使用相同的组标识符。 |
initial-order |
当组的第一个 FlowFile 到达时,将计算初始目标顺序并将其存储在托管状态下。之后,EnforceOrder 将开始跟踪目标顺序并将其存储在状态管理存储中。如果使用了“表达式语言”但计算结果不是整数,则 FlowFile 会路由至“failure”,并且在连续 FlowFiles 提供有效的初始顺序之前,初始顺序将保持未知状态。 |
maximum-order |
如有指定,则任何顺序较大的 FlowFiles 都将失败。对于给定组,此属性仅计算一次。计算出最大顺序后,它将保留在状态管理存储中,并用于其他属于同一组的 FlowFiles。如果使用了“表达式语言”但计算结果不是整数,则 FlowFile 会路由至“failure”,并且在连续 FlowFiles 提供有效的最大顺序之前,最大顺序将保持未知状态。 |
order-attribute |
FlowFile 属性名称,其值将用于强制执行组内的 FlowFiles 顺序。如果 FlowFile 没有此属性或其值非整数,FlowFile 将会路由到“failure”。 |
wait-timeout |
表示等待持续时间过后 FlowFiles 将会路由到“overtook”关系。 |
状态管理¶
范围 |
描述 |
---|---|
LOCAL |
EnforceOrder 对每个顺序组使用以下状态:“<groupId>.target”是一个正在等待下一个到达的顺序编号。若到达的 FlowFile 顺序与当前等待值匹配,或另一个 FlowFile 因 FlowFile 等待超时而抢先到达,则目标顺序将更新为 (FlowFile.order + 1)。“<groupId>.max”是该组内的最大顺序编号,“<groupId>.updatedAt”是该组顺序最近一次更新的时间戳。一旦组被确定为不活动状态,这些托管状态将自动移除,有关详细信息,请参阅“因不活动超时时间”。 |
关系¶
名称 |
描述 |
---|---|
failure |
无必填属性或无法计算这些属性的 FlowFiles 将路由到此关系 |
overtook |
等待前面 FlowFiles 的时间超过“等待超时”并超越 FlowFiles 的 FlowFile 将路由到此关系。 |
skipped |
如果 FlowFile 的顺序编号早于当前值,这意味着到达时间太晚且已跳过,则会路由到此关系。 |
success |
顺序编号匹配的 FlowFile 将路由到此关系。 |
wait |
顺序编号不匹配的 FlowFile 将路由到此关系 |
写入属性¶
名称 |
描述 |
---|---|
EnforceOrder.startedAt |
通过该处理器的所有 FlowFiles 都将具有此属性。该值用于确定等待超时。 |
EnforceOrder.result |
通过该处理器的所有 FlowFiles 都将具有此属性,表示它被路由到哪个关系。 |
EnforceOrder.detail |
路由到“failure”或“skipped”关系的 FlowFiles 将使用此属性描述详细信息。 |
EnforceOrder.expectedOrder |
路由到“failure”或“skipped”关系的 FlowFiles 将使用此属性表示处理 FlowFile 时的预期顺序。 |