使用处理程序代码发送指标数据

若要使您的过程或 UDF 发送指标数据,您不需要向处理程序添加任何代码。Snowflake 会在事件表中生成所收集的数据。

如何衡量指标

由于 Java 和 Python 的执行环境不同,收集的指标数据也不同。有关所收集数据的参考信息,请参阅 RECORD 列参考

下面介绍了捕获的数据如何与执行环境相对应。

Java:

报告每个查询 ID 的 JVM(Java 虚拟机)CPU 和内存指标。

每个存储过程都分配有自己的 JVM。下面介绍了收集的指标数据:

  • process.memory.usage:JVM 执行存储过程处理程序所消耗的内存量(以字节为单位)。

  • process.cpu.utilization:总 CPU 时间除以每个逻辑 CPU 挂钟时间,以百分比衡量,其中 1.0 表示 100% 利用率。总 CPU 时间是非空闲任务所花费的总时间。

在查询中调用的每个 Java 和 Scala UDF 会共享一个 JVM。指标值在查询中的每个 Java 或 Scala 函数中进行汇总。 下面介绍了收集的指标数据:

  • process.memory.usage:内存使用情况,显示为查询中调用的所有相关 Java 函数的总和。

  • process.cpu.utilization:CPU 使用,显示为查询中调用的所有 Java 和 Scala 函数的平均值。

Python:

报告每个 Python 函数或过程的 CPU 和内存指标。

每个存储过程仅在一个 Python 过程上执行。下面介绍了收集的指标数据:

  • process.memory.usage:执行存储过程处理程序的 Python 过程所消耗的内存量(以字节为单位)。

  • process.cpu.utilization:总 CPU 时间除以每个逻辑 CPU 挂钟时间,以百分比衡量,其中 1.0 表示 100% 使用率。总 CPU 时间是非空闲任务所花费的总时间。

每个 UDF 可以在多个 Python 执行过程上执行。值在多个流程之间进行汇总。下面介绍了收集的指标数据:

  • process.memory.usage:内存使用情况,显示为该 UDF 所有相关 Python 过程的总和。

  • process.cpu.utilization:已报告 CPU,显示为该 UDF 所有相关过程的平均值。

Python 示例

使用以下步骤生成指标示例数据。

  1. 设置会话的指标级别。METRIC_LEVEL 参数控制是否将自动检测的资源指标数据点发送到事件表。您可以将参数设置为 NONEALL,并在对象和会话级别进行设置。有关更多信息,请参阅 为日志、指标和跟踪设置级别

    ALTER SESSION SET METRIC_LEVEL = ALL;
    
    Copy
  2. 创建存储过程。

    CREATE OR REPLACE PROCEDURE DEMO_SP(n_queries number)
    RETURNS VARCHAR(16777216)
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.10'
    PACKAGES = ('snowflake-snowpark-python', 'snowflake-telemetry-python==0.2.0')
    HANDLER = 'my_handler'
    AS $$
    import time
    def my_handler(session, n_queries):
      import snowflake.snowpark
      from snowflake.snowpark.functions import col, udf
      from snowflake import telemetry
    
      session.sql('create or replace stage udf_stage;').collect()
    
      @udf(name='example_udf', is_permanent=True, stage_location='@udf_stage', replace=True)
      def example_udf(x: int) -> int:
        # This UDF will consume 1GB of memory to illustrate the memory consumption metric
        one_gb_list = [0] * (1024**3 // 8)
        return x
    
      pandas_grouped_df = session.table('snowflake.account_usage.query_history').select(
        col('total_elapsed_time'),
        col('rows_written_to_result'),
        col('database_name'),
        example_udf(col('bytes_scanned'))
      ).limit(n_queries)\
      .to_pandas()\
      .groupby('DATABASE_NAME')
    
      mean_time = pandas_grouped_df['TOTAL_ELAPSED_TIME'].mean()
      mean_rows_written = pandas_grouped_df['ROWS_WRITTEN_TO_RESULT'].mean()
    
      return f"""
      {mean_time}
      {mean_rows_written}
      """
    $$;
    
    Copy
  3. 运行存储过程

    CALL DEMO_SP(100);
    
    Copy
  4. 查询完成后,查看指标数据,如 查看指标数据 中所述。

语言: 中文