PutElasticsearchRecord 2025.10.9.21

捆绑包

org.apache.nifi | nifi-elasticsearch-restapi-nar

描述

使用官方 Elastic REST 客户端库的记录感知 Elasticsearch put 处理器。FlowFile 中的每条记录都将转换为文档,然后发送到 Elasticsearch _bulk APi。发送到 Elasticsearch 的每个请求中可以批处理多个文档。可以使用记录路径表达式配置每个文档的批量操作。

标签

elasticsearch、elasticsearch7、elasticsearch8、elasticsearch9、index、json、put、record

输入要求

REQUIRED

支持敏感的动态属性

false

属性

属性

描述

批处理大小

单批发送的记录数。

客户端服务

用于运行查询的 Elasticsearch 客户端服务。

日期格式

指定写入“Date”字段时使用的格式。如果未指定,则使用默认格式“yyyy-MM-dd”。如果已指定,则该值必须与 Java 简单日期格式相匹配(例如,MM/dd/yyyy 表示两位数月份,后面依次是两位数日期、四位数年份,全部由“/”字符分隔,如 01/25/2017)。

动态模板记录路径

RecordPath 用于指向记录中包含文档的 dynamic_templates 的字段。字段必须与 Map-type 兼容(例如 Map 或 Record),或者是可解析为 JSON 对象的字符串。需要 Elasticsearch 7+

按批量错误类型对结果进行分组

写入到“errors”关系的错误记录将按错误类型分组,与 FlowFile 中第一条记录相关的错误将作为“elasticsearch.bulk.error”添加到 FlowFile 中。如果“Treat Not Found as Success”为“false”,则与“not_found”Elasticsearch 文档响应相关的记录也将发送到“errors”关系。

ID 记录路径

用于检索 ID 字段以供 Elasticsearch 使用的记录路径表达式。如果留空,ID 将由 Elasticsearch 自动生成。

索引

要使用的索引的名称。

索引操作

用于索引的操作的类型(创建、删除、索引、更新、更新插入)

索引操作记录路径

用于检索“Index Operation”字段以供 Elasticsearch 使用的记录路径表达式。如果留空,将使用主索引操作属性来确定索引操作。

索引记录路径

用于检索“index”字段以供 Elasticsearch 使用的记录路径表达式。如果留空,则将使用主索引属性确定索引。

日志错误响应

如果启用了此属性,则将在错误日志级别将错误记录到 NiFi 日志中。否则,只有在 NiFi 上整体启用调试日志记录时,才会记录错误。此选项的目的是,使用户能够在不必打开调试日志记录的情况下调试失败的操作。

最大 JSON 字段字符串长度

解析 JSON 文档或属性时字符串值的最大允许长度。

输出错误响应

如果启用此选项,则来自 Elasticsearch 的标记为“error”的响应消息将输出到“error_responses”关系。这不会影响 Flowfile 输出到“successful”或“errors”关系

记录读取器

用于从 FlowFile 读取传入记录的记录读取器。

结果记录编写器

将检查 Elasticsearch 的响应中是否存在失败的记录,将使用此记录写入器服务将失败的记录写入到记录集并发送到“errors”关系中。将使用此记录写入器服务将成功的记录写入到记录集,并发送到“successful”关系。

保留 ID(记录路径)

是否保留用作 ID 记录路径的现有字段。

保留记录时间戳

是否保留用作 @timestamp 记录路径的现有字段。

脚本记录路径

RecordPath 用于指向记录中包含文档更新/更新插入脚本的字段。仅适用于更新/更新插入操作。字段必须与 Map-type 兼容(例如 Map 或 Record),或者是可解析为 JSON 对象的字符串

脚本化更新插入记录路径

RecordPath 用于指向记录中包含 scripted_upsert 布尔标志的字段。是否将 scripted_upsert 标志添加到更新插入操作中。无论文档是否存在,都强制 Elasticsearch 执行脚本,默认值为 false。如果提供的更新插入文档(来自 FlowFile 内容)为空,请务必将客户端服务控制器服务的“Suppress Null and Empty Values”设置为“Never Suppress”,否则发送到 Elasticsearch 的请求中将不包含“upsert”文档,该操作不会创建新文档供脚本执行,最终将导致“not_found”错误

时间格式

指定写入“Time”字段时使用的格式。如果未指定,则使用默认格式“HH:mm:ss”。如果已指定,则该值必须与 Java 简单日期格式相匹配(例如,HH:mm:ss 表示 24 小时格式的两位数小时,后面依次是两位数分钟和两位数秒钟,全部由“:”字符分隔,如 18:04:15)。

时间戳格式

指定写入“Timestamp”字段时使用的格式。如果未指定,则使用默认格式“yyyy-MM-dd HH:mm:ss”。如果指定,则该值必须与 Java 简单日期格式相匹配(例如,MM/dd/yyyy HH:mm:ss 表示两位数月份,后面依次是两位数日期、四位数年份,全部由“/”字符分隔;再后面依次是 24 小时格式的两位数小时、两位数分钟和两位数秒钟,全部由“:”字符分隔,如 01/25/2017 18:04:15)。

时间戳记录路径

RecordPath 用于指向记录中包含文档的 @timestamp 的字段。如果留空,则会使用主 @timestamp 属性确定@timestamp

时间戳值

用作 @timestamp 字段的值(Elasticsearch 数据流的必填项)

未找到时视为成功

如果为 true,“not_found”Elasticsearch 文档关联记录将路由到“successful”关系,否则将路由到“errors”关系。如果输出错误响应为“true”,则来自 Elasticsearch 的“not_found”响应将发送到 error_responses 关系。

类型

此文档的类型(由 Elasticsearch 用于建立索引和执行搜索)。

类型记录路径

用于检索“type”字段以供 Elasticsearch 使用的记录路径表达式。如果留空,将使用主类型属性来确定类型。

关系

名称

描述

errors

与导致“error”(在 Elasticsearch 中)的 Elasticsearch 文档对应的记录/FlowFile 将路由到此处。

failure

所有因与服务器可用性无关的原因而失败的 FlowFile 都将转到此关系。

original

在没有请求失败的情况下发送到 Elasticsearch 的所有 FlowFile 都将转到此关系。

retry

所有因服务器/集群可用性而失败的 Flowfile 都将转到此关系。

successful

与未导致“error”(在 Elasticsearch 中)的 Elasticsearch 文档对应的记录/FlowFile 将路由到此处。

写入属性

名称

描述

elasticsearch.put.error

如果在解析 FlowFile 记录、将已解析的文档发送到 Elasticsearch 或解析 Elasticsearch 响应时出现问题,则显示此错误消息。

elasticsearch.put.error.count

在 Elasticsearch _bulk API 中生成错误的记录数。

elasticsearch.put.success.count

由 Elasticsearch _bulk API 成功处理的记录数。

elasticsearch.bulk.error

如果在 Elasticsearch 中处理记录时出现错误,则返回 _bulk 响应。

另请参阅