向应用程序包添加可计费事件

为 Snowflake Native App 使用自定义事件计费时,除了基于使用量的现有定价方案之外,您还可以采用针对特定类型应用程序使用量的计费方式。要进行此设置,必须执行两个高级步骤:

  1. 按照本主题中的步骤设置应用程序包,以发出可计费事件。

  2. 针对用于向使用者发布 Snowflake Native App 的列表,选择具有可计费事件且基于使用量的定价方案

本主题介绍如何使用 SYSTEM$CREATE_BILLING_EVENT 系统函数设置应用程序包,以发出可计费事件。

应用程序包中的可计费事件概述

您可以将应用程序包设置为针对特定使用事件发出可计费事件,这样可根据使用者对 Snowflake Native App 的使用量向其收费。

例如,可以添加一个可计费事件,对于 Snowflake Native App 中的每次存储过程调用向使用者收取特定金额的费用。

要向应用程序包添加可计费事件,请执行以下操作:

  1. 创建存储过程,定义哪些使用事件会触发对 SYSTEM$CREATE_BILLING_EVENT 系统函数的调用。

    备注

    在此阶段,您不能测试系统函数的输出。此系统函数只能从安装在使用者账户中的 Snowflake Native App 调用。

  2. 将这些存储过程添加到应用程序包的安装脚本中。

重要

Snowflake 支持通过在应用程序的存储过程中调用系统函数发出的计费事件,如本主题中的示例所示。

Snowflake 不支持通过其他方法来计算可计费事件的基本费用,例如使用表输出或输出使用者活动的用户定义的函数的方法,或使用在事件表中记录日志的遥测的方法。

如果不确定拟议的实施方案是否受支持,请联系 Snowflake 客户代表。

可计费事件示例

本部分中的示例说明了如何创建存储过程,以针对常见计费场景发出可计费事件。 这些示例均调用 调用 SYSTEM$CREATE_BILLING_EVENT 系统函数 中定义的 createBillingEvent 函数。

调用 SYSTEM$CREATE_BILLING_EVENT 系统函数

以下示例说明了如何在存储过程中创建包装器函数,以调用 SYSTEM$CREATE_BILLING_EVENT 系统函数。

备注

可在使用 JavaScript、Python 或 Java 编写的存储过程中调用此系统函数。

此示例在 db_1 数据库和 custom_event_billing 架构中创建了一个名为 public 的 JavaScript 存储过程。该存储过程创建了一个名为 createBillingEvent 的 helper 函数,该函数接受与 SYSTEM$CREATE_BILLING_EVENT 系统函数期望的有类型参数相对应的实参。

有关参数和所需类型的更多详细信息,请参阅 SYSTEM$CREATE_BILLING_EVENT

 CREATE OR REPLACE PROCEDURE db_1.public.custom_event_billing()
 RETURNS NULL
 LANGUAGE JAVASCRIPT
 AS
 $$
   /**
    * Helper method to add a billable event
    * Format timestamps as Unix timestamps in milliseconds
    */

   function createBillingEvent(className, subclassName, startTimestampVal, timestampVal, baseCharge, objects, additionalInfo) {
        try {
            var res = snowflake.createStatement({
            sqlText: `SELECT SYSTEM$CREATE_BILLING_EVENT('${className}',
                                                      '${subclassName}',
                                                      ${startTimestampVal},
                                                      ${timestampVal},
                                                      ${baseCharge},
                                                      '${objects}',
                                                      '${additionalInfo}')`
            }).execute();

            res.next();

            return res.getColumnValue(1);
        } catch(err) {
            return err.message;
        }
    }
$$;
Copy

本主题中的示例调用了该 helper 函数。

示例:基于对存储过程的调用计费

以下示例说明了如何创建存储过程,以在使用者在 Snowflake Native App 中调用该存储过程时发出可计费事件。

在定义 helper 函数的同一存储过程中,将此示例代码添加到设置脚本中:

...
  //
  // Send a billable event when a stored procedure is called.
  //
  var event_ts = Date.now();
  var billing_quantity = 1.0;
  var base_charge = billing_quantity;
  var objects = "[ \"db_1.public.procedure_1\" ]";
  var retVal = createBillingEvent("PROCEDURE_CALL", "", event_ts, event_ts, base_charge, objects, "");
  // Run the rest of the procedure ...
$$;
Copy

此示例代码创建了一个存储过程,该存储过程调用 createBillingEvent 函数,以发出类名称为 PROCEDURE_CALL、基本费用为 1.0 的可计费事件。

备注

传递给 createBillingEvent 函数的实参类型必须对应于 SYSTEM$CREATE_BILLING_EVENT 系统函数所需的有类型参数。

示例:基于 Snowflake Native App 使用的行数计费

以下示例说明了如何创建存储过程,以根据使用者账户中对一个表内的行的使用情况发出可计费事件。

在定义 helper 函数的同一存储过程中,将此示例代码添加到设置脚本中:

...
  // Run a query and get the number of rows in the result
  var select_query = "select i from db_1.public.t1";
  res = snowflake.execute ({sqlText: select_query});
  res.next();
  //
  // Send a billable event for rows returned from the select query
  //
  var event_ts = Date.now();
  var billing_quantity = 2.5;
  var base_charge = res.getRowcount() * billing_quantity;
  var objects = "[ \"db_1.public.t1\" ]";
  createBillingEvent("ROWS_CONSUMED", "", event_ts, event_ts, base_charge, objects, "");
  // Run the rest of the procedure ...
$$;
Copy

此示例代码创建了一个存储过程,该存储过程调用 createBillingEvent 函数以发出一个可计费事件,后者的类名为 ROWS_CONSUMED,计算出的基本费用为 2.5 乘以使用者账户中 db_1.public.t1 表内的行数。

备注

传递给 createBillingEvent 函数的实参类型必须对应于 SYSTEM$CREATE_BILLING_EVENT 系统函数所需的有类型参数。

示例:基于引入的行数计费

以下示例说明了如何创建存储过程,以根据表中接收的行数发出可计费事件。

在定义 helper 函数的同一存储过程中,将此示例代码添加到设置脚本中:

...
    // Run the merge query
    var merge_query = "MERGE INTO target_table USING source_table ON target_table.i = source_table.i
        WHEN MATCHED THEN UPDATE SET target_table.j = source_table.j
        WHEN NOT MATCHED
        THEN INSERT (i, j)
        VALUES (source_table.i, source_table.j)";
    res = snowflake.execute ({sqlText: merge_query});
    res.next();
    // rows ingested = rows inserted + rows updated
    var numRowsIngested = res.getColumnValue(1) + res.getColumnValue(2);

    //
    // Send a billable event for rows changed by the merge query
    //
    var event_ts = Date.now();
    var billing_quantity = 2.5;
    var base_charge = numRowsIngested * billing_quantity;
    var objects = "[ \"db_1.public.target_table\" ]";
    createBillingEvent("ROWS_CHANGED", "", event_ts, event_ts, base_charge, objects, "");
    // Run the rest of the procedure ...
$$;
Copy

此示例代码创建了一个存储过程,该存储过程调用 createBillingEvent 函数以发出一个可计费事件,后者的类名为 ROWS_CHANGED,计算出的基本费用为 2.5 乘以 db_1.target_table 表中引入的行数。

备注

传递给 createBillingEvent 函数的实参类型必须对应于 SYSTEM$CREATE_BILLING_EVENT 系统函数所需的有类型参数。

示例:基于每月活跃行数计费

每月活跃行数是指在一个日历月内首次插入或更新的行数。有些提供商使用此指标,仅按一个月内更新的不重复行数向使用者收费。您可以修改此示例,改为统计唯一身份用户,或者确定不重复数据加载位置,从而确定基本费用。

以下示例说明了如何创建存储过程,以根据每月活跃行数发出可计费事件。在定义 helper 函数的同一存储过程中,将此示例代码添加到设置脚本中:

...
    //
    // Get monthly active rows
    //
    var monthly_active_rows_query = "
     SELECT
         count(*)
     FROM
         source_table
     WHERE
         source_table.i not in
         (
           SELECT
             i
           FROM
             target_table
           WHERE
             updated_on >= DATE_TRUNC('MONTH', CURRENT_TIMESTAMP)
         )";
    res = snowflake.execute ({sqlText: monthly_active_rows_query});
    res.next();
    var monthlyActiveRows = parseInt(res.getColumnValue(1));
    //
    // Run the merge query and update the updated_on values for the rows
    //
    var merge_query = "
        MERGE INTO
            target_table
        USING
            source_table
        ON
            target_table.i = source_table.i
        WHEN MATCHED THEN
         UPDATE SET target_table.j = source_table.j
                    ,target_table.updated_on = current_timestamp
        WHEN NOT MATCHED THEN
            INSERT (i, j, updated_on) VALUES (source_table.i, source_table.j, current_timestamp)";
    res = snowflake.execute ({sqlText: merge_query});
    res.next();
    //
    // Emit a billable event for monthly active rows changed by the merge query
    //
    var event_ts = Date.now();
    var billing_quantity = 0.02
    var base_charge = monthlyActiveRows * billing_quantity;
    var objects = "[ \"db_1.public.target_table\" ]";
    createBillingEvent("MONTHLY_ACTIVE_ROWS", "", event_ts, event_ts, base_charge, objects, "");
    // Run the rest of the procedure ...
$$;
Copy

此示例代码创建一个存储过程,该存储过程使用合并查询来确定每月活跃行数,从而确定不重复行数。随后,该示例使用 monthlyActiveRows 变量的值和 billing_quantity 计算基本费用。然后将基本费用传递给 createBillingEvent 函数。

备注

传递给 createBillingEvent 函数的实参类型必须对应于 SYSTEM$CREATE_BILLING_EVENT 系统函数所需的有类型参数。

在安装脚本中,将此存储过程添加在 :ref:` 调用 SYSTEM$CREATE_BILLING_EVENT 系统函数的存储过程 <label-native_apps_custom_billing_calling_sys_function>` 之后。

Snowpark Python 示例:基于使用的行数计费

如果您想在 Snowpark Python 中编写存储过程,以便根据 Snowflake Native App 使用的行数计费,请使用以下示例:

CREATE OR REPLACE PROCEDURE app_schema.billing_event_rows()
   RETURNS STRING
   LANGUAGE PYTHON
   RUNTIME_VERSION = '3.9'
   PACKAGES = ('snowflake-snowpark-python')
   HANDLER = 'run'
   EXECUTE AS OWNER
   AS $$
import time

# Helper method that calls the system function for billing
def createBillingEvent(session, class_name, subclass_name, start_timestamp, timestamp, base_charge, objects, additional_info):
   session.sql(f"SELECT SYSTEM$CREATE_BILLING_EVENT('{class_name}', '{subclass_name}', {start_timestamp}, {timestamp}, {base_charge}, '{objects}', '{additional_info}')").collect()
   return "Success"

# Handler function for the stored procedure
def run(session):
   # insert code to identify monthly active rows and calculate a charge
   try:

      # Run a query to select rows from a table
      query =  "select i from db_1.public.t1"
      res = session.sql(query).collect()

      # Define the price to charge per row
      billing_quantity = 2.5

      # Calculate the base charge based on number of rows in the result
      charge = len(res) * billing_quantity

      # Current time in Unix timestamp (epoch) time in milliseconds
      current_time_epoch = int(time.time() * 1000)

      return createBillingEvent(session, 'ROWS_CONSUMED', '', current_time_epoch, current_time_epoch, charge, '["billing_event_rows"]', '')
   except Exception as ex:
      return "Error " + ex
$$;
Copy

此示例代码创建了一个存储过程,该存储过程定义一个调用 SYSTEM$CREATE_BILLING_EVENT 系统函数的 helper 方法,以及一个调用该 helper 方法的 createBillingEvent 方法,从而发出类名为 ROWS_CONSUMED、基本费用按 2.5 USD 乘以使用者账户中 db_1.public.t1 表行数计算得出的可计费事件。

备注

传递给 createBillingEvent 函数的实参类型必须对应于 SYSTEM$CREATE_BILLING_EVENT 系统函数所需的有类型参数。

测试自定义事件计费

为确保正确设置自定义事件计费,并确保按预期为使用量事件发出可计费事件,请执行以下操作:

  1. 更新应用程序包:

    1. 更新安装脚本,包含发出可计费事件的存储过程。

    2. 使用新的安装脚本更新应用程序包。

    3. 更新应用程序包的版本和发布指令。

  2. 与组织中您有拥有以下权限的使用者账户共享应用程序包:

    1. 创建私密列表

    2. 为列表添加 作为定价方案的自定义事件计费

    3. 将其与使用者账户共享。

    4. 使用 Snowsight 登录使用者账户。

    5. 安装 Snowflake Native App。

  3. 确认 存储过程成功发出可计费事件

  4. 确认 正确设置列表

备注

在测试自定义事件计费时,必须 ` 设置付款方式 <https://other-docs.snowflake.cn/en/collaboration/consumer-listings-paying#set-up-payment-method>`_,但不会为您组织内部的使用向您收费。

验证存储过程是否会发出可计费事件

登录您与之共享列表的使用者账户时,调用已添加到 Snowflake Native App 的存储过程。

例如,要测试为 :ref:` 根据每月活跃行数计费 <label-native_apps_custom_billing_monthly_active_rows_ex>` 而创建的存储过程,请执行以下操作:

  1. 在 Snowsight 中登录使用者账户。

  2. 打开工作表,并将上下文设置为 db_1.public

  3. 运行以下 SQL 语句:

    CALL merge_procedure()
    
    Copy

    如果存储过程返回 Success,则表明代码有效。

备注

如果在用于创建应用程序包的提供商账户中运行这些 SQL 命令,则会收到错误。

验证自定义事件计费定价方案

要验证 Snowflake Native App 用户的体验,并确认列表和应用程序包设置正确,可在共享 SNOWFLAKE 数据库的 DATA_SHARING_USAGE 架构中查询 `MARKETPLACE_PAID_USAGE_DAILY 视图<https://other-docs.snowflake.com/en/collaboration/views/marketplace-paid-usage-daily-ds>`_

备注

由于视图中的延迟,请在首次使用 Snowflake Native App 后等待至少两天再运行这些查询。

要确认 Snowflake Native App 和列表成功生成了可计费事件,请在您与之共享列表的使用者账户中运行以下 SQL 语句:

备注

将 PROVIDER_ACCOUNT_NAME 和 PROVIDER_ORGANIZATION_NAME 值替换为提供商账户的对应值。

SELECT listing_global_name,
   listing_display_name,
   charge_type,
   charge
FROM SNOWFLAKE.DATA_SHARING_USAGE.MARKETPLACE_PAID_USAGE_DAILY
WHERE charge_type='MONETIZABLE_BILLING_EVENTS'
      AND PROVIDER_ACCOUNT_NAME = <account_name>
      AND PROVIDER_ORGANIZATION_NAME= <organization_name>;
Copy
+---------------------+------------------------+----------------------------+--------+
| LISTING_GLOBAL_NAME |  LISTING_DISPLAY_NAME  |        CHARGE_TYPE         | CHARGE |
+---------------------+------------------------+----------------------------+--------+
| AAAA0BBB1CC         | Snowy Mountain Listing | MONETIZABLE_BILLING_EVENTS |   18.6 |
+---------------------+------------------------+----------------------------+--------+
语言: 中文