PutElasticsearchRecord 2025.10.9.21¶
捆绑包¶
org.apache.nifi | nifi-elasticsearch-restapi-nar
描述¶
使用官方 Elastic REST 客户端库的记录感知 Elasticsearch put 处理器。FlowFile 中的每条记录都将转换为文档,然后发送到 Elasticsearch _bulk APi。发送到 Elasticsearch 的每个请求中可以批处理多个文档。可以使用记录路径表达式配置每个文档的批量操作。
输入要求¶
REQUIRED
支持敏感的动态属性¶
false
属性¶
属性 |
描述 |
|---|---|
批处理大小 |
单批发送的记录数。 |
Client Service |
用于运行查询的 Elasticsearch 客户端服务。 |
日期格式 |
指定写入“Date”字段时使用的格式。如果未指定,则使用默认格式“yyyy-MM-dd”。如果已指定,则该值必须与 Java 简单日期格式相匹配(例如,MM/dd/yyyy 表示两位数月份,后面依次是两位数日期、四位数年份,全部由“/”字符分隔,如 01/25/2017)。 |
Dynamic Templates Record Path |
RecordPath 用于指向记录中包含文档的 dynamic_templates 的字段。字段必须与 Map-type 兼容(例如 Map 或 Record),或者是可解析为 JSON 对象的字符串。需要 Elasticsearch 7+ |
按批量错误类型对结果进行分组 |
The errored records written to the "errors" relationship will be grouped by error type and the error related to the first record within the FlowFile added to the FlowFile as "elasticsearch.bulk.error". If "Treat Not Found as Success" is "false" then records associated with "not_found" Elasticsearch document responses will also be send to the "errors" relationship. |
ID 记录路径 |
用于检索 ID 字段以供 Elasticsearch 使用的记录路径表达式。如果留空,ID 将由 Elasticsearch 自动生成。 |
索引 |
要使用的索引的名称。 |
索引操作 |
用于索引的操作的类型(创建、删除、索引、更新、更新插入) |
索引操作记录路径 |
用于检索“Index Operation”字段以供 Elasticsearch 使用的记录路径表达式。如果留空,将使用主索引操作属性来确定索引操作。 |
索引记录路径 |
用于检索“index”字段以供 Elasticsearch 使用的记录路径表达式。如果留空,则将使用主索引属性确定索引。 |
Log Error Responses |
如果启用了此属性,则将在错误日志级别将错误记录到 NiFi 日志中。否则,只有在 NiFi 上整体启用调试日志记录时,才会记录错误。此选项的目的是,使用户能够在不必打开调试日志记录的情况下调试失败的操作。 |
最大 JSON 字段字符串长度 |
解析 JSON 文档或属性时字符串值的最大允许长度。 |
Output Error Responses |
If this is enabled, response messages from Elasticsearch marked as "error" will be output to the "error_responses" relationship. This does not impact the output of flowfiles to the "successful" or "errors" relationships |
Record Reader |
用于从 FlowFile 读取传入记录的记录读取器。 |
Result Record Writer |
将检查 Elasticsearch 的响应中是否存在失败的记录,将使用此记录写入器服务将失败的记录写入到记录集并发送到“errors”关系中。将使用此记录写入器服务将成功的记录写入到记录集,并发送到“successful”关系。 |
保留 ID(记录路径) |
是否保留用作 ID 记录路径的现有字段。 |
Retain Record Timestamp |
是否保留用作 @timestamp 记录路径的现有字段。 |
Script Record Path |
RecordPath 用于指向记录中包含文档更新/更新插入脚本的字段。仅适用于更新/更新插入操作。字段必须与 Map-type 兼容(例如 Map 或 Record),或者是可解析为 JSON 对象的字符串 |
脚本化更新插入记录路径 |
A RecordPath pointing to a field in the record(s) that contains the scripted_upsert boolean flag. Whether to add the scripted_upsert flag to the Upsert Operation. Forces Elasticsearch to execute the Script whether or not the document exists, defaults to false. If the Upsert Document provided (from FlowFile content) will be empty, but sure to set the Client Service controller service's Suppress Null and Empty Values to Never Suppress or no "upsert" doc will be, included in the request to Elasticsearch and the operation will not create a new document for the script to execute against, resulting in a "not_found" error |
时间格式 |
指定写入“Time”字段时使用的格式。如果未指定,则使用默认格式“HH:mm:ss”。如果已指定,则该值必须与 Java 简单日期格式相匹配(例如,HH:mm:ss 表示 24 小时格式的两位数小时,后面依次是两位数分钟和两位数秒钟,全部由“:”字符分隔,如 18:04:15)。 |
时间戳格式 |
指定写入“Timestamp”字段时使用的格式。如果未指定,则使用默认格式“yyyy-MM-dd HH:mm:ss”。如果指定,则该值必须与 Java 简单日期格式相匹配(例如,MM/dd/yyyy HH:mm:ss 表示两位数月份,后面依次是两位数日期、四位数年份,全部由“/”字符分隔;再后面依次是 24 小时格式的两位数小时、两位数分钟和两位数秒钟,全部由“:”字符分隔,如 01/25/2017 18:04:15)。 |
Timestamp Record Path |
RecordPath 用于指向记录中包含文档的 @timestamp 的字段。如果留空,则会使用主 @timestamp 属性确定@timestamp |
时间戳值 |
用作 @timestamp 字段的值(Elasticsearch 数据流的必填项) |
未找到时视为成功 |
如果为 true,“not_found”Elasticsearch 文档关联记录将路由到“successful”关系,否则将路由到“errors”关系。如果输出错误响应为“true”,则来自 Elasticsearch 的“not_found”响应将发送到 error_responses 关系。 |
类型 |
此文档的类型(由 Elasticsearch 用于建立索引和执行搜索)。 |
Type Record Path |
用于检索“type”字段以供 Elasticsearch 使用的记录路径表达式。如果留空,将使用主类型属性来确定类型。 |
关系¶
名称 |
描述 |
|---|---|
errors |
与导致“error”(在 Elasticsearch 中)的 Elasticsearch 文档对应的记录/FlowFile 将路由到此处。 |
failure |
所有因与服务器可用性无关的原因而失败的 FlowFile 都将转到此关系。 |
original |
在没有请求失败的情况下发送到 Elasticsearch 的所有 FlowFile 都将转到此关系。 |
retry |
所有因服务器/集群可用性而失败的 Flowfile 都将转到此关系。 |
successful |
与未导致“error”(在 Elasticsearch 中)的 Elasticsearch 文档对应的记录/FlowFile 将路由到此处。 |
写入属性¶
名称 |
描述 |
|---|---|
elasticsearch.put.error |
如果在解析 FlowFile 记录、将已解析的文档发送到 Elasticsearch 或解析 Elasticsearch 响应时出现问题,则显示此错误消息。 |
elasticsearch.put.error.count |
在 Elasticsearch _bulk API 中生成错误的记录数。 |
elasticsearch.put.success.count |
由 Elasticsearch _bulk API 成功处理的记录数。 |
elasticsearch.bulk.error |
如果在 Elasticsearch 中处理记录时出现错误,则返回 _bulk 响应。 |