监控 Openflow

本主题介绍如何监控 Openflow 的状态和解决问题。

访问 Openflow 日志

Snowflake 会将 Openflow 日志发送到您在 设置 Openflow 时配置的事件表。

Snowflake 建议您在事件表查询的 WHERE 子句中包含时间戳。这一点尤为重要,因为各种 Snowflake 组件可能会生成大量数据。通过应用筛选器,可以检索到较小的数据子集,从而提高查询性能。

要快速开始使用 Openflow 的遥测功能,请参阅下面的 示例查询

Openflow 遥测架构

有关事件表列的信息,请参阅 事件表列

以下部分介绍了 Openflow 如何在事件表中构造遥测。

资源属性

描述 Openflow 设置的事件元数据。有关其他类型资源属性的一般信息,请参阅事件表列文档中的 RESOURCE_ATTRIBUTES 列

名称

类型

描述

application

字符串

固定值 openflow

cloud.service.provider

字符串

awsazuregcpspcs 中的一个

container.id

字符串

容器的唯一标识符

container.image.name

字符串

容器镜像的完全限定名称。Openflow 运行时容器将包含本地容器注册表的路径。

例如,$accountid.dkr.ecr.$region.amazonaws.com/snowflake-openflow/runtime-server

container.image.tag

字符串

容器镜像的版本

k8s.container.name

字符串

K8s 容器的名称。Openflow 运行时容器将以“Runtime Key”开头,以 -gateway-server 结尾。

例如,名为“PostgreSQL CDC”的 Openflow 运行时 Runtime Key 为 postgresql-cdc,因此它的容器名称将为:

  • postgresql-cdc-gateway

  • postgresql-cdc-server

k8s.container.restart_count

数字字符串

此容器自创建以来重新启动的次数。

k8s.namespace.name

字符串

对于 Openflow 运行时,Pod 或容器的 K8s 命名空间以 runtime- 开头。值还包括 kube-systemopenflow-runtime-infra

k8s.node.name

字符串

托管 Pod/容器的 EKS 节点或 EKS 节点本身的内部域名。

例如,ip-10-12-13-144.us-west-2.compute.internal

k8s.pod.name

字符串

K8s Pod 的名称。Openflow 运行时 Pod 将以“Runtime Key”开头,并以每个 Pod 副本的数字标识符结尾。这个数字可以增长到为运行时设置的“最大节点数”,索引为 0。

例如,名为“PostgreSQL CDC”的 Openflow 运行时 Runtime Key 为 postgresql-cdc 和 3 个节点的 Pod 名称将为:

  • postgresql-cdc-0

  • postgresql-cdc-1

  • postgresql-cdc-2

k8s.pod.start_time

ISO 8601 日期字符串

Pod 启动时的时间戳

k8s.pod.uid

UUID 字符串

集群中 Pod 的唯一标识符

openflow.dataplane.id

UUID 字符串

Openflow 部署的唯一标识符,与 Snowflake Openflow UI 中通过“Deployment”>“View Details”显示的“ID”相匹配。

资源属性示例:
{
  "application": "openflow",
  "cloud.service.provider": "aws",
  "k8s.container.name": "pg-dev-server",
  "k8s.container.restart_count": "0",
  "k8s.namespace.name": "runtime-pg-dev",
  "k8s.node.name": "ip-10-10-62-36.us-east-2.compute.internal",
  "k8s.pod.name": "pg-dev-0",
  "k8s.pod.start_time": "2025-04-25T22:14:29Z",
  "k8s.pod.uid": "94610175-1685-4c8f-b0a1-42898d1058e6",
  "k8s.statefulset.name": "pg-dev",
  "openflow.dataplane.id": "abeddb4f-95ae-45aa-95b1-b4752f30c64a"
}
Copy

范围

名称

类型

描述

name

字符串

指标的提供商。其中之一:

  • Openflow Connector 指标:runtime

  • 系统级指标:github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver

范围示例:
{
  "name": "runtime"
}
Copy

记录类型

根据此行所表示的 Openflow 遥测类型,类型将为以下之一:

  • LOG

  • METRIC

Openflow 不会收集 TRACE 记录,但在 Snowflake 事件表中,该记录也是此列的有效类型。

记录

可选。JSON 对象描述了该行所表示的指标的类型。

名称

类型

描述

metric

对象

包含两个字段:

  • 唯一指标生成的 name,通常使用以点分隔的命名空间

  • 由类型表示的值的 unit,例如字节、纳秒和线程

name 和 unit 值差异很大。有关完整列表,请参阅下面的 应用程序指标

metric_type

字符串

其中之一:

  • 大多数 Openflow 指标的 gauge,可以增加或减少

  • Pod CPU 时间和网络 IO 等累积指标的 sum

value_type

字符串

此指标生成的值的原始类型。其中之一:

  • INT

  • DOUBLE

aggregation_temporality

字符串

可选。对于严格增加且依赖于先前值(例如 Pod CPU 时间和网络 IO)的指标,设置为累积值。

is_monotonic

布尔

可选。对于累积指标,这确实表明它在时间序列内严格增加。

记录示例:
{
  "metric": {
    "name": "connection.queued.duration.max",
    "unit": "millisecond"
  },
  "metric_type": "gauge",
  "value_type": "INT"
}
Copy

记录属性

日志

日志的记录属性通常会指示此日志的来源。例如,来自名为 testruntime 的 Openflow 运行时日志的记录属性可能为:

{
  "log.file.path": "/var/log/pods/runtime-testruntime_testruntime-0_66d80cdb-9484-40a4-bdba-f92eb0af14c7/testruntime-server/0.log",
  "log.iostream": "stdout",
  "logtag": "F"
}
Copy

系统指标

CPU 使用率等系统指标通常不会设置记录属性,因此此指标将为值为

Openflow 应用程序指标

应用程序或“流量”指标的记录属性提供有关数据管道中生成该指标的组件的详细信息。这将根据组件的类型而有所不同。请参阅 应用程序指标

{
  "component": "PutSnowpipeStreaming",
  "execution.node": "ALL",
  "group.id": "c052f9d7-7f76-3013-a2c5-d3b064fa7326",
  "id": "c69e2913-22a9-36bb-a159-6a5ed1fb9d63",
  "name": "PutSnowpipeStreaming",
  "type": "processor"
}
Copy

此列包含遥测的原始值。对于指标,这将是一个数值(整数或双精度)。对于日志,这将是一个半结构化的字符串值或格式良好的 JSON 字符串。

Openflow 运行时日志

Openflow 运行时大多数日志以 JSON 输出,因此可以对 VALUE 列使用 Snowflake 的 TRY_PARSE_JSON,将该值进一步解析为以下结构化字段:

名称

类型

描述

formattedMessage

字符串

运行时记录器输出的实际日志消息。

level

字符串

其中之一:

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

loggerName

字符串

记录器的完全限定类名。Openflow 处理器通常使用以 com.snowflake.openflow.runtime.processors 开头的记录器名称。

这对于查看特定处理器、控制器服务或捆绑库的日志很有用。

nanoseconds

整数

创建此日志消息时的纳秒级时间,从毫秒开始。

例如,纳秒值 111222333 可能对应于时间戳值 1749180210111,其纳秒值的前三位与时间戳的后三位数字相匹配。

threadName

字符串

用于处理此调用的线程名称。例如,Timer-Driven Process Thread-7

throwable

JSON 对象

如果此日志消息没有异常或堆栈跟踪,则为 null。否则,它会将堆栈跟踪记录为带有字段的 JSON 字符串:

  • className – 抛出的异常

  • message – 异常附带的消息

  • stepArray – 堆栈跟踪的方法调用数组,包含:

    • className

    • fileName

    • lineNumber

    • methodName

timestamp

整数

此日志消息的创建时间,单位为自 UNIX 纪元以来的毫秒数。

例如,1749180210044 表示该日志创建于 2025-06-05 03:23:30.044 UTC

应用程序指标

备注

以下列表涵盖了可用于 Openflow 运行时的所有应用程序指标。运行时仅输出与 Openflow 连接器相关的部分指标,以保留在 Snowflake 事件表中。

Snowflake 的 OpenTelemetry 报告任务可以将部分或全部指标发送到任何 OTLP 目的地。

连接指标

指标名称

单位

描述

connection.input.bytes

bytes

输入项目的大小

connection.input.count

items

输入项目的数量

connection.output.bytes

bytes

输出项目的大小

connection.output.count

items

输出项目的数量

connection.queued.bytes

bytes

已排队项目的大小

connection.queued.bytes.max

bytes

已排队项目的最大大小

connection.queued.count

items

已排队项目的数量

connection.queued.count.max

items

已排队项目的最大数量

connection.queued.duration.total

milliseconds

已排队项目的总计持续时间

connection.queued.duration.max

milliseconds

已排队项目的最大持续时间

connection.backpressure.threshold.bytes

bytes

在此连接中,开始施加背压之前允许排队的数据最大字节数。

connection.backpressure.threshold.objects

items

在施加背压之前,此连接中 FlowFiles 可以排队的最大数量。

connection.loadbalance.status.load_balance_not_configured

binary,0 或 1

如果连接没有配置负载平衡设置,则为 1。否则为 0。

connection.loadbalance.status.load_balance_active

binary,0 或 1

如果连接正在集群中进行负载平衡,则为 1。否则为 0。

connection.loadbalance.status.load_balance_inactive

binary,0 或 1

如果连接未在集群中进行负载平衡,则为 1。否则为 0。

连接记录属性

每个连接指标都包含以下记录属性:

属性

描述

id

连接的唯一标识符。

name

用户可见的连接名称

type

固定值 connection

source.id

向此连接发送 FlowFiles 的组件的唯一标识符

source.name

向此连接发送 FlowFiles 的组件的用户可见名称

destination.id

从此连接接收 FlowFiles 的组件的唯一标识符

destination.name

从此连接接收 FlowFiles 的组件的用户可见名称

group.id

包含此连接的流程组的唯一标识符

输入端口和输出端口指标

输入端口和输出端口在技术上属于两种不同的组件类型。为了保持一致性,输入端口和输出端口的指标和属性是相同的,唯一的例外是 type 属性,用于表示是输入端口还是输出端口。

指标名称

单位

描述

port.thread.count.active

threads

活动线程数

port.bytes.received

bytes

接收的字节数

port.bytes.sent

bytes

发送的字节数

port.flowfiles.received

flowfiles

接收的 FlowFiles 数量

port.flowfiles.sent

flowfiles

发送的 FlowFiles 数量

port.input.bytes

bytes

输入项目的大小

port.input.count

items

输入项目的数量

port.output.bytes

bytes

输出项目的大小

port.output.count

items

输出项目的数量

输入端口和输出端口记录属性

每个端口指标都包含以下记录属性:

属性

描述

id

端口的唯一标识符。

name

用户可见的端口名称

type

port-inputport-output 之一

group.id

包含此端口的流程组的唯一标识符

流程组指标

指标名称

单位

描述

processgroup.thread.count.active

threads

活动线程数

processgroup.thread.count.stateless

threads

无状态线程数

processgroup.thread.count.terminated

threads

终止线程数

processgroup.bytes.read

bytes

读取的字节数

processgroup.bytes.received

bytes

接收的字节数

processgroup.bytes.transferred

bytes

传输的字节数

processgroup.bytes.sent

bytes

发送的字节数

processgroup.bytes.written

bytes

写入的字节数

processgroup.flowfiles.received

flowfiles

接收的 FlowFiles 数量

processgroup.flowfiles.sent

flowfiles

发送的 FlowFiles 数量

processgroup.flowfiles.transferred

flowfiles

传输的 FlowFiles 数

processgroup.input.count

items

输入的项目数

processgroup.input.content.size

bytes

输入项目的大小

processgroup.output.count

items

输出的项目数

processgroup.output.content.size

bytes

输出项目的大小

processgroup.queued.count

items

已排队的项目数

processgroup.queued.content.size

bytes

已排队项目的大小

processgroup.time.processing

nanoseconds

处理所花费的时间

流程组记录属性

每个流程组指标都包含以下记录属性:

属性

描述

id

流程组的唯一标识符

name

用户可见的流程组名称

type

固定值 process-group

tree.level

相对于流的根流程组而言,流程组的深度。流程最高级别的流程组的 tree.level 为 1

处理器指标

指标名称

单位

描述

processor.thread.count.active

thread

活动线程数

processor.thread.count.terminated

thread

终止线程数

processor.time.lineage.average

nanosecond

沿袭平均持续时间

processor.invocations

invocations

调用次数

processor.bytes.read

字节

读取的字节数

processor.bytes.received

字节

接收的字节数

processor.bytes.sent

字节

发送的字节数

processor.bytes.written

字节

写入的字节数

processor.flowfiles.received

flowfiles

接收的 FlowFiles 数量

processor.flowfiles.removed

flowfiles

移除的 FlowFiles 数量

processor.flowfiles.sent

flowfiles

发送的 FlowFiles 数量

processor.input.count

item

输入的项目数

processor.input.content.size

bytes

输入项目的大小

processor.output.count

item

输出的项目数

processor.output.content.size

字节

输出项目的大小

processor.time.processing

nanosecond

处理所花费的时间

processor.run.status.running

binary,0 或 1

如果正在运行,则为 1;否则为 0

processor.run.status.stopped

binary,0 或 1

如果已停止,则为 1;否则为 0

processor.run.status.validating

binary,0 或 1

如果正在验证,则为 1;否则为 0

processor.run.status.invalid

binary,0 或 1

如果无效,则为 1;否则为 0

processor.run.status.disabled

binary,0 或 1

如果禁用,则为 1;否则为 0

processor.counter

count

计数器的值

处理器记录属性

每个处理器指标都包含以下记录属性:

属性

描述

id

处理器的唯一标识符

name

处理器的用户可见且用户可编辑的名称

type

固定值 processor

component

处理器的不可变类名。

execution.node

此处理器所配置的运行节点,取值为 ALLPRIMARY

group.id

包含此处理器的流程组的唯一标识符

计数器的附加属性

除了上述标准处理器属性外,processor.counter 指标还包括以下内容:

属性

描述

type

固定值 counter

counter

用户或系统生成的计数器名称

远程流程组指标

指标名称

单位

描述

remoteprocessgroup.thread.count.active

threads

活动线程数

remoteprocessgroup.remote.port.count.active

ports

活动远程端口数

remoteprocessgroup.remote.port.count.inactive

ports

非活动远程端口数

remoteprocessgroup.duration.lineage.average

nanoseconds

沿袭平均持续时间

remoteprocessgroup.refresh.age

milliseconds

自上次刷新以来的时间

remoteprocessgroup.received.count

items

接收项目的数量

remoteprocessgroup.received.content.size

bytes

接收项目的大小

remoteprocessgroup.sent.count

items

已发送项目的数量

remoteprocessgroup.sent.content.size

bytes

已发送项目的大小

remoteprocessgroup.transmission.status.transmitting

binary,0 或 1

如果远程流程组正在传输,则为 1。否则为 0。

remoteprocessgroup.transmission.status.nottransmitting

binary,0 或 1

如果远程流程组正在传输,则为 0。否则为 1。

远程流程组记录属性

每个远程流程组指标都包含以下记录属性:

属性

描述

id

远程流程组的唯一标识符

name

远程流程组的用户可见名称

group.id

包含此远程流程组的流程组的唯一标识符

authorization.issue

用于访问远程流程组的授权

target.uri

远程流程组的 URI

type

固定值 remote-process-group

JVM 指标

指标名称

单位

描述

jvm.memory.heap.used

bytes

JVM 堆上对象当前占用的内存量

jvm.memory.heap.committed

bytes

保证可供 JVM 堆使用的内存量

jvm.memory.heap.max

bytes

分配给 JVM 堆的最大内存量

jvm.memory.heap.init

bytes

分配给 JVM 堆的初始内存量

jvm.memory.heap.usage

percentage

JVM 堆使用率

jvm.memory.non-heap.usage

percentage

JVM 非堆使用率

jvm.memory.total.init

bytes

分配给 JVM 的初始内存量

jvm.memory.total.used

bytes

当前 JVM 使用的内存量

jvm.memory.total.max

bytes

JVM 可使用的最大内存量

jvm.memory.total.committed

bytes

保证可供 JVM 使用的内存量

jvm.threads.count

threads

活跃线程数量

jvm.threads.deadlocks

threads

JVM 线程死锁

jvm.threads.daemon.count

threads

活跃守护线程数

jvm.uptime

seconds

JVM 流程已运行的秒数

jvm.file.descriptor.usage

percentage

当前使用的可用文件描述符的百分比。

jvm.gc.G1-Concurrent-GC.runs

runs

G1 并发垃圾回收运行的总次数

jvm.gc.G1-Concurrent-GC.time

milliseconds

G1 并发垃圾回收运行的总时间

jvm.gc.G1-Young-Generation.runs

runs

G1 年轻代垃圾回收运行的总次数

jvm.gc.G1-Young-Generation.time

milliseconds

G1 年轻代垃圾回收运行的总时间

jvm.gc.G1-Old-Generation.runs

runs

G1 老年代垃圾回收运行的总次数

jvm.gc.G1-Old-Generation.time

milliseconds

G1 老年代垃圾回收运行的总时间

JVM 记录属性

JVM 指标不提供记录属性。

CPU 指标

指标名称

单位

描述

cores.available

cores

运行时可用内核的数量

cores.load

percentage

系统平均负载,若不可用则为 -1

CPU 记录属性

属性

描述

id

固定值 cpu

name

操作系统的名称

architecture

操作系统的架构

version

操作系统的版本

存储指标

指标名称

单位

描述

storage.free

bytes

给定存储库的可用存储量

storage.used

bytes

给定存储库的已用存储量

存储记录属性

属性

描述

id

存储库的唯一标识符

name

与 id 相同并提供了一致性

storage.type

flowfilecontentprovenance 之一

示例查询

以下示例查询旨在帮助您开始使用 Openflow 遥测。

所有查询均假定 Openflow 配置为将遥测数据发送到 SNOWFLAKE.TELEMETRY.EVENTS 的默认事件表。如果您的 Snowflake 账户或 Openflow 部署配置了其他事件表,请在看到 SNOWFLAKE.TELEMETRY.EVENTS 的地方替换该表的名称。

查找卡住的 FlowFiles

此查询返回排队时间超过某个阈值的 FlowFiles,这表明它们可能处于卡住状态并需要干预。阈值默认设置为 30 分钟,可根据实际需求调整。

SELECT * FROM (
  SELECT
    resource_attributes:"openflow.dataplane.id" as Deployment_ID,
    resource_attributes:"k8s.namespace.name" as Runtime_Key,
    record_attributes:name as Connection_Name,
    record_attributes:id as Connection_ID,
    MAX(TO_NUMBER(value / 60 / 1000)) as Max_Queued_File_Minutes
  FROM snowflake.telemetry.events
  WHERE true
    AND record_type = 'METRIC'
    AND record:metric:name = 'connection.queued.duration.max'
    AND timestamp > dateadd(minutes, -30, sysdate())
  GROUP BY 1, 2, 3, 4
  ORDER BY Max_Queued_File_Minutes DESC
) WHERE Max_Queued_File_Minutes > 30;
Copy

查找 Openflow 运行时的错误日志

SELECT
  timestamp,
  Deployment_ID,
  Runtime_Key,
  parsed_log:level as log_level,
  parsed_log:loggerName as logger,
  parsed_log:formattedMessage as message,
  parsed_log
FROM (
  SELECT
    timestamp,
    resource_attributes:"openflow.dataplane.id" as Deployment_ID,
    resource_attributes:"k8s.namespace.name" as Runtime_Key,
    TRY_PARSE_JSON(value) as parsed_log
  FROM snowflake.telemetry.events
  WHERE true
    AND timestamp > dateadd('minutes', -30, sysdate())
    AND record_type = 'LOG'
    AND resource_attributes:"k8s.namespace.name" like 'runtime-%'
  ORDER BY timestamp DESC
) WHERE log_level = 'ERROR';
Copy

查找正在运行和未运行的处理器

某些流程要求所有处理器处于“运行”状态,即使它们未主动处理数据。

此查询可帮助您找到任何正在运行或处于其他状态的处理器,例如:- 已停止 - 无效 - 已禁用

SELECT
  timestamp,
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  record_attributes:component as Processor,
  record_attributes:id as Processor_ID,
  TO_NUMBER(value) as Running
FROM snowflake.telemetry.events
WHERE true
  AND record:metric:name = 'processor.run.status.running'
  AND record_type = 'METRIC'
  AND timestamp > dateadd(minutes, -30, sysdate());
Copy

查找 Openflow 运行时的高 CPU 使用率

数据流变慢或吞吐量下降,可能是由于 CPU 出现瓶颈导致。Openflow 运行时会根据您配置的最小和最大节点数量自动向上扩展。

如果 Openflow 运行时正在使用其最大节点数,但 CPU 使用率仍然很高,请考虑:#。增加分配给该运行时的最大节点数 #。排查连接器或数据流,找出瓶颈所在

使用 Snowsight Charts,可以方便地将 CPU 使用率随时间变化的查询结果可视化。

SELECT
  timestamp,
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  resource_attributes:"k8s.pod.name" as Runtime_Pod,
  TO_NUMBER(value, 10, 3) * 100 as CPU_Usage_Percentage
FROM snowflake.telemetry.events
WHERE true
  AND timestamp > dateadd(minute, -30, sysdate())
  AND record_type = 'METRIC'
  AND record:metric:name ilike 'container.cpu.usage'
  AND resource_attributes:"k8s.namespace.name" ilike 'runtime-%'
  AND resource_attributes:"k8s.container.name" ilike '%-server'
ORDER BY timestamp desc, CPU_Usage_Percentage desc;
Copy
语言: 中文