监控 Openflow¶
本主题介绍如何监控 Openflow 的状态和解决问题。
访问 Openflow 日志¶
Snowflake 会将 Openflow 日志发送到您在 设置 Openflow 时配置的事件表。
Snowflake 建议您在事件表查询的 WHERE 子句中包含时间戳。这一点尤为重要,因为各种 Snowflake 组件可能会生成大量数据。通过应用筛选器,可以检索到较小的数据子集,从而提高查询性能。
要快速开始使用 Openflow 的遥测功能,请参阅下面的 示例查询。
Openflow 遥测架构¶
有关事件表列的信息,请参阅 事件表列。
以下部分介绍了 Openflow 如何在事件表中构造遥测。
资源属性¶
描述 Openflow 设置的事件元数据。有关其他类型资源属性的一般信息,请参阅事件表列文档中的 RESOURCE_ATTRIBUTES 列。
名称 |
类型 |
描述 |
---|---|---|
application |
字符串 |
固定值 |
cloud.service.provider |
字符串 |
|
container.id |
字符串 |
容器的唯一标识符 |
container.image.name |
字符串 |
容器镜像的完全限定名称。Openflow 运行时容器将包含本地容器注册表的路径。 例如, |
container.image.tag |
字符串 |
容器镜像的版本 |
k8s.container.name |
字符串 |
K8s 容器的名称。Openflow 运行时容器将以“Runtime Key”开头,以 例如,名为“PostgreSQL CDC”的 Openflow 运行时 Runtime Key 为 postgresql-cdc,因此它的容器名称将为:
|
k8s.container.restart_count |
数字字符串 |
此容器自创建以来重新启动的次数。 |
k8s.namespace.name |
字符串 |
对于 Openflow 运行时,Pod 或容器的 K8s 命名空间以 |
k8s.node.name |
字符串 |
托管 Pod/容器的 EKS 节点或 EKS 节点本身的内部域名。 例如,ip-10-12-13-144.us-west-2.compute.internal |
k8s.pod.name |
字符串 |
K8s Pod 的名称。Openflow 运行时 Pod 将以“Runtime Key”开头,并以每个 Pod 副本的数字标识符结尾。这个数字可以增长到为运行时设置的“最大节点数”,索引为 0。 例如,名为“PostgreSQL CDC”的 Openflow 运行时 Runtime Key 为 postgresql-cdc 和 3 个节点的 Pod 名称将为:
|
k8s.pod.start_time |
ISO 8601 日期字符串 |
Pod 启动时的时间戳 |
k8s.pod.uid |
UUID 字符串 |
集群中 Pod 的唯一标识符 |
openflow.dataplane.id |
UUID 字符串 |
Openflow 部署的唯一标识符,与 Snowflake Openflow UI 中通过“Deployment”>“View Details”显示的“ID”相匹配。 |
- 资源属性示例:
{ "application": "openflow", "cloud.service.provider": "aws", "k8s.container.name": "pg-dev-server", "k8s.container.restart_count": "0", "k8s.namespace.name": "runtime-pg-dev", "k8s.node.name": "ip-10-10-62-36.us-east-2.compute.internal", "k8s.pod.name": "pg-dev-0", "k8s.pod.start_time": "2025-04-25T22:14:29Z", "k8s.pod.uid": "94610175-1685-4c8f-b0a1-42898d1058e6", "k8s.statefulset.name": "pg-dev", "openflow.dataplane.id": "abeddb4f-95ae-45aa-95b1-b4752f30c64a" }
范围¶
名称 |
类型 |
描述 |
---|---|---|
name |
字符串 |
指标的提供商。其中之一:
|
- 范围示例:
{ "name": "runtime" }
记录类型¶
根据此行所表示的 Openflow 遥测类型,类型将为以下之一:
LOG
METRIC
Openflow 不会收集 TRACE 记录,但在 Snowflake 事件表中,该记录也是此列的有效类型。
记录¶
可选。JSON 对象描述了该行所表示的指标的类型。
名称 |
类型 |
描述 |
---|---|---|
metric |
对象 |
包含两个字段:
name 和 unit 值差异很大。有关完整列表,请参阅下面的 应用程序指标。 |
metric_type |
字符串 |
其中之一:
|
value_type |
字符串 |
此指标生成的值的原始类型。其中之一:
|
aggregation_temporality |
字符串 |
可选。对于严格增加且依赖于先前值(例如 Pod CPU 时间和网络 IO)的指标,设置为累积值。 |
is_monotonic |
布尔 |
可选。对于累积指标,这确实表明它在时间序列内严格增加。 |
- 记录示例:
{ "metric": { "name": "connection.queued.duration.max", "unit": "millisecond" }, "metric_type": "gauge", "value_type": "INT" }
记录属性¶
日志¶
日志的记录属性通常会指示此日志的来源。例如,来自名为 testruntime
的 Openflow 运行时日志的记录属性可能为:
{ "log.file.path": "/var/log/pods/runtime-testruntime_testruntime-0_66d80cdb-9484-40a4-bdba-f92eb0af14c7/testruntime-server/0.log", "log.iostream": "stdout", "logtag": "F" }
系统指标¶
CPU 使用率等系统指标通常不会设置记录属性,因此此指标将为值为 空
。
Openflow 应用程序指标¶
应用程序或“流量”指标的记录属性提供有关数据管道中生成该指标的组件的详细信息。这将根据组件的类型而有所不同。请参阅 应用程序指标
{ "component": "PutSnowpipeStreaming", "execution.node": "ALL", "group.id": "c052f9d7-7f76-3013-a2c5-d3b064fa7326", "id": "c69e2913-22a9-36bb-a159-6a5ed1fb9d63", "name": "PutSnowpipeStreaming", "type": "processor" }
值¶
此列包含遥测的原始值。对于指标,这将是一个数值(整数或双精度)。对于日志,这将是一个半结构化的字符串值或格式良好的 JSON 字符串。
Openflow 运行时日志¶
Openflow 运行时大多数日志以 JSON 输出,因此可以对 VALUE
列使用 Snowflake 的 TRY_PARSE_JSON,将该值进一步解析为以下结构化字段:
名称 |
类型 |
描述 |
---|---|---|
formattedMessage |
字符串 |
运行时记录器输出的实际日志消息。 |
level |
字符串 |
其中之一:
|
loggerName |
字符串 |
记录器的完全限定类名。Openflow 处理器通常使用以 这对于查看特定处理器、控制器服务或捆绑库的日志很有用。 |
nanoseconds |
整数 |
创建此日志消息时的纳秒级时间,从毫秒开始。 例如,纳秒值 111222333 可能对应于时间戳值 1749180210111,其纳秒值的前三位与时间戳的后三位数字相匹配。 |
threadName |
字符串 |
用于处理此调用的线程名称。例如, |
throwable |
JSON 对象 |
如果此日志消息没有异常或堆栈跟踪,则为
|
timestamp |
整数 |
此日志消息的创建时间,单位为自 UNIX 纪元以来的毫秒数。 例如,1749180210044 表示该日志创建于 2025-06-05 03:23:30.044 UTC |
应用程序指标¶
备注
以下列表涵盖了可用于 Openflow 运行时的所有应用程序指标。运行时仅输出与 Openflow 连接器相关的部分指标,以保留在 Snowflake 事件表中。
Snowflake 的 OpenTelemetry 报告任务可以将部分或全部指标发送到任何 OTLP 目的地。
连接指标¶
指标名称 |
单位 |
描述 |
---|---|---|
connection.input.bytes |
bytes |
输入项目的大小 |
connection.input.count |
items |
输入项目的数量 |
connection.output.bytes |
bytes |
输出项目的大小 |
connection.output.count |
items |
输出项目的数量 |
connection.queued.bytes |
bytes |
已排队项目的大小 |
connection.queued.bytes.max |
bytes |
已排队项目的最大大小 |
connection.queued.count |
items |
已排队项目的数量 |
connection.queued.count.max |
items |
已排队项目的最大数量 |
connection.queued.duration.total |
milliseconds |
已排队项目的总计持续时间 |
connection.queued.duration.max |
milliseconds |
已排队项目的最大持续时间 |
connection.backpressure.threshold.bytes |
bytes |
在此连接中,开始施加背压之前允许排队的数据最大字节数。 |
connection.backpressure.threshold.objects |
items |
在施加背压之前,此连接中 FlowFiles 可以排队的最大数量。 |
connection.loadbalance.status.load_balance_not_configured |
binary,0 或 1 |
如果连接没有配置负载平衡设置,则为 1。否则为 0。 |
connection.loadbalance.status.load_balance_active |
binary,0 或 1 |
如果连接正在集群中进行负载平衡,则为 1。否则为 0。 |
connection.loadbalance.status.load_balance_inactive |
binary,0 或 1 |
如果连接未在集群中进行负载平衡,则为 1。否则为 0。 |
连接记录属性¶
每个连接指标都包含以下记录属性:
属性 |
描述 |
---|---|
id |
连接的唯一标识符。 |
name |
用户可见的连接名称 |
type |
固定值 |
source.id |
向此连接发送 FlowFiles 的组件的唯一标识符 |
source.name |
向此连接发送 FlowFiles 的组件的用户可见名称 |
destination.id |
从此连接接收 FlowFiles 的组件的唯一标识符 |
destination.name |
从此连接接收 FlowFiles 的组件的用户可见名称 |
group.id |
包含此连接的流程组的唯一标识符 |
输入端口和输出端口指标¶
输入端口和输出端口在技术上属于两种不同的组件类型。为了保持一致性,输入端口和输出端口的指标和属性是相同的,唯一的例外是 type
属性,用于表示是输入端口还是输出端口。
指标名称 |
单位 |
描述 |
---|---|---|
port.thread.count.active |
threads |
活动线程数 |
port.bytes.received |
bytes |
接收的字节数 |
port.bytes.sent |
bytes |
发送的字节数 |
port.flowfiles.received |
flowfiles |
接收的 FlowFiles 数量 |
port.flowfiles.sent |
flowfiles |
发送的 FlowFiles 数量 |
port.input.bytes |
bytes |
输入项目的大小 |
port.input.count |
items |
输入项目的数量 |
port.output.bytes |
bytes |
输出项目的大小 |
port.output.count |
items |
输出项目的数量 |
输入端口和输出端口记录属性¶
每个端口指标都包含以下记录属性:
属性 |
描述 |
---|---|
id |
端口的唯一标识符。 |
name |
用户可见的端口名称 |
type |
|
group.id |
包含此端口的流程组的唯一标识符 |
流程组指标¶
指标名称 |
单位 |
描述 |
---|---|---|
processgroup.thread.count.active |
threads |
活动线程数 |
processgroup.thread.count.stateless |
threads |
无状态线程数 |
processgroup.thread.count.terminated |
threads |
终止线程数 |
processgroup.bytes.read |
bytes |
读取的字节数 |
processgroup.bytes.received |
bytes |
接收的字节数 |
processgroup.bytes.transferred |
bytes |
传输的字节数 |
processgroup.bytes.sent |
bytes |
发送的字节数 |
processgroup.bytes.written |
bytes |
写入的字节数 |
processgroup.flowfiles.received |
flowfiles |
接收的 FlowFiles 数量 |
processgroup.flowfiles.sent |
flowfiles |
发送的 FlowFiles 数量 |
processgroup.flowfiles.transferred |
flowfiles |
传输的 FlowFiles 数 |
processgroup.input.count |
items |
输入的项目数 |
processgroup.input.content.size |
bytes |
输入项目的大小 |
processgroup.output.count |
items |
输出的项目数 |
processgroup.output.content.size |
bytes |
输出项目的大小 |
processgroup.queued.count |
items |
已排队的项目数 |
processgroup.queued.content.size |
bytes |
已排队项目的大小 |
processgroup.time.processing |
nanoseconds |
处理所花费的时间 |
流程组记录属性¶
每个流程组指标都包含以下记录属性:
属性 |
描述 |
---|---|
id |
流程组的唯一标识符 |
name |
用户可见的流程组名称 |
type |
固定值 |
tree.level |
相对于流的根流程组而言,流程组的深度。流程最高级别的流程组的 tree.level 为 1 |
处理器指标¶
指标名称 |
单位 |
描述 |
---|---|---|
processor.thread.count.active |
thread |
活动线程数 |
processor.thread.count.terminated |
thread |
终止线程数 |
processor.time.lineage.average |
nanosecond |
沿袭平均持续时间 |
processor.invocations |
invocations |
调用次数 |
processor.bytes.read |
字节 |
读取的字节数 |
processor.bytes.received |
字节 |
接收的字节数 |
processor.bytes.sent |
字节 |
发送的字节数 |
processor.bytes.written |
字节 |
写入的字节数 |
processor.flowfiles.received |
flowfiles |
接收的 FlowFiles 数量 |
processor.flowfiles.removed |
flowfiles |
移除的 FlowFiles 数量 |
processor.flowfiles.sent |
flowfiles |
发送的 FlowFiles 数量 |
processor.input.count |
item |
输入的项目数 |
processor.input.content.size |
bytes |
输入项目的大小 |
processor.output.count |
item |
输出的项目数 |
processor.output.content.size |
字节 |
输出项目的大小 |
processor.time.processing |
nanosecond |
处理所花费的时间 |
processor.run.status.running |
binary,0 或 1 |
如果正在运行,则为 1;否则为 0 |
processor.run.status.stopped |
binary,0 或 1 |
如果已停止,则为 1;否则为 0 |
processor.run.status.validating |
binary,0 或 1 |
如果正在验证,则为 1;否则为 0 |
processor.run.status.invalid |
binary,0 或 1 |
如果无效,则为 1;否则为 0 |
processor.run.status.disabled |
binary,0 或 1 |
如果禁用,则为 1;否则为 0 |
processor.counter |
count |
计数器的值 |
处理器记录属性¶
每个处理器指标都包含以下记录属性:
属性 |
描述 |
---|---|
id |
处理器的唯一标识符 |
name |
处理器的用户可见且用户可编辑的名称 |
type |
固定值 |
component |
处理器的不可变类名。 |
execution.node |
此处理器所配置的运行节点,取值为 |
group.id |
包含此处理器的流程组的唯一标识符 |
计数器的附加属性¶
除了上述标准处理器属性外,processor.counter
指标还包括以下内容:
属性 |
描述 |
---|---|
type |
固定值 |
counter |
用户或系统生成的计数器名称 |
远程流程组指标¶
指标名称 |
单位 |
描述 |
---|---|---|
remoteprocessgroup.thread.count.active |
threads |
活动线程数 |
remoteprocessgroup.remote.port.count.active |
ports |
活动远程端口数 |
remoteprocessgroup.remote.port.count.inactive |
ports |
非活动远程端口数 |
remoteprocessgroup.duration.lineage.average |
nanoseconds |
沿袭平均持续时间 |
remoteprocessgroup.refresh.age |
milliseconds |
自上次刷新以来的时间 |
remoteprocessgroup.received.count |
items |
接收项目的数量 |
remoteprocessgroup.received.content.size |
bytes |
接收项目的大小 |
remoteprocessgroup.sent.count |
items |
已发送项目的数量 |
remoteprocessgroup.sent.content.size |
bytes |
已发送项目的大小 |
remoteprocessgroup.transmission.status.transmitting |
binary,0 或 1 |
如果远程流程组正在传输,则为 1。否则为 0。 |
remoteprocessgroup.transmission.status.nottransmitting |
binary,0 或 1 |
如果远程流程组正在传输,则为 0。否则为 1。 |
远程流程组记录属性¶
每个远程流程组指标都包含以下记录属性:
属性 |
描述 |
---|---|
id |
远程流程组的唯一标识符 |
name |
远程流程组的用户可见名称 |
group.id |
包含此远程流程组的流程组的唯一标识符 |
authorization.issue |
用于访问远程流程组的授权 |
target.uri |
远程流程组的 URI |
type |
固定值 |
JVM 指标¶
指标名称 |
单位 |
描述 |
---|---|---|
jvm.memory.heap.used |
bytes |
JVM 堆上对象当前占用的内存量 |
jvm.memory.heap.committed |
bytes |
保证可供 JVM 堆使用的内存量 |
jvm.memory.heap.max |
bytes |
分配给 JVM 堆的最大内存量 |
jvm.memory.heap.init |
bytes |
分配给 JVM 堆的初始内存量 |
jvm.memory.heap.usage |
percentage |
JVM 堆使用率 |
jvm.memory.non-heap.usage |
percentage |
JVM 非堆使用率 |
jvm.memory.total.init |
bytes |
分配给 JVM 的初始内存量 |
jvm.memory.total.used |
bytes |
当前 JVM 使用的内存量 |
jvm.memory.total.max |
bytes |
JVM 可使用的最大内存量 |
jvm.memory.total.committed |
bytes |
保证可供 JVM 使用的内存量 |
jvm.threads.count |
threads |
活跃线程数量 |
jvm.threads.deadlocks |
threads |
JVM 线程死锁 |
jvm.threads.daemon.count |
threads |
活跃守护线程数 |
jvm.uptime |
seconds |
JVM 流程已运行的秒数 |
jvm.file.descriptor.usage |
percentage |
当前使用的可用文件描述符的百分比。 |
jvm.gc.G1-Concurrent-GC.runs |
runs |
G1 并发垃圾回收运行的总次数 |
jvm.gc.G1-Concurrent-GC.time |
milliseconds |
G1 并发垃圾回收运行的总时间 |
jvm.gc.G1-Young-Generation.runs |
runs |
G1 年轻代垃圾回收运行的总次数 |
jvm.gc.G1-Young-Generation.time |
milliseconds |
G1 年轻代垃圾回收运行的总时间 |
jvm.gc.G1-Old-Generation.runs |
runs |
G1 老年代垃圾回收运行的总次数 |
jvm.gc.G1-Old-Generation.time |
milliseconds |
G1 老年代垃圾回收运行的总时间 |
JVM 记录属性¶
JVM 指标不提供记录属性。
CPU 指标¶
指标名称 |
单位 |
描述 |
---|---|---|
cores.available |
cores |
运行时可用内核的数量 |
cores.load |
percentage |
系统平均负载,若不可用则为 -1 |
CPU 记录属性¶
属性 |
描述 |
---|---|
id |
固定值 |
name |
操作系统的名称 |
architecture |
操作系统的架构 |
version |
操作系统的版本 |
存储指标¶
指标名称 |
单位 |
描述 |
---|---|---|
storage.free |
bytes |
给定存储库的可用存储量 |
storage.used |
bytes |
给定存储库的已用存储量 |
存储记录属性¶
属性 |
描述 |
---|---|
id |
存储库的唯一标识符 |
name |
与 id 相同并提供了一致性 |
storage.type |
|
示例查询¶
以下示例查询旨在帮助您开始使用 Openflow 遥测。
所有查询均假定 Openflow 配置为将遥测数据发送到 SNOWFLAKE.TELEMETRY.EVENTS
的默认事件表。如果您的 Snowflake 账户或 Openflow 部署配置了其他事件表,请在看到 SNOWFLAKE.TELEMETRY.EVENTS
的地方替换该表的名称。
查找卡住的 FlowFiles¶
此查询返回排队时间超过某个阈值的 FlowFiles,这表明它们可能处于卡住状态并需要干预。阈值默认设置为 30 分钟,可根据实际需求调整。
SELECT * FROM (
SELECT
resource_attributes:"openflow.dataplane.id" as Deployment_ID,
resource_attributes:"k8s.namespace.name" as Runtime_Key,
record_attributes:name as Connection_Name,
record_attributes:id as Connection_ID,
MAX(TO_NUMBER(value / 60 / 1000)) as Max_Queued_File_Minutes
FROM snowflake.telemetry.events
WHERE true
AND record_type = 'METRIC'
AND record:metric:name = 'connection.queued.duration.max'
AND timestamp > dateadd(minutes, -30, sysdate())
GROUP BY 1, 2, 3, 4
ORDER BY Max_Queued_File_Minutes DESC
) WHERE Max_Queued_File_Minutes > 30;
查找 Openflow 运行时的错误日志¶
SELECT
timestamp,
Deployment_ID,
Runtime_Key,
parsed_log:level as log_level,
parsed_log:loggerName as logger,
parsed_log:formattedMessage as message,
parsed_log
FROM (
SELECT
timestamp,
resource_attributes:"openflow.dataplane.id" as Deployment_ID,
resource_attributes:"k8s.namespace.name" as Runtime_Key,
TRY_PARSE_JSON(value) as parsed_log
FROM snowflake.telemetry.events
WHERE true
AND timestamp > dateadd('minutes', -30, sysdate())
AND record_type = 'LOG'
AND resource_attributes:"k8s.namespace.name" like 'runtime-%'
ORDER BY timestamp DESC
) WHERE log_level = 'ERROR';
查找正在运行和未运行的处理器¶
某些流程要求所有处理器处于“运行”状态,即使它们未主动处理数据。
此查询可帮助您找到任何正在运行或处于其他状态的处理器,例如:- 已停止 - 无效 - 已禁用
SELECT
timestamp,
resource_attributes:"openflow.dataplane.id" as Deployment_ID,
resource_attributes:"k8s.namespace.name" as Runtime_Key,
record_attributes:component as Processor,
record_attributes:id as Processor_ID,
TO_NUMBER(value) as Running
FROM snowflake.telemetry.events
WHERE true
AND record:metric:name = 'processor.run.status.running'
AND record_type = 'METRIC'
AND timestamp > dateadd(minutes, -30, sysdate());
查找 Openflow 运行时的高 CPU 使用率¶
数据流变慢或吞吐量下降,可能是由于 CPU 出现瓶颈导致。Openflow 运行时会根据您配置的最小和最大节点数量自动向上扩展。
如果 Openflow 运行时正在使用其最大节点数,但 CPU 使用率仍然很高,请考虑:#。增加分配给该运行时的最大节点数 #。排查连接器或数据流,找出瓶颈所在
使用 Snowsight Charts,可以方便地将 CPU 使用率随时间变化的查询结果可视化。
SELECT
timestamp,
resource_attributes:"openflow.dataplane.id" as Deployment_ID,
resource_attributes:"k8s.namespace.name" as Runtime_Key,
resource_attributes:"k8s.pod.name" as Runtime_Pod,
TO_NUMBER(value, 10, 3) * 100 as CPU_Usage_Percentage
FROM snowflake.telemetry.events
WHERE true
AND timestamp > dateadd(minute, -30, sysdate())
AND record_type = 'METRIC'
AND record:metric:name ilike 'container.cpu.usage'
AND resource_attributes:"k8s.namespace.name" ilike 'runtime-%'
AND resource_attributes:"k8s.container.name" ilike '%-server'
ORDER BY timestamp desc, CPU_Usage_Percentage desc;