ConsumeKinesisStream 2025.5.31.15

捆绑包

org.apache.nifi | nifi-aws-nar

描述

从指定的 AWS Kinesis Stream 读取数据,并为每条已处理的记录(原始)输出 FlowFile,或者为一批已处理的记录输出 FlowFile(如果配置了记录读取器和记录写入器)。处理器运行时,至少传送一次 Stream 中的所有 Kinesis 记录。在开始提取数据之前,AWS Kinesis Client Library 初始化可能需要几秒钟的时间。DynamoDB 用于检查点,CloudWatch(可选)用于指标。确保提供的凭据可以访问 DynamoDB 和 CloudWatch(可选)以及 Kinesis。

标签

amazon、aws、consume、kinesis、stream

输入要求

FORBIDDEN

支持敏感的动态属性

false

属性

属性

描述

AWS 凭据提供商服务

用于获取 AWS 凭据提供商的控制器服务

Amazon Kinesis Stream 名称

Kinesis Stream 的名称

应用程序名称

Kinesis Stream 阅读器应用程序名称。

检查点间隔

Kinesis 检查点之间的间隔

通信超时

DynamoDB 替换

要使用非 AWS 部署的 DynamoDB 替换

端点替换 URL

要使用的端点 URL 而不是 AWS 默认端点,包括架构、主机、端口和路径。AWS 库会根据 AWS 区域选择端点 URL,但此属性会替换所选端点 URL,允许与其他与 S3 兼容的端点一起使用。

故障转移超时

Kinesis 客户端库故障转移超时

正常关机超时

Kinesis 客户端库正常关机超时

初始 Stream 位置

读取 Kinesis Stream 的初始位置。

输出策略

用于将 Kinesis 记录输出到 FlowFile 记录的格式。

记录读取器

用于读取收到的消息的记录读取器。表达式语言“${kinesis.name}”可以引用 Kinesis Stream 名称来访问架构。如果未指定记录读取器/记录写入器,则每条 Kinesis 记录都将创建一个 FlowFile。

记录写入器

用于将记录序列到为输出 FlowFile 的记录写入器。表达式语言“${kinesis.name}”可以引用 Kinesis Stream 名称来访问架构。如果未指定记录读取器/记录写入器,则每条 Kinesis 记录都将创建一个 FlowFile。

区域

向 CloudWatch 报告指标

是否向 CloudWatch 报告 Kinesis 使用指标。

重试次数

重试 Kinesis 操作(处理记录、检查点、关机)的次数

重试等待时间

重试 Kinesis 操作(处理记录、检查点、关机)之间的时间间隔

Stream 位置时间戳

Stream 中开始读取 Kinesis 记录的时间戳位置。如果读取 Kinesis Stream 的初始位置为 AT_TIMESTAMP,则为必填项。使用“时间戳格式”将值解析为“日期”。

时间戳格式

用于将“Stream 位置时间戳”解析为“日期”以及将 Kinesis 记录的“大约抵达时间戳”转换为 FlowFile 属性的格式。

proxy-configuration-service

指定代理配置控制器服务来代理网络请求。

关系

名称

描述

success

FlowFiles 将路由到 success 关系

写入属性

名称

描述

aws.kinesis.partition.key

从分片读取的(最后一个)Kinesis 记录的分区键

aws.kinesis.shard.id

从中读取 Kinesis 记录的分片 ID

aws.kinesis.sequence.number

分片中(最后一个)Kinesis 记录的唯一标识符

aws.kinesis.approximate.arrival.timestamp

从 Stream 中读取的(最后一次)Kinesis 记录的大致到达时间戳

mime.type

将 mime.type 属性设置为记录写入器指定的 MIME 类型(如果已配置)

record.count

记录写入器写入 FlowFiles 的记录数(如果已配置)

record.error.message

此属性提供失败时记录读取器或记录写入器遇到的错误消息(如果已配置)

另请参阅

语言: 中文