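Monitor Openflow using telemetry data
This topic describes how to monitor the state of Openflow and troubleshoot problems.
Access Openflow logs
Snowflake sends Openflow logs to the event table that you configured when you set up Openflow (BYOC or Snowflake deployment).
Snowflake recommends that you include a timestamp in the WHERE clause of event table queries. This is especially important because various Snowflake components can generate large volumes of data. Applying a filter retrieves a smaller subset of data, which improves query performance.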
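As a minimal sketch of a time-bounded query, the following statement reads only the last 30 minutes of Openflow telemetry. It assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS named in the example queries section of this topic, and it relies on the `application` resource attribute being set to `openflow`. The 30-minute window and the row limit are arbitrary choices for illustration.

```sql
-- Assumption: telemetry is stored in the default event table.
-- Substitute your own event table name if it differs.
SELECT
    timestamp,
    record_type,
    value
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(minute, -30, SYSDATE())   -- bound the scan by time
  AND resource_attributes:"application"::STRING = 'openflow'
ORDER BY timestamp DESC
LIMIT 100;
```

SYSDATE() returns the current time in UTC, which is why it is used here to compare against the event table timestamp.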
To get started quickly with Openflow’s telemetry, see Example Queries below.
Openflow telemetry schema
For information about the event table columns, see Event table columns.
The following sections describe how Openflow structures telemetry in the event table.
Resource attributes
Resource attributes describe the event metadata set by Openflow. For general information about other types of resource attributes, see the RESOURCE_ATTRIBUTES column in the Event table columns documentation.
| Name | Type | Description |
|---|---|---|
| application | String | The fixed value openflow |
| cloud.service.provider | String | One of aws, snowflake |
| container.id | String | Unique identifier of the container |
| container.image.name | String | Fully qualified name of the container image. All Openflow images are hosted by Snowflake repositories. For example, <account>-openflow-<env>.registry-internal.snowflakecomputing.cn/openflow/openflow/openflow_repo/runtime-server |
| container.image.tag | String | Version of the container image |
| k8s.container.name | String | The name of the K8s container. Openflow Runtime containers will start with the “Runtime Key” and end with -gateway or -server. For example, an Openflow Runtime named “PostgreSQL CDC” has a Runtime Key of postgresql-cdc, so its container names start with postgresql-cdc and end with -gateway or -server. |
| k8s.container.restart_count | Numeric String | The number of times this container has restarted since it was created. |
| k8s.namespace.name | String | K8s namespace of the pod or container, starting with runtime- for Openflow Runtimes. Values also include kube-system and openflow-runtime-infra. |
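| k8s.node.name | String | Internal domain name of the EKS node that hosts the pod or container, or of the EKS node itself. For example, ip-10-12-13-144.us-west-2.compute.internal |
| k8s.pod.name | String | The name of the K8s pod. Openflow Runtime pods start with the “Runtime Key” and end with a numeric identifier for each pod replica. This number is indexed from 0 and grows with the number of nodes, up to the “Max Nodes” set for the Runtime. For example, an Openflow Runtime named “PostgreSQL CDC” with a Runtime Key of postgresql-cdc and 3 nodes has pod names that start with postgresql-cdc and end with 0, 1, and 2. |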
| k8s.pod.start_time | ISO 8601 Date String | Timestamp that the pod was started |
| k8s.pod.uid | UUID String | Unique identifier of the pod within the cluster |
| deployment.version | String | The Openflow deployment version. |
| openflow.dataplane.id | UUID String | The unique identifier of the Openflow Deployment, matching the “ID” shown in the Snowflake Openflow UI through Deployment > View Details. |
- Resource Attributes Example:
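A minimal sketch of how the resource attributes in the table above might be read, assuming the default event table SNOWFLAKE.TELEMETRY.EVENTS. Attribute names that contain dots are wrapped in double quotes in the path expression. The column aliases and the one-hour window are illustrative choices, not part of the Openflow schema.

```sql
-- List the Openflow Runtime pods and containers that reported telemetry
-- in the last hour, with their restart counts.
SELECT DISTINCT
    resource_attributes:"openflow.dataplane.id"::STRING AS deployment_id,
    resource_attributes:"k8s.namespace.name"::STRING    AS k8s_namespace,
    resource_attributes:"k8s.pod.name"::STRING          AS pod,
    resource_attributes:"k8s.container.name"::STRING    AS container,
    -- restart_count is documented as a numeric string, so convert it safely
    TRY_TO_NUMBER(resource_attributes:"k8s.container.restart_count"::STRING) AS restart_count
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(hour, -1, SYSDATE())
  AND resource_attributes:"application"::STRING = 'openflow'
  AND resource_attributes:"k8s.namespace.name"::STRING LIKE 'runtime-%'
ORDER BY k8s_namespace, pod, container;
```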
Scope
| Name | Type | Description |
|---|---|---|
| name | String | The provider of the metric. |
- Scope Example:
Record type
Depending on the type of Openflow telemetry that this row represents, the type is one of the following:
- LOG
- METRIC
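Openflow doesn't collect TRACE records, although TRACE is also a valid type for this column in Snowflake event tables.
Record
Optional. A JSON object that describes the type of metric that this row represents.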
| Name | Type | Description |
|---|---|---|
| metric | Object | Contains two fields, name and unit. The name and unit values vary widely. For the full list, see Application Metrics below. |
| metric_type | String | The type of the metric. |
| value_type | String | The primitive type of the value produced by this metric. |
| aggregation_temporality | String | Optional. Set to cumulative for metrics that are strictly increasing and dependent on previous values, such as pod CPU time and network IO. |
| is_monotonic | Boolean | Optional. For cumulative metrics, this is true to show that it is strictly increasing within the time series. |
- Record Example:
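As a hedged sketch, the fields of the RECORD column described above can be inspected with a query like the following. It assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS and lists which metric names, units, and types were emitted in the last hour. The aliases and the time window are illustrative.

```sql
-- Summarize the metrics that Openflow emitted recently.
SELECT
    record:metric.name::STRING            AS metric_name,
    record:metric.unit::STRING            AS metric_unit,
    record:metric_type::STRING            AS metric_type,
    record:value_type::STRING             AS value_type,
    record:aggregation_temporality::STRING AS aggregation_temporality,
    COUNT(*)                              AS sample_count
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(hour, -1, SYSDATE())
  AND record_type = 'METRIC'
  AND resource_attributes:"application"::STRING = 'openflow'
GROUP BY 1, 2, 3, 4, 5
ORDER BY metric_name;
```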
Record attributes
Logs
Record attributes for Logs will typically indicate where this log was sourced. For example, logs from an Openflow Runtime named testruntime could have Record Attributes of:
System metrics
System metrics like CPU usage will typically not set Record Attributes, so this will be null.
Openflow application metrics
Record Attributes for Application or “Flow” metrics provide details about the component in the data pipeline that produced the metric. This will vary based on the type of component. See Application Metrics.
Value
This column contains the raw value of the telemetry. For metrics, this is a numeric value (integer or double). For logs, this is a semi-structured string value or a well-formed JSON string.
Openflow runtime logs
Openflow Runtimes emit most logs as JSON, so applying Snowflake’s TRY_PARSE_JSON to the VALUE column allows you to further break this value into the following structured fields:
| Name | Type | Description |
|---|---|---|
| formattedMessage | String | The actual log message emitted from the Runtime logger. |
| level | String | The severity level of the log message. |
| loggerName | String | The fully qualified classname for the logger. Openflow processors will typically use logger names that start with com.snowflake.openflow.runtime.processors. This is useful for viewing logs for a specific processor, controller service, or bundled library. |
| nanoseconds | Integer | The sub-second time, in nanoseconds, at which this log message was created. The leading digits are the milliseconds. For example, a nanoseconds value of 111222333 might correspond to a timestamp value of 1749180210111, where the first three digits of the nanoseconds value match the last three digits of the timestamp. |
| threadName | String | Name of the thread handling this call. For example, Timer-Driven Process Thread-7 |
| throwable | JSON Object | null when there is no exception or stacktrace for this log message. Otherwise, the stacktrace is logged as a JSON string. |
| timestamp | Integer | The time at which this log message was created, in milliseconds since the UNIX epoch. For example, 1749180210044 means that the log was created at 2025-06-06 03:23:30.044 UTC |
| mdc | JSON Object | Mapped Diagnostic Context (MDC) providing additional flow-level context for the log entry. |
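The structured fields above might be extracted as in the following sketch. It assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS and casts VALUE to a string before parsing, because TRY_PARSE_JSON expects string input. Rows whose VALUE is not valid JSON produce NULL and are filtered out. The CTE name and column aliases are illustrative.

```sql
WITH runtime_logs AS (
    SELECT
        timestamp,
        resource_attributes:"k8s.pod.name"::STRING AS pod,
        TRY_PARSE_JSON(value::STRING)              AS parsed   -- NULL if not JSON
    FROM SNOWFLAKE.TELEMETRY.EVENTS
    WHERE timestamp > DATEADD(minute, -30, SYSDATE())
      AND record_type = 'LOG'
      AND resource_attributes:"k8s.namespace.name"::STRING LIKE 'runtime-%'
)
SELECT
    timestamp,
    pod,
    parsed:level::STRING                           AS level,
    parsed:loggerName::STRING                      AS logger_name,
    parsed:threadName::STRING                      AS thread_name,
    parsed:formattedMessage::STRING                AS message,
    -- the log's own timestamp is in epoch milliseconds, hence scale 3
    TO_TIMESTAMP_NTZ(parsed:timestamp::NUMBER, 3)  AS logged_at
FROM runtime_logs
WHERE parsed IS NOT NULL
ORDER BY timestamp DESC
LIMIT 100;
```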
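Application metrics
Note
The following list covers all application metrics available for Openflow runtimes. Runtimes emit only the subset of these metrics that is relevant to Openflow connectors, to be retained in the Snowflake event table.
Snowflake's OpenTelemetry reporting task can send some or all of these metrics to any OTLP destination.
Connection metrics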
| Metric Name | Unit | Description |
|---|---|---|
| connection.input.bytes | bytes | Size of Items Input |
| connection.input.count | items | Count of Items Input |
| connection.output.bytes | bytes | Size of Items Output |
| connection.output.count | items | Count of Items Output |
| connection.queued.bytes | bytes | Size of Items Queued |
| connection.queued.bytes.max | bytes | Max Size of Items Queued |
| connection.queued.count | items | Count of Items Queued |
| connection.queued.count.max | items | Max Count of Items Queued |
| connection.queued.duration.total | milliseconds | Total Duration of Queued Items |
| connection.queued.duration.max | milliseconds | Max Duration of Queued Items |
| connection.backpressure.threshold.bytes | bytes | The maximum size of data in bytes that can be queued in this connection before it applies back pressure. |
| connection.backpressure.threshold.objects | items | The configured maximum number of FlowFiles that can be queued in this connection before it applies back pressure. |
| connection.loadbalance.status.load_balance_not_configured | binary, 0 or 1 | 1 if the connection does not have a configured load balance setting. Otherwise, 0. |
| connection.loadbalance.status.load_balance_active | binary, 0 or 1 | 1 if the connection is load balancing across the cluster. Otherwise, 0. |
| connection.loadbalance.status.load_balance_inactive | binary, 0 or 1 | 1 if the connection is not load balancing across the cluster. Otherwise, 0. |
Connection record attributes
Each connection metric includes the following record attributes:
| Attribute | Description |
|---|---|
| id | The unique identifier of the connection |
| name | The user-visible name of the connection |
| type | The fixed value connection |
| source.id | The unique identifier of the component that is sending FlowFiles to this connection |
| source.name | The user-visible name of the component that is sending FlowFiles to this connection |
| destination.id | The unique identifier of the component that is receiving FlowFiles from this connection |
| destination.name | The user-visible name of the component that is receiving FlowFiles from this connection |
| group.id | The unique identifier of the Process Group that contains this Connection |
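As a sketch of how connection metrics and record attributes combine, the following query compares the queued item count of each connection with its back pressure threshold over the last 15 minutes. It assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS and that both metrics are among the subset that your runtime writes to the event table. The aliases and window are illustrative.

```sql
SELECT
    resource_attributes:"k8s.namespace.name"::STRING AS runtime_namespace,
    record_attributes:"id"::STRING                   AS connection_id,
    record_attributes:"name"::STRING                 AS connection_name,
    record_attributes:"source.name"::STRING          AS source_name,
    record_attributes:"destination.name"::STRING     AS destination_name,
    -- pivot the two metrics into columns with conditional aggregation
    MAX(IFF(record:metric.name::STRING = 'connection.queued.count',
            value::FLOAT, NULL))                     AS max_queued_items,
    MAX(IFF(record:metric.name::STRING = 'connection.backpressure.threshold.objects',
            value::FLOAT, NULL))                     AS backpressure_threshold_items
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(minute, -15, SYSDATE())
  AND record_type = 'METRIC'
  AND record_attributes:"type"::STRING = 'connection'
  AND record:metric.name::STRING IN ('connection.queued.count',
                                     'connection.backpressure.threshold.objects')
GROUP BY 1, 2, 3, 4, 5
ORDER BY max_queued_items DESC NULLS LAST;
```

Connections whose queued count approaches the threshold are candidates for back pressure.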
Input port and output port metrics
Input Ports and Output Ports are technically two separate types of components. For consistency, they share the same metrics and attributes, except for the type attribute, which indicates whether the component is an input port or an output port.
| Metric Name | Unit | Description |
|---|---|---|
| port.thread.count.active | threads | Number of Active Threads |
| port.bytes.received | bytes | Number of Bytes Received |
| port.bytes.sent | bytes | Number of Bytes Sent |
| port.flowfiles.received | flowfiles | Number of FlowFiles Received |
| port.flowfiles.sent | flowfiles | Number of FlowFiles Sent |
| port.input.bytes | bytes | Size of Items Input |
| port.input.count | items | Count of Items Input |
| port.output.bytes | bytes | Size of Items Output |
| port.output.count | items | Count of Items Output |
Input port and output port record attributes
Each port metric includes the following record attributes:
| Attribute | Description |
|---|---|
| id | The unique identifier of the port |
| name | The user-visible name of the port |
| type | One of port-input or port-output |
| group.id | The unique identifier of the Process Group that contains this Port |
Process group metrics
| Metric Name | Unit | Description |
|---|---|---|
| processgroup.thread.count.active | threads | Number of Active Threads |
| processgroup.thread.count.stateless | threads | Number of Stateless Threads |
| processgroup.thread.count.terminated | threads | Number of Terminated Threads |
| processgroup.bytes.read | bytes | Number of Bytes Read |
| processgroup.bytes.received | bytes | Number of Bytes Received |
| processgroup.bytes.transferred | bytes | Number of Bytes Transferred |
| processgroup.bytes.sent | bytes | Number of Bytes Sent |
| processgroup.bytes.written | bytes | Number of Bytes Written |
| processgroup.flowfiles.received | flowfiles | Number of FlowFiles Received |
| processgroup.flowfiles.sent | flowfiles | Number of FlowFiles Sent |
| processgroup.flowfiles.transferred | flowfiles | Number of FlowFiles Transferred |
| processgroup.input.count | items | Number of Items Input |
| processgroup.input.content.size | bytes | Size of Items Input |
| processgroup.output.count | items | Number of Items Output |
| processgroup.output.content.size | bytes | Size of Items Output |
| processgroup.queued.count | items | Number of Items Queued |
| processgroup.queued.content.size | bytes | Size of Items Queued |
| processgroup.time.processing | nanoseconds | Time Spent Processing |
Process group record attributes
Each process group metric includes the following record attributes:
| Attribute | Description |
|---|---|
| id | The unique identifier of the Process Group |
| name | The user-visible name of the Process Group |
| type | The fixed value process-group |
| tree.level | The depth of the Process Group, relative to the root process group of the flow. Process Groups at the highest level of the flow will have a tree.level of 1 |
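The tree.level attribute can be used to restrict a query to the highest-level process groups, as in this sketch. It assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS and that the queued metrics are among those retained for your runtime. Because the stored type of tree.level is not stated here, the comparison casts it to a string.

```sql
-- Peak queue depth for top-level process groups over the last hour.
SELECT
    resource_attributes:"k8s.namespace.name"::STRING AS runtime_namespace,
    record_attributes:"name"::STRING                 AS process_group,
    record:metric.name::STRING                       AS metric_name,
    MAX(value::FLOAT)                                AS max_value
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(hour, -1, SYSDATE())
  AND record_type = 'METRIC'
  AND record_attributes:"type"::STRING = 'process-group'
  AND record_attributes:"tree.level"::STRING = '1'
  AND record:metric.name::STRING IN ('processgroup.queued.count',
                                     'processgroup.queued.content.size')
GROUP BY 1, 2, 3
ORDER BY runtime_namespace, process_group, metric_name;
```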
Processor metrics
| Metric Name | Unit | Description |
|---|---|---|
| processor.thread.count.active | thread | Number of Active Threads |
| processor.thread.count.terminated | thread | Number of Terminated Threads |
| processor.time.lineage.average | nanosecond | Average Lineage Duration |
| processor.invocations | invocations | Number of Invocations |
| processor.bytes.read | byte | Number of Bytes Read |
| processor.bytes.received | byte | Number of Bytes Received |
| processor.bytes.sent | byte | Number of Bytes Sent |
| processor.bytes.written | byte | Number of Bytes Written |
| processor.flowfiles.received | flowfiles | Number of FlowFiles Received |
| processor.flowfiles.removed | flowfiles | Number of FlowFiles Removed |
| processor.flowfiles.sent | flowfiles | Number of FlowFiles Sent |
| processor.input.count | item | Number of Items Input |
| processor.input.content.size | bytes | Size of Items Input |
| processor.output.count | item | Number of Items Output |
| processor.output.content.size | byte | Size of Items Output |
| processor.time.processing | nanosecond | Time Spent Processing |
| processor.run.status.running | binary, 0 or 1 | 1 if running; 0 otherwise |
| processor.run.status.stopped | binary, 0 or 1 | 1 if stopped; 0 otherwise |
| processor.run.status.validating | binary, 0 or 1 | 1 if validating; 0 otherwise |
| processor.run.status.invalid | binary, 0 or 1 | 1 if invalid; 0 otherwise |
| processor.run.status.disabled | binary, 0 or 1 | 1 if disabled; 0 otherwise |
| processor.counter | count | Value of the counter |
Processor record attributes
Each processor metric includes the following record attributes:
| Attribute | Description |
|---|---|
| id | The unique identifier of the processor |
| name | The user-visible and user-editable name of the Processor |
| type | The fixed value processor |
| component | The immutable class name of the processor. |
| execution.node | Either ALL or PRIMARY, depending on how this Processor is configured to run |
| group.id | The unique identifier of the Process Group that contains this Processor |
Additional attributes for counters
In addition to the standard Processor attributes above, processor.counter metrics include the following:
| Attribute | Description |
|---|---|
| type | The fixed value counter |
| counter | The user- or system-generated name of the counter |
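A hedged sketch of reading counters, assuming the default event table SNOWFLAKE.TELEMETRY.EVENTS and that processor.counter is among the metrics your runtime retains. It returns the largest value reported for each counter in the last hour. Whether a counter is cumulative depends on the processor, so treat the maximum as a rough summary only.

```sql
SELECT
    resource_attributes:"k8s.namespace.name"::STRING AS runtime_namespace,
    record_attributes:"name"::STRING                 AS processor_name,
    record_attributes:"counter"::STRING              AS counter_name,
    MAX(value::FLOAT)                                AS max_counter_value
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(hour, -1, SYSDATE())
  AND record_type = 'METRIC'
  AND record:metric.name::STRING = 'processor.counter'
GROUP BY 1, 2, 3
ORDER BY runtime_namespace, processor_name, counter_name;
```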
Remote process group metrics
| Metric Name | Unit | Description |
|---|---|---|
| remoteprocessgroup.thread.count.active | threads | Number of Active Threads |
| remoteprocessgroup.remote.port.count.active | ports | Number of Active Remote Ports |
| remoteprocessgroup.remote.port.count.inactive | ports | Number of Inactive Remote Ports |
| remoteprocessgroup.duration.lineage.average | nanoseconds | Average Lineage Duration |
| remoteprocessgroup.refresh.age | milliseconds | Time since last refresh |
| remoteprocessgroup.received.count | items | Number of Received Items |
| remoteprocessgroup.received.content.size | bytes | Size of Received Items |
| remoteprocessgroup.sent.count | items | Number of Sent Items |
| remoteprocessgroup.sent.content.size | bytes | Size of Sent Items |
| remoteprocessgroup.transmission.status.transmitting | binary, 0 or 1 | 1 if the Remote Process Group is transmitting. Otherwise, 0. |
| remoteprocessgroup.transmission.status.nottransmitting | binary, 0 or 1 | 0 if the Remote Process Group is transmitting. Otherwise, 1. |
Remote process group record attributes
Each remote process group metric includes the following record attributes:
| Attribute | Description |
|---|---|
| id | The unique identifier of the remote process group |
| name | The user-visible name of the Remote Process Group |
| group.id | The unique identifier of the Process Group that contains this Remote Process Group |
| authorization.issue | The Authorization used to access the Remote Process Group |
| target.uri | The URI of the Remote Process Group |
| type | The fixed value remote-process-group |
JVM metrics
| Metric Name | Unit | Description |
|---|---|---|
| jvm.memory.heap.used | bytes | The amount of memory currently occupied by objects on the JVM Heap |
| jvm.memory.heap.committed | bytes | The amount of memory guaranteed to be available for use by the JVM Heap |
| jvm.memory.heap.max | bytes | Maximum amount of memory allocated for the JVM Heap |
| jvm.memory.heap.init | bytes | Initial amount of memory allocated for the JVM Heap |
| jvm.memory.heap.usage | percentage | JVM Heap Usage |
| jvm.memory.non-heap.usage | percentage | JVM Non-Heap Usage |
| jvm.memory.total.init | bytes | Initial amount of memory allocated for the JVM |
| jvm.memory.total.used | bytes | Current amount of memory used by the JVM |
| jvm.memory.total.max | bytes | Maximum amount of memory that can be used by the JVM |
| jvm.memory.total.committed | bytes | The amount of memory guaranteed to be available for use by the JVM |
| jvm.threads.count | threads | Number of live threads |
| jvm.threads.deadlocks | threads | JVM Thread Deadlocks |
| jvm.threads.daemon.count | threads | Number of live daemon threads |
| jvm.uptime | seconds | Number of seconds the JVM process has been running |
| jvm.file.descriptor.usage | percentage | Percentage of available file descriptors currently in use. |
| jvm.gc.G1-Concurrent-GC.runs | runs | Total number of times that the G1 Concurrent Garbage Collection has run |
| jvm.gc.G1-Concurrent-GC.time | milliseconds | Total amount of time that the G1 Concurrent Garbage Collection has been running |
| jvm.gc.G1-Young-Generation.runs | runs | Total number of times that the G1 Young Generation has run |
| jvm.gc.G1-Young-Generation.time | milliseconds | Total amount of time that the G1 Young Generation has been running |
| jvm.gc.G1-Old-Generation.runs | runs | Total number of times that the G1 Old Generation has run |
| jvm.gc.G1-Old-Generation.time | milliseconds | Total amount of time that the G1 Old Generation has been running |
JVM record attributes
JVM metrics don't provide record attributes.
CPU metrics
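Because JVM metrics carry no record attributes, the pod that emitted them has to come from the resource attributes. The following sketch charts heap usage per pod in 5-minute windows. It assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS and that jvm.memory.heap.usage is retained for your runtime. The window size is an arbitrary choice.

```sql
SELECT
    TIME_SLICE(timestamp, 5, 'MINUTE')               AS window_start,
    resource_attributes:"k8s.namespace.name"::STRING AS runtime_namespace,
    resource_attributes:"k8s.pod.name"::STRING       AS pod,
    MAX(value::FLOAT)                                AS max_heap_usage
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(hour, -6, SYSDATE())
  AND record_type = 'METRIC'
  AND record:metric.name::STRING = 'jvm.memory.heap.usage'
GROUP BY 1, 2, 3
ORDER BY window_start, runtime_namespace, pod;
```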
| Metric Name | Unit | Description |
|---|---|---|
| cores.available | cores | The number of available cores for the Runtime |
| cores.load | percentage | Either the system load average or -1 if it is not available |
CPU record attributes
| Attribute | Description |
|---|---|
| id | The fixed value cpu |
| name | The name of the operating system |
| architecture | The architecture of the operating system |
| version | The version of the operating system |
Storage metrics
| Metric Name | Unit | Description |
|---|---|---|
| storage.free | bytes | The amount of free storage for a given repository |
| storage.used | bytes | The amount of used storage for a given repository |
Storage record attributes
| Attribute | Description |
|---|---|
| id | The unique identifier of the storage repository |
| name | Same as id and provided for consistency |
| storage.type | One of flowfile, content, or provenance |
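As an illustration, the two storage metrics can be combined into a used percentage per repository. This sketch assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS and that both storage metrics are retained for your runtime. It takes the maximum of each metric over the last 15 minutes as an approximation of the current value.

```sql
WITH storage AS (
    SELECT
        resource_attributes:"k8s.pod.name"::STRING AS pod,
        record_attributes:"storage.type"::STRING   AS storage_type,
        record_attributes:"id"::STRING             AS repository_id,
        MAX(IFF(record:metric.name::STRING = 'storage.used',
                value::FLOAT, NULL))               AS used_bytes,
        MAX(IFF(record:metric.name::STRING = 'storage.free',
                value::FLOAT, NULL))               AS free_bytes
    FROM SNOWFLAKE.TELEMETRY.EVENTS
    WHERE timestamp > DATEADD(minute, -15, SYSDATE())
      AND record_type = 'METRIC'
      AND record:metric.name::STRING IN ('storage.used', 'storage.free')
    GROUP BY 1, 2, 3
)
SELECT
    pod,
    storage_type,
    repository_id,
    used_bytes,
    free_bytes,
    -- DIV0 returns 0 instead of failing when the divisor is 0
    ROUND(100 * DIV0(used_bytes, used_bytes + free_bytes), 1) AS used_pct
FROM storage
ORDER BY used_pct DESC NULLS LAST;
```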
Example queries
The following example queries are intended to help you get started with Openflow telemetry.
All queries assume that Openflow is configured to send telemetry to the default Event Table of SNOWFLAKE.TELEMETRY.EVENTS. If your Snowflake Account or Openflow Deployment is configured with a different Event Table, substitute that table name where you see SNOWFLAKE.TELEMETRY.EVENTS.
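Find stuck FlowFiles
This query returns FlowFiles that have been queued for longer than a threshold, which indicates that they might be stuck and need intervention. The threshold defaults to 30 minutes and can be adjusted to suit your needs.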
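A minimal sketch of such a query, under the assumption that it can be built on the connection.queued.duration.max metric (reported in milliseconds) and the default event table SNOWFLAKE.TELEMETRY.EVENTS. The 30-minute threshold appears as a millisecond expression in the HAVING clause. The lookback window and aliases are illustrative.

```sql
SELECT
    resource_attributes:"openflow.dataplane.id"::STRING AS deployment_id,
    resource_attributes:"k8s.namespace.name"::STRING    AS runtime_namespace,
    record_attributes:"id"::STRING                      AS connection_id,
    record_attributes:"name"::STRING                    AS connection_name,
    record_attributes:"source.name"::STRING             AS source_name,
    record_attributes:"destination.name"::STRING        AS destination_name,
    MAX(value::FLOAT) / 60000                           AS max_queued_minutes
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(minute, -15, SYSDATE())
  AND record_type = 'METRIC'
  AND record:metric.name::STRING = 'connection.queued.duration.max'
GROUP BY 1, 2, 3, 4, 5, 6
HAVING MAX(value::FLOAT) > 30 * 60 * 1000   -- threshold: 30 minutes in ms
ORDER BY max_queued_minutes DESC;
```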
Find error logs for Openflow runtimes
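One way to sketch this lookup is to parse the VALUE column as JSON and filter on the level field. The level value 'ERROR' is an assumption based on common Java logging conventions, and the table name is the default SNOWFLAKE.TELEMETRY.EVENTS. Adjust both to match your environment.

```sql
WITH runtime_logs AS (
    SELECT
        timestamp,
        resource_attributes:"k8s.namespace.name"::STRING AS runtime_namespace,
        resource_attributes:"k8s.pod.name"::STRING       AS pod,
        TRY_PARSE_JSON(value::STRING)                    AS parsed
    FROM SNOWFLAKE.TELEMETRY.EVENTS
    WHERE timestamp > DATEADD(minute, -30, SYSDATE())
      AND record_type = 'LOG'
      AND resource_attributes:"k8s.namespace.name"::STRING LIKE 'runtime-%'
)
SELECT
    timestamp,
    runtime_namespace,
    pod,
    parsed:loggerName::STRING       AS logger_name,
    parsed:formattedMessage::STRING AS message,
    parsed:throwable                AS throwable
FROM runtime_logs
WHERE parsed:level::STRING = 'ERROR'   -- assumed level value
ORDER BY timestamp DESC
LIMIT 100;
```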
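Find running and not running processors
Some flows require all processors to be in the “running” state, even when they aren't actively processing data.
This query helps you find processors that are running or in another state, such as:
- Stopped
- Invalid
- Disabled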
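A sketch of one possible approach, assuming the processor.run.status.* metrics are retained for your runtime and stored in the default event table SNOWFLAKE.TELEMETRY.EVENTS. It keeps the latest sample of each status metric per processor and pod, then returns the statuses whose latest value is 1.

```sql
SELECT
    timestamp,
    resource_attributes:"k8s.pod.name"::STRING AS pod,
    record_attributes:"name"::STRING           AS processor_name,
    record_attributes:"component"::STRING      AS processor_class,
    -- strip the common prefix to leave running, stopped, invalid, and so on
    REPLACE(record:metric.name::STRING, 'processor.run.status.', '') AS run_status
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(minute, -15, SYSDATE())
  AND record_type = 'METRIC'
  AND record:metric.name::STRING LIKE 'processor.run.status.%'
QUALIFY ROW_NUMBER() OVER (
            PARTITION BY resource_attributes:"k8s.pod.name"::STRING,
                         record_attributes:"id"::STRING,
                         record:metric.name::STRING
            ORDER BY timestamp DESC) = 1
    AND value::FLOAT = 1
ORDER BY run_status, processor_name;
```

To list only processors that are not running, add a condition to the WHERE clause that excludes the processor.run.status.running metric.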
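Find high CPU usage for Openflow runtimes
Slow data flows or reduced throughput can be caused by a CPU bottleneck. Openflow runtimes scale up automatically based on the minimum and maximum number of nodes that you configure.
If an Openflow runtime is using its maximum number of nodes and CPU usage is still high, consider:
- Increasing the maximum number of nodes allocated to the runtime
- Troubleshooting the connector or data flow to find the bottleneck
Snowsight Charts make it easy to visualize the results of a query of CPU usage over time.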
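As a hedged sketch, the following query produces a per-minute time series that can be charted in Snowsight. It uses the cores.load application metric listed earlier in this topic, which is an assumption: container-level system CPU metrics may be a better fit, but their names are not listed here. It also assumes the default event table SNOWFLAKE.TELEMETRY.EVENTS.

```sql
SELECT
    TIME_SLICE(timestamp, 1, 'MINUTE')               AS minute_start,
    resource_attributes:"k8s.namespace.name"::STRING AS runtime_namespace,
    resource_attributes:"k8s.pod.name"::STRING       AS pod,
    AVG(value::FLOAT)                                AS avg_load
FROM SNOWFLAKE.TELEMETRY.EVENTS
WHERE timestamp > DATEADD(hour, -1, SYSDATE())
  AND record_type = 'METRIC'
  AND record:metric.name::STRING = 'cores.load'
  AND value::FLOAT >= 0          -- -1 means the load average is unavailable
GROUP BY 1, 2, 3
ORDER BY minute_start, runtime_namespace, pod;
```

Plotting minute_start on the x-axis and avg_load as the series value, split by pod, shows whether load stays high after the runtime has scaled to its maximum node count.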