ConsumeKinesisStream 2025.5.31.15¶
捆绑包¶
org.apache.nifi | nifi-aws-nar
描述¶
从指定的 AWS Kinesis Stream 读取数据,并为每条已处理的记录(原始)输出 FlowFile,或者为一批已处理的记录输出 FlowFile(如果配置了记录读取器和记录写入器)。处理器运行时,至少传送一次 Stream 中的所有 Kinesis 记录。在开始提取数据之前,AWS Kinesis Client Library 初始化可能需要几秒钟的时间。DynamoDB 用于检查点,CloudWatch(可选)用于指标。确保提供的凭据可以访问 DynamoDB 和 CloudWatch(可选)以及 Kinesis。
输入要求¶
FORBIDDEN
支持敏感的动态属性¶
false
属性¶
属性 |
描述 |
---|---|
AWS 凭据提供商服务 |
用于获取 AWS 凭据提供商的控制器服务 |
Amazon Kinesis Stream 名称 |
Kinesis Stream 的名称 |
应用程序名称 |
Kinesis Stream 阅读器应用程序名称。 |
检查点间隔 |
Kinesis 检查点之间的间隔 |
通信超时 |
|
DynamoDB 替换 |
要使用非 AWS 部署的 DynamoDB 替换 |
端点替换 URL |
要使用的端点 URL 而不是 AWS 默认端点,包括架构、主机、端口和路径。AWS 库会根据 AWS 区域选择端点 URL,但此属性会替换所选端点 URL,允许与其他与 S3 兼容的端点一起使用。 |
故障转移超时 |
Kinesis 客户端库故障转移超时 |
正常关机超时 |
Kinesis 客户端库正常关机超时 |
初始 Stream 位置 |
读取 Kinesis Stream 的初始位置。 |
输出策略 |
用于将 Kinesis 记录输出到 FlowFile 记录的格式。 |
记录读取器 |
用于读取收到的消息的记录读取器。表达式语言“${kinesis.name}”可以引用 Kinesis Stream 名称来访问架构。如果未指定记录读取器/记录写入器,则每条 Kinesis 记录都将创建一个 FlowFile。 |
记录写入器 |
用于将记录序列到为输出 FlowFile 的记录写入器。表达式语言“${kinesis.name}”可以引用 Kinesis Stream 名称来访问架构。如果未指定记录读取器/记录写入器,则每条 Kinesis 记录都将创建一个 FlowFile。 |
区域 |
|
向 CloudWatch 报告指标 |
是否向 CloudWatch 报告 Kinesis 使用指标。 |
重试次数 |
重试 Kinesis 操作(处理记录、检查点、关机)的次数 |
重试等待时间 |
重试 Kinesis 操作(处理记录、检查点、关机)之间的时间间隔 |
Stream 位置时间戳 |
Stream 中开始读取 Kinesis 记录的时间戳位置。如果读取 Kinesis Stream 的初始位置为 AT_TIMESTAMP,则为必填项。使用“时间戳格式”将值解析为“日期”。 |
时间戳格式 |
用于将“Stream 位置时间戳”解析为“日期”以及将 Kinesis 记录的“大约抵达时间戳”转换为 FlowFile 属性的格式。 |
proxy-configuration-service |
指定代理配置控制器服务来代理网络请求。 |
关系¶
名称 |
描述 |
---|---|
success |
FlowFiles 将路由到 success 关系 |
写入属性¶
名称 |
描述 |
---|---|
aws.kinesis.partition.key |
从分片读取的(最后一个)Kinesis 记录的分区键 |
aws.kinesis.shard.id |
从中读取 Kinesis 记录的分片 ID |
aws.kinesis.sequence.number |
分片中(最后一个)Kinesis 记录的唯一标识符 |
aws.kinesis.approximate.arrival.timestamp |
从 Stream 中读取的(最后一次)Kinesis 记录的大致到达时间戳 |
mime.type |
将 mime.type 属性设置为记录写入器指定的 MIME 类型(如果已配置) |
record.count |
记录写入器写入 FlowFiles 的记录数(如果已配置) |
record.error.message |
此属性提供失败时记录读取器或记录写入器遇到的错误消息(如果已配置) |