将 Data Classification 与经典 APIs 配合使用

本主题提供了在 Snowflake 中使用经典 APIs 进行数据分类的示例:EXTRACT_SEMANTIC_CATEGORIESASSOCIATE_SEMANTIC_CATEGORY_TAGS。这些 APIs 不再进行更新以与 Data Classification 的更新保持一致。

您可以继续使用这些 APIs。但是,您应更新工作流以使用 使用 Data Classification 中所示的方法。

本主题内容:

概述

以下用于分类数据的示例可根据范围解决不同的用例:

如果您是新的分类用户,请从对单个表进行分类开始,然后继续执行另外两个示例。

通用表和角色

本主题中的示例使用如下所示的表和自定义角色:

  • 表:my_db.my_schema.hr_data,其中包含与员工有关的数据。

  • 角色:

    • data_engineer:对表进行分类。

    • policy_admin:使用掩码策略保护个人信息。

    • analyst:用作您可能希望限制其访问权限的自定义角色的示例。示例假定此角色存在。

    • 数据库角色

      • GOVERNANCE_ADMIN:分配分类系统标签。

      • GOVERNANCE_VIEWER:查询与标签和掩码策略相关的 Account Usage 视图。

  1. 创建表:

    CREATE OR REPLACE TABLE hr_data (
        age INTEGER,
        email_address VARCHAR,
        fname VARCHAR,
        lname VARCHAR
        );
    
    Copy
  2. 加载表(未显示详细信息)。

  3. 创建自定义角色并向这些角色授予必要的权限。

  4. data_engineer 角色需要访问表才能运行分类过程。请注意,此示例将 GOVERNANCE_ADMIN 数据库角色授予 data_engineer 角色,因为您无法将角色切换到数据库角色:

    use role accountadmin;
    
    create role data_engineer;
    
    grant usage on database my_db to role data_engineer;
    
    grant usage on schema my_db.my_schema to role data_engineer;
    
    grant select, update on table my_db.my_schema.hr_data to role data_engineer;
    
    grant database role snowflake.governance_admin to role data_engineer;
    
    Copy
  5. policy_admin 自定义角色需要为包含 PII 数据的任何列分配掩码策略:

    use role accountadmin;
    create role policy_admin;
    grant apply masking policy on account to role policy_admin;
    grant database role snowflake.governance_viewer to role policy_admin;
    
    Copy

对单个表进行分类

单个表示例扩展了三步分类过程(即分析、查看和应用),以应用结果来撤消自定义角色 analyst 的表访问权限。

第 1 步:对表列进行分类

在此步骤中,data_engineer 会运行分类过程,policy_admin 还会使用 掩码策略 保护列数据。

  1. 分析:使用 data_engineer 自定义角色调用 EXTRACT_SEMANTIC_CATEGORIES 函数,对名为 my_db.my_schema.hr_data 的表中的列进行分类:

    USE ROLE data_engineer;
    
    SELECT EXTRACT_SEMANTIC_CATEGORIES('my_db.my_schema.hr_data');
    
    Copy
  2. 审查:数据工程师审查结果以确保结果有意义。

  3. 应用:数据工程师为列分配分类系统标签。

    数据工程师有两种选择:自动分配或手动分配。

    若要自动分配系统标签,请调用 ASSOCIATE_SEMANTIC_CATEGORY_TAGS 存储过程。注意:

    • 表的完全限定名称和第 1 步中的函数是存储过程的实参。

    • 存储过程会重新运行 EXTRACT_SEMANTIC_CATEGORIES 函数。如果要保留第 1 步的结果,请在调用存储过程之前将结果保存到表中。

      CALL ASSOCIATE_SEMANTIC_CATEGORY_TAGS(
         'my_db.my_schema.hr_data',
          EXTRACT_SEMANTIC_CATEGORIES('my_db.my_schema.hr_data')
      );
      
      Copy

      如果存储过程成功执行,它将返回与以下内容类似的消息:Applied tag semantic_category to <n> columns. Applied tag privacy_category to <n> columns.

    否则,当存储过程未执行时,或者如果决定根据需要手动将分类系统标签分配给每一列,请使用 ALTER TABLE ...ALTER COLUMN 语句。例如,将任一系统标签分配给 FNAME 列(即名字)。

    USE ROLE data_engineer;
    
    ALTER TABLE my_db.my_schema.hr_data
      MODIFY COLUMN fname
      SET TAG SNOWFLAKE.CORE.SEMANTIC_CATEGORY='NAME';
    
    Copy

    ALTER TABLE my_db.my_schema.hr_data
      MODIFY COLUMN fname
      SET TAG SNOWFLAKE.CORE.PRIVACY_CATEGORY='IDENTIFIER';
    
    Copy

第 2 步:保护表列

此步骤假定 my_db.my_schema.hr_data 表中的多个列具有分配给这些列的 PRIVACY_CATEGORY = 'IDENTIFIER' 标签,并且需要使用掩码策略保护这些列。

要保护这些列,请执行以下操作:

  1. 使用 policy_admin 角色查找应用了 IDENTIFIER 隐私标签的列:

    USE ROLE policy_admin;
    
    SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.TAG_REFERENCES
      WHERE TAG_NAME = 'PRIVACY_CATEGORY'
      AND TAG_VALUE = 'IDENTIFIER';
    
    Copy

    TAG_REFERENCES 视图的延迟时间最长可达 120 分钟。如果您需要更快地获得结果,并且知道要查询分类标签的列的名称,则可以改用 TAG_REFERENCESTAG_REFERENCES_ALL_COLUMNS 表函数。

  2. 使用 policy_admin 角色将掩码策略应用于相应的列。例如,以下语句将 identifier_mask 掩码策略应用到 fname 列:

    ALTER TABLE my_db.my_schema.hr_data
      MODIFY COLUMN fname
      SET MASKING POLICY governance.policies.identifier_mask;
    
    Copy

第 3 步:使用系统标签撤消访问权限

最后,安全管理员(即具有 SECURITYADMIN 角色的用户)执行以下操作:

  • 查询 TAG_REFERENCES 视图以查找具有 IDENTIFIER 隐私标签值的所有列。

  • 撤消 analyst 自定义角色对列设置了 PRIVACY_CATEGORY = 'IDENTIFIER' 标签的表的 SELECT 权限:

小技巧

您不需要使用 SECURITYADMIN 系统角色来执行这些任务;您可以使用已分配了必要权限的任何自定义角色。

USE ROLE SECURITYADMIN;

SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.TAG_REFERENCES
    WHERE TAG_NAME = 'PRIVACY_CATEGORY'
    AND TAG_VALUE= 'IDENTIFIER';

REVOKE SELECT ON TABLE my_db.my_schema.hr_data FROM ROLE analyst;
Copy

对架构中的所有表进行分类

此示例演示如何使用由两个用户定义的存储过程对架构中的所有表进行分类:

  • classify_schema:列出架构中的所有表,创建一个表来存储分类结果,然后从每个表中提取分类标签并将标签存储在结果表中。

  • associate_tag_batch:使用 classify_schema 存储过程的结果自动将分类系统标签分配给架构中的所有表列,并返回分配给每个表的标签数。

重要

名为 classify_schema 的存储过程将创建一个临时表来存储结果。临时表在调用此存储过程的用户的用户会话期间存在。当用户会话过期时,Snowflake 会删除临时表,用户需要再次调用存储过程以重新创建临时表。

如果需要保留临时表,请从 sqlText 命令中移除 temp 关键字以创建表。

有关详细信息,请参阅 CREATE TABLE 命令中的 TEMP[ORARY] 选项。

  1. 创建名为 classify_schema 的第一个过程:

    create or replace procedure classify_schema(schema_name string, result_table string)
    returns object language JavaScript
    as $$
    // 1
    const table_names = snowflake.execute({
      sqlText: `show terse tables in schema identifier(?)`,
      binds: [SCHEMA_NAME],
    });
    
    // helper function for quoted table names
    function quote(name) {
      return '"'+ name.replace(/"/g,'""') + '"';
    }
    
    // create table to store results in. if it already exists, we will add to it rather than overwrite
    snowflake.execute({
        sqlText: `create temp table if not exists identifier(?) (table_name string, result variant)`,
        binds: [RESULT_TABLE],
    })
    // loop through tables
    while (table_names.next()) {
      let name = table_names.getColumnValue('name');
      // add schema to table name
      name = SCHEMA_NAME + "." + quote(name);
      // insert qualified table name and result into result table
      const results = snowflake.execute({
        sqlText: `insert into identifier(?) select ?, extract_semantic_categories(?)`,
        binds: [RESULT_TABLE, name, name],
      });
    }
    // return the number of tables classified
    return {tables_classified: table_names.getRowCount()};
    $$;
    
    Copy
  2. 创建名为 associate_tag_batch 的第二个过程:

    create or replace procedure associate_tag_batch(result_table string)
    returns Object language JavaScript
    as $$
    // get table names and classification results to loop through
    const tags_to_apply = snowflake.execute({
      sqlText: `select table_name, result from identifier(?)`,
      binds: [RESULT_TABLE],
    });
    
    const out = {};
    while (tags_to_apply.next()) {
      // get table name
      const name = tags_to_apply.getColumnValue('TABLE_NAME');
      // get classification result
      const classification_results = tags_to_apply.getColumnValue('RESULT');
      // call associate semantic category tags with table name and classification result
      const results = snowflake.execute({
        sqlText: `call associate_semantic_category_tags(?, parse_json(?))`,
        binds: [name, JSON.stringify(classification_results)],
      });
      results.next();
      out[name] = results.getColumnValue(1).split('\n');
    }
    // return number of tags applied per table
    return out;
    $$;
    
    Copy
  3. 使用要分类的架构的名称和临时表的名称作为实参来保存每个表的 EXTRACT_SEMANTIC_CATEGORY 结果,从而调用 classify_schema 存储过程:

    call classify_schema('my_db.my_schema','my_temporary_classification_table');
    
    Copy
  4. 查看临时表中的结果,并根据需要进行修改。

  5. 对结果感到满意后,调用 associate_tag_batch 存储过程将分类系统标签分配给表列:

    call associate_tag_batch('my_temporary_classification_table');
    
    Copy

对数据库中的所有表进行分类

此示例演示如何使用两个存储过程对数据库中的所有表进行分类:

  • classify_database:对数据库中每个架构内的所有表进行分类,并返回分类的表数和架构数。

  • associate_tag_batch:执行 对架构中的所有表进行分类 (本主题内容)中定义的相同操作。

  1. 创建 classify_database 存储过程:

    create or replace procedure classify_database(database_name string, result_table string)
    returns Object language JavaScript
    as $$
    // get list of schemas in database
    const schema_names = snowflake.execute({
      sqlText: `show terse schemas in database identifier(?)`,
      binds: [DATABASE_NAME],
    });
    
    // helper function for quoted schema names
    function quote(name) {
      return '"'+ name.replace(/"/g,'""') + '"';
    }
    
    // counter for tables. will use result from classify_schema to increment
    let table_count = 0
    while (schema_names.next()) {
      let name = schema_names.getColumnValue('name');
      // skip the information schema
      if (name == "INFORMATION_SCHEMA") {
        continue;
      }
      // add database name to schema
      name = DATABASE_NAME + "." + quote(name);
      // call classify_schema on each schema. This will loop over tables in schema
      const results = snowflake.execute({
        sqlText: `call classify_schema(?, ?)`,
        binds: [name, RESULT_TABLE],
      });
      results.next();
      // increment total number of tables by the number of tables in the schema
      table_count += results.getColumnValue(1).tables_classified ;
    }
    
    return {
        tables_classified: table_count,
        // subtract one from number of schemas because we skip the information schema
        schemas_classified: schema_names.getRowCount() - 1,
    };
    $$;
    
    Copy
  2. 使用要分类的数据库的名称和临时表的名称调用 classify_database 存储过程,以将结果存储在数据库的每个架构中:

    call classify_database('my_db','my_temporary_classification_table');
    
    Copy
  3. 导航到每个架构并查看临时表,在必要时进行修订。

  4. 对结果感到满意时,为 每个 架构调用一次 associate_tag_batch 存储过程,以将标签应用于该架构中的表。例如,如果数据库包含三个架构,则调用存储过程三次:

    call associate_tag_batch('my_temporary_classification_table');
    
    Copy
语言: 中文