Monitor Openflow using telemetry data

This topic describes how to monitor the state of Openflow and troubleshoot issues.

Access Openflow logs

Snowflake sends Openflow logs to the event table you configured when you set up Openflow (for either a BYOC deployment or a Snowflake deployment).

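Snowflake recommends filtering on the timestamp column in the WHERE clause of event table queries. This is especially important because various Snowflake components can generate large volumes of data. Applying a filter retrieves a smaller subset of the data, which improves query performance.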

To get started quickly with Openflow’s telemetry, see Example Queries below.

Openflow telemetry schema

For information about the event table columns, see Event table columns.

The following sections describe how Openflow structures telemetry in the event table.

Resource attributes

This section describes the event metadata that Openflow sets. For general information on other types of resource attributes, see the RESOURCE_ATTRIBUTES column in the Event table columns documentation.

Name | Type | Description
application | String | The fixed value openflow
cloud.service.provider | String | One of aws, snowflake
container.id | String | Unique identifier of the container
container.image.name | String | Fully qualified name of the container image. All Openflow images are hosted in Snowflake repositories.

For example, <account>-openflow-<env>.registry-internal.snowflakecomputing.cn/openflow/openflow/openflow_repo/runtime-server

container.image.tag | String | Version of the container image
k8s.container.name | String | The name of the K8s container. Openflow Runtime container names start with the “Runtime Key” and end with -gateway or -server.

For example, an Openflow Runtime named “PostgreSQL CDC” has the Runtime Key postgresql-cdc, so its container names are:

  • postgresql-cdc-gateway
  • postgresql-cdc-server

k8s.container.restart_count | Numeric String | The number of times this container has restarted since it was created.
k8s.namespace.name | String | K8s namespace of the pod or container. For Openflow Runtimes, the namespace starts with runtime-. Other values include kube-system and openflow-runtime-infra.
k8s.node.name | String | Internal domain name of the EKS node that hosts the pod or container, or of the EKS node itself.

For example, ip-10-12-13-144.us-west-2.compute.internal

k8s.pod.name | String | The name of the K8s pod. Openflow Runtime pod names start with the “Runtime Key” and end with a numeric identifier for each pod replica. The identifier is 0-indexed, so it goes up to one less than the “Max nodes” value set for the Runtime.

For example, the pod names for an Openflow Runtime named “PostgreSQL CDC” (Runtime Key postgresql-cdc) with 3 nodes are:

  • postgresql-cdc-0
  • postgresql-cdc-1
  • postgresql-cdc-2

k8s.pod.start_time | ISO 8601 Date String | Timestamp when the pod was started
k8s.pod.uid | UUID String | Unique identifier of the pod within the cluster
deployment.version | String | The Openflow deployment version
openflow.dataplane.id | UUID String | The unique identifier of the Openflow deployment, matching the “ID” shown in the Snowflake Openflow UI under Deployment > View Details.
Resource Attributes Example:
{
  "application": "openflow",
  "cloud.service.provider": "aws",
  "container.id": "a1b2c3d4e5f6",
  "container.image.name": "example-openflow-prod.registry-internal.snowflakecomputing.cn/openflow/openflow/openflow_repo/runtime-server",
  "container.image.tag": "2026.3.17.13",
  "deployment.version": "1.35.0",
  "k8s.container.name": "pg-dev-server",
  "k8s.container.restart_count": "0",
  "k8s.namespace.name": "runtime-pg-dev",
  "k8s.node.name": "ip-10-10-62-36.us-east-2.compute.internal",
  "k8s.pod.name": "pg-dev-0",
  "k8s.pod.start_time": "2025-04-25T22:14:29Z",
  "k8s.pod.uid": "94610175-1685-4c8f-b0a1-42898d1058e6",
  "openflow.dataplane.id": "abeddb4f-95ae-45aa-95b1-b4752f30c64a"
}
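The naming conventions above can be combined to recover Runtime-level identifiers from a row's resource attributes. The following Python sketch is illustrative only: describe_runtime is a hypothetical helper, and it assumes the namespace is always runtime- followed by the Runtime Key, as in the example above.

```python
def describe_runtime(resource_attributes: dict) -> dict:
    """Hypothetical helper: derive Runtime-level identifiers from RESOURCE_ATTRIBUTES.

    Assumes the naming conventions described above: namespace
    "runtime-<Runtime Key>", container "<Runtime Key>-gateway" or
    "<Runtime Key>-server", and pod "<Runtime Key>-<replica index>".
    """
    namespace = resource_attributes.get("k8s.namespace.name", "")
    prefix = "runtime-"
    runtime_key = namespace[len(prefix):] if namespace.startswith(prefix) else None

    # "gateway" or "server", taken from the container name suffix
    container = resource_attributes.get("k8s.container.name", "")
    role = None
    if runtime_key:
        for suffix in ("gateway", "server"):
            if container == f"{runtime_key}-{suffix}":
                role = suffix

    # 0-indexed replica number taken from the pod name suffix
    pod = resource_attributes.get("k8s.pod.name", "")
    replica = None
    if runtime_key and pod.startswith(runtime_key + "-"):
        tail = pod[len(runtime_key) + 1:]
        if tail.isdigit():
            replica = int(tail)

    return {
        "deployment_id": resource_attributes.get("openflow.dataplane.id"),
        "runtime_key": runtime_key,
        "container_role": role,
        "pod_replica": replica,
        # k8s.container.restart_count is a numeric string
        "restarts": int(resource_attributes.get("k8s.container.restart_count", "0")),
    }


example = {
    "k8s.container.name": "pg-dev-server",
    "k8s.container.restart_count": "0",
    "k8s.namespace.name": "runtime-pg-dev",
    "k8s.pod.name": "pg-dev-0",
    "openflow.dataplane.id": "abeddb4f-95ae-45aa-95b1-b4752f30c64a",
}
print(describe_runtime(example))
```

Rows from namespaces such as kube-system simply yield None for the Runtime fields, which makes it easy to separate Runtime telemetry from infrastructure telemetry.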

Scope

Name | Type | Description
name | String | The provider of the metric. One of:

  • runtime for Openflow connector metrics
  • github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver for system-level metrics
Scope Example:
{
  "name": "runtime"
}

Record type

Depending on the type of Openflow telemetry this row represents, the type is one of:

  • LOG
  • METRIC

Openflow does not collect TRACE records, although TRACE is also a valid value for this column in Snowflake event tables.

Record

Optional. A JSON object that describes the type of metric this row represents.

Name | Type | Description
metric | Object | Contains two fields:

  • name: the unique name of the metric, typically using dot-delimited namespaces
  • unit: the unit of the value, such as byte, nanosecond, or thread

The name and unit values vary widely. For the full list, see Application metrics below.

metric_type | String | One of:

  • gauge for most Openflow metrics: a snapshot value that can increase or decrease
  • sum for cumulative metrics such as pod CPU time and network IO

value_type | String | The primitive type of the values produced by this metric. One of:

  • INT
  • DOUBLE

aggregation_temporality | String | Optional. Set to cumulative for metrics whose values accumulate over time and therefore depend on previous values, such as pod CPU time and network IO.
is_monotonic | Boolean | Optional. For cumulative metrics, true indicates that the value never decreases within the time series.
Record Example:
{
  "metric": {
    "name": "connection.queued.duration.max",
    "unit": "millisecond"
  },
  "metric_type": "gauge",
  "value_type": "INT"
}
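For sum metrics with cumulative temporality, each value is a running total, so the activity in each interval is the difference between consecutive samples. The following Python sketch assumes samples from a single time series (for example, one pod's CPU time) sorted by timestamp, and treats a decrease as a counter reset, such as after a container restart. The helper names and the reset handling are assumptions, not documented Openflow behavior.

```python
from typing import List, Tuple


def cumulative_to_deltas(samples: List[Tuple[int, float]]) -> List[Tuple[int, float]]:
    """Turn (timestamp_ms, cumulative_value) samples of one monotonic series
    into (timestamp_ms, increase_since_previous_sample) pairs.

    A drop in the cumulative value is treated as a counter reset, so the
    new value itself is taken as the increase.
    """
    deltas = []
    for (_, prev), (ts, curr) in zip(samples, samples[1:]):
        increase = curr - prev if curr >= prev else curr
        deltas.append((ts, increase))
    return deltas


def per_second_rates(samples: List[Tuple[int, float]]) -> List[Tuple[int, float]]:
    """Divide each increase by the seconds elapsed since the previous sample."""
    rates = []
    for (prev_ts, _), (ts, increase) in zip(samples, cumulative_to_deltas(samples)):
        elapsed_s = (ts - prev_ts) / 1000
        if elapsed_s > 0:
            rates.append((ts, increase / elapsed_s))
    return rates
```

Gauges such as connection.queued.duration.max need no such conversion, because each value is already a snapshot.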

Record attributes

Logs

Record attributes for logs typically indicate where the log was sourced. For example, logs from an Openflow Runtime named testruntime could have the following record attributes:

{
  "log.file.path": "/var/log/pods/runtime-testruntime_testruntime-0_66d80cdb-9484-40a4-bdba-f92eb0af14c7/testruntime-server/0.log",
  "log.iostream": "stdout",
  "logtag": "F"
}
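The log.file.path value in this example follows the kubelet's pod log layout, so it can be split into namespace, pod, pod UID, container, and restart count. This Python sketch assumes the standard /var/log/pods/<namespace>_<pod>_<pod uid>/<container>/<restart count>.log layout seen above; parse_pod_log_path is a hypothetical helper.

```python
from pathlib import PurePosixPath


def parse_pod_log_path(path: str) -> dict:
    """Hypothetical helper: split a log.file.path record attribute into its parts.

    Assumes the standard kubelet layout:
    /var/log/pods/<namespace>_<pod name>_<pod uid>/<container name>/<restart count>.log
    Kubernetes namespace and pod names cannot contain "_", so splitting on it is safe.
    """
    p = PurePosixPath(path)
    pod_dir, container = p.parts[-3], p.parts[-2]
    namespace, pod, pod_uid = pod_dir.split("_", 2)
    return {
        "namespace": namespace,
        "pod": pod,
        "pod_uid": pod_uid,
        "container": container,
        "restart_count": int(p.stem),  # "0.log" -> 0
    }


print(parse_pod_log_path(
    "/var/log/pods/runtime-testruntime_testruntime-0_"
    "66d80cdb-9484-40a4-bdba-f92eb0af14c7/testruntime-server/0.log"
))
```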

System metrics

System metrics such as CPU usage typically do not set record attributes, so this column is null.

Openflow application metrics

Record attributes for application (or “flow”) metrics provide details about the component in the data pipeline that produced the metric. The attributes vary by component type; see Application metrics. For example, a processor metric could have the following record attributes:

{
  "component": "PutSnowpipeStreaming",
  "execution.node": "ALL",
  "group.id": "c052f9d7-7f76-3013-a2c5-d3b064fa7326",
  "id": "c69e2913-22a9-36bb-a159-6a5ed1fb9d63",
  "name": "PutSnowpipeStreaming",
  "type": "processor"
}

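Value

This column contains the raw value of the telemetry. For metrics, it is a numeric value (integer or double). For logs, it is a semi-structured string value or a well-formed JSON string.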

Openflow Runtime logs

Openflow Runtimes emit most logs as JSON, so applying Snowflake’s TRY_PARSE_JSON function to the VALUE column lets you break the value into the following structured fields:

Name | Type | Description
formattedMessage | String | The log message emitted by the Runtime logger.
level | String | One of:

  • ERROR
  • WARN
  • INFO
  • DEBUG
  • TRACE

loggerName | String | The fully qualified class name of the logger. Openflow processors typically use logger names that start with com.snowflake.openflow.runtime.processors.

This is useful for viewing logs from a specific processor, controller service, or bundled library.

nanoseconds | Integer | The sub-second part of the time this log message was created, in nanoseconds, so its leading digits are the milliseconds.

For example, a nanoseconds value of 111222333 could correspond to a timestamp value of 1749180210111: the first three digits of the nanoseconds value match the last three digits of the timestamp.

threadName | String | Name of the thread handling this call. For example, Timer-Driven Process Thread-7
throwable | JSON Object | null when there is no exception or stack trace for this log message. Otherwise, contains the stack trace with the following fields:

  • className: the exception thrown
  • message: any message logged with the exception
  • stepArray: array of method calls in the stack trace, each including:
    • className
    • fileName
    • lineNumber
    • methodName

timestamp | Integer | The time this log message was created, in milliseconds since the UNIX epoch.

For example, 1749180210044 means that the log was created at 2025-06-06 03:23:30.044 UTC.

mdc | JSON Object | Mapped Diagnostic Context (MDC) that provides additional flow-level context for the log entry. Contains the following fields:

  • processGroupId: unique identifier of the process group
  • processGroupIdPath: hierarchical path of process group IDs
  • processGroupName: name of the process group
  • processGroupNamePath: hierarchical path of process group names
  • registeredFlowIdentifier: identifier of the registered flow (present for all versioned flows, including out-of-the-box Openflow connectors)
  • registeredFlowVersion: version of the registered flow (present for all versioned flows, including out-of-the-box Openflow connectors)

For example:

{
  "processGroupId": "6dc1d98f-019d-1000-ffff-ffffa3ba8a09",
  "processGroupIdPath": "/58385a8b-019d-1000-2a52-9ef1c34b0e5f/6dc1d98f-019d-1000-ffff-ffffa3ba8a09",
  "processGroupName": "latency targets",
  "processGroupNamePath": "/Openflow/latency targets",
  "registeredFlowIdentifier": "sqlserver-multidatabase",
  "registeredFlowVersion": "0.29.0-ebb7a257"
}
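Outside Snowflake, the same fields can be read from a log VALUE with any JSON parser. The following Python sketch (summarize_runtime_log is a hypothetical helper) extracts the main fields, converts timestamp to a UTC datetime with integer arithmetic, combines timestamp and nanoseconds into nanoseconds since the epoch, and condenses throwable into one line. It assumes nanoseconds is the nano-of-second value, as the example in the table suggests.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def summarize_runtime_log(value):
    """Hypothetical helper: pull common fields out of an Openflow Runtime log VALUE.

    Returns None when the value is not a JSON object, similar to how
    TRY_PARSE_JSON returns NULL for non-JSON input.
    """
    try:
        log = json.loads(value)
    except (TypeError, json.JSONDecodeError):
        return None
    if not isinstance(log, dict):
        return None

    ts_ms = log.get("timestamp")
    nanos = log.get("nanoseconds")
    summary = {
        "level": log.get("level"),
        "logger": log.get("loggerName"),
        "message": log.get("formattedMessage"),
        "thread": log.get("threadName"),
        "process_group": (log.get("mdc") or {}).get("processGroupNamePath"),
        # timedelta with integer milliseconds avoids float rounding
        "created": EPOCH + timedelta(milliseconds=ts_ms) if ts_ms is not None else None,
        # Whole seconds from timestamp, plus the sub-second nanoseconds
        "epoch_ns": (
            (ts_ms // 1000) * 1_000_000_000 + nanos
            if ts_ms is not None and nanos is not None
            else None
        ),
        "exception": None,
    }

    throwable = log.get("throwable")
    if throwable:
        text = f"{throwable.get('className')}: {throwable.get('message')}"
        frames = throwable.get("stepArray") or []
        if frames:
            top = frames[0]  # innermost call in the stack trace
            text += (
                f" at {top.get('className')}.{top.get('methodName')}"
                f"({top.get('fileName')}:{top.get('lineNumber')})"
            )
        summary["exception"] = text
    return summary
```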

Application metrics

Note

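The following list covers all application metrics available for Openflow Runtimes. For retention in the Snowflake event table, Runtimes emit only the subset of these metrics that is relevant to Openflow connectors.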

Snowflake’s OpenTelemetry reporting task can send some or all of these metrics to any OTLP destination.

Connection metrics

Metric Name | Unit | Description
connection.input.bytes | bytes | Size of Items Input
connection.input.count | items | Count of Items Input
connection.output.bytes | bytes | Size of Items Output
connection.output.count | items | Count of Items Output
connection.queued.bytes | bytes | Size of Items Queued
connection.queued.bytes.max | bytes | Max Size of Items Queued
connection.queued.count | items | Count of Items Queued
connection.queued.count.max | items | Max Count of Items Queued
connection.queued.duration.total | milliseconds | Total Duration of Queued Items
connection.queued.duration.max | milliseconds | Max Duration of Queued Items
connection.backpressure.threshold.bytes | bytes | The maximum size of data in bytes that can be queued in this connection before it applies back pressure.
connection.backpressure.threshold.objects | items | The configured maximum number of FlowFiles that can be queued in this connection before it applies back pressure.
connection.loadbalance.status.load_balance_not_configured | binary, 0 or 1 | 1 if the connection does not have a configured load balance setting. Otherwise, 0.
connection.loadbalance.status.load_balance_active | binary, 0 or 1 | 1 if the connection is load balancing across the cluster. Otherwise, 0.
connection.loadbalance.status.load_balance_inactive | binary, 0 or 1 | 1 if the connection is not load balancing across the cluster. Otherwise, 0.

Connection record attributes

Each connection metric includes the following record attributes:

Attribute | Description
id | The unique identifier of the connection
name | The user-visible name of the connection
type | The fixed value connection
source.id | The unique identifier of the component that is sending FlowFiles to this connection
source.name | The user-visible name of the component that is sending FlowFiles to this connection
destination.id | The unique identifier of the component that is receiving FlowFiles from this connection
destination.name | The user-visible name of the component that is receiving FlowFiles from this connection
group.id | The unique identifier of the Process Group that contains this Connection
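Back pressure applies when either the queued object count or the queued size reaches its threshold, so comparing the queued metrics with the two backpressure thresholds shows how close a connection is to applying it. This Python sketch assumes you have already collected the latest value of each metric for one connection (for example, grouped by the id record attribute) and that a threshold of 0 means no limit; backpressure_utilization is a hypothetical helper.

```python
def backpressure_utilization(metrics: dict) -> float:
    """Hypothetical helper: how close one connection is to back pressure, as a fraction.

    metrics maps connection metric names to their latest values for a single
    connection. The larger of the count ratio and the size ratio is returned,
    because reaching either threshold triggers back pressure. A missing or
    zero threshold is skipped (assumed to mean "no limit").
    """
    ratios = []
    for queued, threshold in (
        ("connection.queued.count", "connection.backpressure.threshold.objects"),
        ("connection.queued.bytes", "connection.backpressure.threshold.bytes"),
    ):
        limit = metrics.get(threshold)
        if limit:
            ratios.append(metrics.get(queued, 0) / limit)
    return max(ratios, default=0.0)


latest = {
    "connection.queued.count": 8000,
    "connection.backpressure.threshold.objects": 10000,
    "connection.queued.bytes": 512 * 1024**2,
    "connection.backpressure.threshold.bytes": 1024**3,
}
print(backpressure_utilization(latest))
```

A value approaching 1.0 means the upstream component is about to be throttled.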

Input port and output port metrics

Input ports and output ports are technically two separate types of components. For consistency, their metrics and attributes are the same, except for the type attribute, which indicates whether the component is an input port or an output port.

Metric Name | Unit | Description
port.thread.count.active | threads | Number of Active Threads
port.bytes.received | bytes | Number of Bytes Received
port.bytes.sent | bytes | Number of Bytes Sent
port.flowfiles.received | flowfiles | Number of FlowFiles Received
port.flowfiles.sent | flowfiles | Number of FlowFiles Sent
port.input.bytes | bytes | Size of Items Input
port.input.count | items | Count of Items Input
port.output.bytes | bytes | Size of Items Output
port.output.count | items | Count of Items Output

Input port and output port record attributes

Each port metric includes the following record attributes:

Attribute | Description
id | The unique identifier of the port
name | The user-visible name of the port
type | One of port-input or port-output
group.id | The unique identifier of the Process Group that contains this Port

Process group metrics

Metric Name | Unit | Description
processgroup.thread.count.active | threads | Number of Active Threads
processgroup.thread.count.stateless | threads | Number of Stateless Threads
processgroup.thread.count.terminated | threads | Number of Terminated Threads
processgroup.bytes.read | bytes | Number of Bytes Read
processgroup.bytes.received | bytes | Number of Bytes Received
processgroup.bytes.transferred | bytes | Number of Bytes Transferred
processgroup.bytes.sent | bytes | Number of Bytes Sent
processgroup.bytes.written | bytes | Number of Bytes Written
processgroup.flowfiles.received | flowfiles | Number of FlowFiles Received
processgroup.flowfiles.sent | flowfiles | Number of FlowFiles Sent
processgroup.flowfiles.transferred | flowfiles | Number of FlowFiles Transferred
processgroup.input.count | items | Number of Items Input
processgroup.input.content.size | bytes | Size of Items Input
processgroup.output.count | items | Number of Items Output
processgroup.output.content.size | bytes | Size of Items Output
processgroup.queued.count | items | Number of Items Queued
processgroup.queued.content.size | bytes | Size of Items Queued
processgroup.time.processing | nanoseconds | Time Spent Processing

Process group record attributes

Each process group metric includes the following record attributes:

Attribute | Description
id | The unique identifier of the Process Group
name | The user-visible name of the Process Group
type | The fixed value process-group
tree.level | The depth of the Process Group, relative to the root process group of the flow. Process Groups at the highest level of the flow will have a tree.level of 1

Processor metrics

Metric Name | Unit | Description
processor.thread.count.active | thread | Number of Active Threads
processor.thread.count.terminated | thread | Number of Terminated Threads
processor.time.lineage.average | nanosecond | Average Lineage Duration
processor.invocations | invocations | Number of Invocations
processor.bytes.read | byte | Number of Bytes Read
processor.bytes.received | byte | Number of Bytes Received
processor.bytes.sent | byte | Number of Bytes Sent
processor.bytes.written | byte | Number of Bytes Written
processor.flowfiles.received | flowfiles | Number of FlowFiles Received
processor.flowfiles.removed | flowfiles | Number of FlowFiles Removed
processor.flowfiles.sent | flowfiles | Number of FlowFiles Sent
processor.input.count | item | Number of Items Input
processor.input.content.size | bytes | Size of Items Input
processor.output.count | item | Number of Items Output
processor.output.content.size | byte | Size of Items Output
processor.time.processing | nanosecond | Time Spent Processing
processor.run.status.running | binary, 0 or 1 | 1 if running; 0 otherwise
processor.run.status.stopped | binary, 0 or 1 | 1 if stopped; 0 otherwise
processor.run.status.validating | binary, 0 or 1 | 1 if validating; 0 otherwise
processor.run.status.invalid | binary, 0 or 1 | 1 if invalid; 0 otherwise
processor.run.status.disabled | binary, 0 or 1 | 1 if disabled; 0 otherwise
processor.counter | count | Value of the counter
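The five processor.run.status.* metrics are binary gauges, so a processor's state can be read off as whichever one equals 1. A minimal Python sketch, assuming exactly one status gauge is 1 for a given processor at a given timestamp (processor_state is a hypothetical helper):

```python
STATUS_PREFIX = "processor.run.status."


def processor_state(metrics: dict) -> str:
    """Hypothetical helper: return the run state whose status gauge is 1.

    metrics maps metric names to values for one processor at one timestamp.
    Returns "unknown" when no status gauge, or more than one, is set.
    """
    active = [
        name[len(STATUS_PREFIX):]
        for name, value in metrics.items()
        if name.startswith(STATUS_PREFIX) and int(value) == 1
    ]
    return active[0] if len(active) == 1 else "unknown"


print(processor_state({
    "processor.run.status.running": 0,
    "processor.run.status.stopped": 1,
    "processor.run.status.invalid": 0,
}))
```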

Processor record attributes

Each processor metric includes the following record attributes:

Attribute | Description
id | The unique identifier of the processor
name | The user-visible and user-editable name of the Processor
type | The fixed value processor
component | The immutable class name of the processor.
execution.node | Either ALL or PRIMARY, depending on how this Processor is configured to run
group.id | The unique identifier of the Process Group that contains this Processor

Additional attributes for counters

In addition to the standard Processor attributes above, processor.counter metrics include the following:

Attribute | Description
type | The fixed value counter
counter | The user- or system-generated name of the counter

Remote process group metrics

Metric Name | Unit | Description
remoteprocessgroup.thread.count.active | threads | Number of Active Threads
remoteprocessgroup.remote.port.count.active | ports | Number of Active Remote Ports
remoteprocessgroup.remote.port.count.inactive | ports | Number of Inactive Remote Ports
remoteprocessgroup.duration.lineage.average | nanoseconds | Average Lineage Duration
remoteprocessgroup.refresh.age | milliseconds | Time since last refresh
remoteprocessgroup.received.count | items | Number of Received Items
remoteprocessgroup.received.content.size | bytes | Size of Received Items
remoteprocessgroup.sent.count | items | Number of Sent Items
remoteprocessgroup.sent.content.size | bytes | Size of Sent Items
remoteprocessgroup.transmission.status.transmitting | binary, 0 or 1 | 1 if the Remote Process Group is transmitting. Otherwise, 0.
remoteprocessgroup.transmission.status.nottransmitting | binary, 0 or 1 | 0 if the Remote Process Group is transmitting. Otherwise, 1.

Remote process group record attributes

Each remote process group metric includes the following record attributes:

Attribute | Description
id | The unique identifier of the remote process group
name | The user-visible name of the Remote Process Group
group.id | The unique identifier of the Process Group that contains this Remote Process Group
authorization.issue | The Authorization used to access the Remote Process Group
target.uri | The URI of the Remote Process Group
type | The fixed value remote-process-group

JVM metrics

Metric Name | Unit | Description
jvm.memory.heap.used | bytes | The amount of memory currently occupied by objects on the JVM Heap
jvm.memory.heap.committed | bytes | The amount of memory guaranteed to be available for use by the JVM Heap
jvm.memory.heap.max | bytes | Maximum amount of memory allocated for the JVM Heap
jvm.memory.heap.init | bytes | Initial amount of memory allocated for the JVM Heap
jvm.memory.heap.usage | percentage | JVM Heap Usage
jvm.memory.non-heap.usage | percentage | JVM Non-Heap Usage
jvm.memory.total.init | bytes | Initial amount of memory allocated for the JVM
jvm.memory.total.used | bytes | Current amount of memory used by the JVM
jvm.memory.total.max | bytes | Maximum amount of memory that can be used by the JVM
jvm.memory.total.committed | bytes | The amount of memory guaranteed to be available for use by the JVM
jvm.threads.count | threads | Number of live threads
jvm.threads.deadlocks | threads | JVM Thread Deadlocks
jvm.threads.daemon.count | threads | Number of live daemon threads
jvm.uptime | seconds | Number of seconds the JVM process has been running
jvm.file.descriptor.usage | percentage | Percentage of available file descriptors currently in use.
jvm.gc.G1-Concurrent-GC.runs | runs | Total number of times that the G1 Concurrent Garbage Collection has run
jvm.gc.G1-Concurrent-GC.time | milliseconds | Total amount of time that the G1 Concurrent Garbage Collection has been running
jvm.gc.G1-Young-Generation.runs | runs | Total number of times that the G1 Young Generation has run
jvm.gc.G1-Young-Generation.time | milliseconds | Total amount of time that the G1 Young Generation has been running
jvm.gc.G1-Old-Generation.runs | runs | Total number of times that the G1 Old Generation has run
jvm.gc.G1-Old-Generation.time | milliseconds | Total amount of time that the G1 Old Generation has been running

JVM record attributes

JVM metrics do not provide record attributes.
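Because the jvm.gc.*.time metrics are cumulative, the share of time a Runtime spends in garbage collection between two samples is the change in GC time divided by the change in jvm.uptime. The following Python sketch counts only the G1 Young and Old Generation collectors, on the assumption that G1 Concurrent GC work runs alongside application threads rather than pausing them; gc_pause_fraction is a hypothetical helper.

```python
PAUSE_COLLECTORS = (
    "jvm.gc.G1-Young-Generation.time",
    "jvm.gc.G1-Old-Generation.time",
)


def gc_pause_fraction(prev: dict, curr: dict) -> float:
    """Hypothetical helper: share of wall-clock time spent in G1 Young/Old
    collections between two samples from the same Runtime pod.

    jvm.gc.*.time is cumulative milliseconds and jvm.uptime is seconds.
    Returns 0.0 if uptime did not advance (for example, after a JVM restart).
    """
    elapsed_ms = (curr["jvm.uptime"] - prev["jvm.uptime"]) * 1000
    if elapsed_ms <= 0:
        return 0.0
    gc_ms = sum(curr.get(name, 0) - prev.get(name, 0) for name in PAUSE_COLLECTORS)
    return gc_ms / elapsed_ms
```

A fraction that stays high across samples usually points to heap pressure, which can be cross-checked against jvm.memory.heap.used and jvm.memory.heap.max.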

CPU metrics

Metric Name | Unit | Description
cores.available | cores | The number of available cores for the Runtime
cores.load | percentage | Either the system load average or -1 if it is not available

CPU record attributes

Attribute | Description
id | The fixed value cpu
name | The name of the operating system
architecture | The architecture of the operating system
version | The version of the operating system
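Assuming cores.load carries the system load average, dividing it by cores.available gives a rough per-core load, where values near or above 1.0 usually suggest CPU saturation. A small Python sketch (load_per_core is a hypothetical helper) that also handles the -1 sentinel:

```python
from typing import Optional


def load_per_core(cores_load: float, cores_available: int) -> Optional[float]:
    """Hypothetical helper: normalize cores.load by cores.available.

    cores.load is -1 when the system load average is not available,
    in which case None is returned.
    """
    if cores_load < 0 or cores_available <= 0:
        return None
    return cores_load / cores_available
```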

Storage metrics

Metric Name | Unit | Description
storage.free | bytes | The amount of free storage for a given repository
storage.used | bytes | The amount of used storage for a given repository

Storage record attributes

Attribute | Description
id | The unique identifier of the storage repository
name | Same as id and provided for consistency
storage.type | One of flowfile, content, or provenance
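Together, storage.used and storage.free give a repository's total size, so its used fraction is used / (used + free). This Python sketch assumes rows of (repository id, storage.type, metric name, value) taken from a single timestamp; the helper name and row shape are assumptions.

```python
from collections import defaultdict


def storage_used_fraction(rows):
    """Hypothetical helper: used / (used + free) per (repository id, storage.type).

    rows is an iterable of (repository_id, storage_type, metric_name, value)
    tuples for one timestamp. Repositories with no reported size get 0.0.
    """
    by_repo = defaultdict(dict)
    for repo_id, storage_type, metric, value in rows:
        by_repo[(repo_id, storage_type)][metric] = value

    result = {}
    for key, metrics in by_repo.items():
        used = metrics.get("storage.used", 0)
        total = used + metrics.get("storage.free", 0)
        result[key] = used / total if total else 0.0
    return result
```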

Example queries

The following example queries are intended to help you get started with Openflow telemetry.

All queries assume that Openflow sends telemetry to the default event table, SNOWFLAKE.TELEMETRY.EVENTS. If your Snowflake account or Openflow deployment is configured with a different event table, substitute that table name wherever SNOWFLAKE.TELEMETRY.EVENTS appears.

Find stuck FlowFiles

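This query returns connections whose FlowFiles have been queued longer than a threshold, which suggests that they may be stuck and need intervention. The threshold is set to 30 minutes by default and can be adjusted as needed.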

SELECT * FROM (
  SELECT
    resource_attributes:"openflow.dataplane.id" as Deployment_ID,
    resource_attributes:"k8s.namespace.name" as Runtime_Key,
    record_attributes:name as Connection_Name,
    record_attributes:id as Connection_ID,
    -- connection.queued.duration.max is reported in milliseconds; convert to minutes
    MAX(TO_NUMBER(value / 60 / 1000)) as Max_Queued_File_Minutes
  FROM snowflake.telemetry.events
  WHERE true
    AND record_type = 'METRIC'
    AND record:metric:name = 'connection.queued.duration.max'
    -- Limit the scan to recent telemetry
    AND timestamp > dateadd(minutes, -30, sysdate())
  GROUP BY 1, 2, 3, 4
)
-- Threshold in minutes; adjust as needed
WHERE Max_Queued_File_Minutes > 30
-- Sort the final result here, because ORDER BY inside a subquery does not guarantee output order
ORDER BY Max_Queued_File_Minutes DESC;

Find error logs for Openflow Runtimes

SELECT
  timestamp,
  Deployment_ID,
  Runtime_Key,
  parsed_log:level as log_level,
  parsed_log:loggerName as logger,
  parsed_log:formattedMessage as message,
  parsed_log
FROM (
  SELECT
    timestamp,
    resource_attributes:"openflow.dataplane.id" as Deployment_ID,
    resource_attributes:"k8s.namespace.name" as Runtime_Key,
    -- Most Runtime logs are JSON; TRY_PARSE_JSON returns NULL for values that are not
    TRY_PARSE_JSON(value) as parsed_log
  FROM snowflake.telemetry.events
  WHERE true
    AND timestamp > dateadd('minutes', -30, sysdate())
    AND record_type = 'LOG'
    -- Openflow Runtime namespaces start with runtime-
    AND resource_attributes:"k8s.namespace.name" like 'runtime-%'
)
WHERE parsed_log:level = 'ERROR'
-- Sort the final result here, because ORDER BY inside a subquery does not guarantee output order
ORDER BY timestamp DESC;

Find running and non-running processors

Some flows require all processors to be in the Running state, even when they are not actively processing data.

This query helps you find processors that are running, as well as processors in any other state, such as:

  • Stopped
  • Invalid
  • Disabled
SELECT
  timestamp,
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  record_attributes:component as Processor,
  record_attributes:id as Processor_ID,
  TO_NUMBER(value) as Running
FROM snowflake.telemetry.events
WHERE true
  AND record:metric:name = 'processor.run.status.running'
  AND record_type = 'METRIC'
  AND timestamp > dateadd(minutes, -30, sysdate());
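The query returns one row per sample, so the same processor can appear many times within the window. Assuming the result has been reduced to (timestamp, Processor_ID, Processor, Running) tuples, this Python sketch keeps the latest sample per processor ID and lists the processors that are not running. non_running_processors is a hypothetical helper; if the same processor reports from several Runtime pods, you may also want to key on the pod.

```python
def non_running_processors(rows):
    """Hypothetical helper: names of processors whose latest Running value is 0.

    rows is an iterable of (timestamp, processor_id, processor_name, running)
    tuples. Only the most recent sample per processor_id is considered.
    """
    latest = {}
    for ts, proc_id, name, running in rows:
        if proc_id not in latest or ts > latest[proc_id][0]:
            latest[proc_id] = (ts, name, running)
    return sorted(name for _, name, running in latest.values() if running == 0)
```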

Find high CPU usage in Openflow Runtimes

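Slow data flows or reduced throughput can be caused by a CPU bottleneck. Openflow Runtimes scale up automatically based on the minimum and maximum number of nodes you configure.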

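If an Openflow Runtime is using its maximum number of nodes but CPU usage is still high, consider:

  1. Increasing the maximum number of nodes allocated to the Runtime
  2. Troubleshooting the connector or data flow to find the bottleneck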

Snowsight Charts make it easy to visualize the results of this query as CPU usage over time.

SELECT
  timestamp,
  resource_attributes:"openflow.dataplane.id" as Deployment_ID,
  resource_attributes:"k8s.namespace.name" as Runtime_Key,
  resource_attributes:"k8s.pod.name" as Runtime_Pod,
  TO_NUMBER(value, 10, 3) * 100 as CPU_Usage_Percentage
FROM snowflake.telemetry.events
WHERE true
  AND timestamp > dateadd(minute, -30, sysdate())
  AND record_type = 'METRIC'
  AND record:metric:name ilike 'container.cpu.usage'
  AND resource_attributes:"k8s.namespace.name" ilike 'runtime-%'
  AND resource_attributes:"k8s.container.name" ilike '%-server'
ORDER BY timestamp desc, CPU_Usage_Percentage desc;
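The scaling guidance for high CPU usage can be expressed as a simple check over the latest CPU_Usage_Percentage per pod. In this Python sketch, the 80 percent threshold and the cpu_recommendation helper are hypothetical. Because the query multiplies the container's CPU usage in cores by 100, a suitable threshold depends on how much CPU each Runtime pod is allocated.

```python
def cpu_recommendation(pod_cpu_percent: dict, max_nodes: int, threshold: float = 80.0) -> str:
    """Hypothetical helper: apply the scaling guidance to the latest
    CPU_Usage_Percentage per Runtime pod.

    pod_cpu_percent maps Runtime_Pod (for example "pg-dev-0") to its latest value.
    threshold is an assumed cut-off, not an Openflow default.
    """
    if not pod_cpu_percent:
        return "no data"
    average = sum(pod_cpu_percent.values()) / len(pod_cpu_percent)
    if average < threshold:
        return "ok"
    if len(pod_cpu_percent) < max_nodes:
        # The Runtime can still scale out on its own
        return "high CPU, Runtime can still add nodes"
    return "high CPU at max nodes: raise max nodes or find the bottleneck"
```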