将 Data Classification 与经典 APIs 配合使用¶
本主题提供了在 Snowflake 中使用经典 APIs 进行数据分类的示例:EXTRACT_SEMANTIC_CATEGORIES 和 ASSOCIATE_SEMANTIC_CATEGORY_TAGS。这些 APIs 不再进行更新以与 Data Classification 的更新保持一致。
您可以继续使用这些 APIs。但是,您应更新工作流以使用 使用 Data Classification 中所示的方法。
本主题内容:
概述¶
以下用于分类数据的示例可根据范围解决不同的用例:
如果您是新的分类用户,请从对单个表进行分类开始,然后继续执行另外两个示例。
通用表和角色¶
本主题中的示例使用如下所示的表和自定义角色:
表:
my_db.my_schema.hr_data
,其中包含与员工有关的数据。角色:
data_engineer
:对表进行分类。policy_admin
:使用掩码策略保护个人信息。analyst
:用作您可能希望限制其访问权限的自定义角色的示例。示例假定此角色存在。-
GOVERNANCE_ADMIN:分配分类系统标签。
GOVERNANCE_VIEWER:查询与标签和掩码策略相关的 Account Usage 视图。
创建表:
CREATE OR REPLACE TABLE hr_data ( age INTEGER, email_address VARCHAR, fname VARCHAR, lname VARCHAR );
加载表(未显示详细信息)。
创建自定义角色并向这些角色授予必要的权限。
data_engineer
角色需要访问表才能运行分类过程。请注意,此示例将 GOVERNANCE_ADMIN 数据库角色授予data_engineer
角色,因为您无法将角色切换到数据库角色:use role accountadmin; create role data_engineer; grant usage on database my_db to role data_engineer; grant usage on schema my_db.my_schema to role data_engineer; grant select, update on table my_db.my_schema.hr_data to role data_engineer; grant database role snowflake.governance_admin to role data_engineer;
policy_admin
自定义角色需要为包含 PII 数据的任何列分配掩码策略:use role accountadmin; create role policy_admin; grant apply masking policy on account to role policy_admin; grant database role snowflake.governance_viewer to role policy_admin;
对单个表进行分类¶
单个表示例扩展了三步分类过程(即分析、查看和应用),以应用结果来撤消自定义角色 analyst
的表访问权限。
第 1 步:对表列进行分类¶
在此步骤中,data_engineer
会运行分类过程,policy_admin
还会使用 掩码策略 保护列数据。
分析:使用
data_engineer
自定义角色调用 EXTRACT_SEMANTIC_CATEGORIES 函数,对名为my_db.my_schema.hr_data
的表中的列进行分类:USE ROLE data_engineer; SELECT EXTRACT_SEMANTIC_CATEGORIES('my_db.my_schema.hr_data');
审查:数据工程师审查结果以确保结果有意义。
应用:数据工程师为列分配分类系统标签。
数据工程师有两种选择:自动分配或手动分配。
若要自动分配系统标签,请调用 ASSOCIATE_SEMANTIC_CATEGORY_TAGS 存储过程。注意:
表的完全限定名称和第 1 步中的函数是存储过程的实参。
存储过程会重新运行 EXTRACT_SEMANTIC_CATEGORIES 函数。如果要保留第 1 步的结果,请在调用存储过程之前将结果保存到表中。
CALL ASSOCIATE_SEMANTIC_CATEGORY_TAGS( 'my_db.my_schema.hr_data', EXTRACT_SEMANTIC_CATEGORIES('my_db.my_schema.hr_data') );
如果存储过程成功执行,它将返回与以下内容类似的消息:
Applied tag semantic_category to <n> columns. Applied tag privacy_category to <n> columns.
否则,当存储过程未执行时,或者如果决定根据需要手动将分类系统标签分配给每一列,请使用 ALTER TABLE ...ALTER COLUMN 语句。例如,将任一系统标签分配给 FNAME 列(即名字)。
USE ROLE data_engineer; ALTER TABLE my_db.my_schema.hr_data MODIFY COLUMN fname SET TAG SNOWFLAKE.CORE.SEMANTIC_CATEGORY='NAME';
或
ALTER TABLE my_db.my_schema.hr_data MODIFY COLUMN fname SET TAG SNOWFLAKE.CORE.PRIVACY_CATEGORY='IDENTIFIER';
第 2 步:保护表列¶
此步骤假定 my_db.my_schema.hr_data
表中的多个列具有分配给这些列的 PRIVACY_CATEGORY = 'IDENTIFIER'
标签,并且需要使用掩码策略保护这些列。
要保护这些列,请执行以下操作:
使用
policy_admin
角色查找应用了 IDENTIFIER 隐私标签的列:USE ROLE policy_admin; SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.TAG_REFERENCES WHERE TAG_NAME = 'PRIVACY_CATEGORY' AND TAG_VALUE = 'IDENTIFIER';
TAG_REFERENCES 视图的延迟时间最长可达 120 分钟。如果您需要更快地获得结果,并且知道要查询分类标签的列的名称,则可以改用 TAG_REFERENCES 或 TAG_REFERENCES_ALL_COLUMNS 表函数。
使用
policy_admin
角色将掩码策略应用于相应的列。例如,以下语句将identifier_mask
掩码策略应用到fname
列:ALTER TABLE my_db.my_schema.hr_data MODIFY COLUMN fname SET MASKING POLICY governance.policies.identifier_mask;
对架构中的所有表进行分类¶
此示例演示如何使用由两个用户定义的存储过程对架构中的所有表进行分类:
classify_schema
:列出架构中的所有表,创建一个表来存储分类结果,然后从每个表中提取分类标签并将标签存储在结果表中。associate_tag_batch
:使用classify_schema
存储过程的结果自动将分类系统标签分配给架构中的所有表列,并返回分配给每个表的标签数。
重要
名为 classify_schema
的存储过程将创建一个临时表来存储结果。临时表在调用此存储过程的用户的用户会话期间存在。当用户会话过期时,Snowflake 会删除临时表,用户需要再次调用存储过程以重新创建临时表。
如果需要保留临时表,请从 sqlText
命令中移除 temp
关键字以创建表。
有关详细信息,请参阅 CREATE TABLE 命令中的 TEMP[ORARY]
选项。
创建名为
classify_schema
的第一个过程:create or replace procedure classify_schema(schema_name string, result_table string) returns object language JavaScript as $$ // 1 const table_names = snowflake.execute({ sqlText: `show terse tables in schema identifier(?)`, binds: [SCHEMA_NAME], }); // helper function for quoted table names function quote(name) { return '"'+ name.replace(/"/g,'""') + '"'; } // create table to store results in. if it already exists, we will add to it rather than overwrite snowflake.execute({ sqlText: `create temp table if not exists identifier(?) (table_name string, result variant)`, binds: [RESULT_TABLE], }) // loop through tables while (table_names.next()) { let name = table_names.getColumnValue('name'); // add schema to table name name = SCHEMA_NAME + "." + quote(name); // insert qualified table name and result into result table const results = snowflake.execute({ sqlText: `insert into identifier(?) select ?, extract_semantic_categories(?)`, binds: [RESULT_TABLE, name, name], }); } // return the number of tables classified return {tables_classified: table_names.getRowCount()}; $$;
创建名为
associate_tag_batch
的第二个过程:create or replace procedure associate_tag_batch(result_table string) returns Object language JavaScript as $$ // get table names and classification results to loop through const tags_to_apply = snowflake.execute({ sqlText: `select table_name, result from identifier(?)`, binds: [RESULT_TABLE], }); const out = {}; while (tags_to_apply.next()) { // get table name const name = tags_to_apply.getColumnValue('TABLE_NAME'); // get classification result const classification_results = tags_to_apply.getColumnValue('RESULT'); // call associate semantic category tags with table name and classification result const results = snowflake.execute({ sqlText: `call associate_semantic_category_tags(?, parse_json(?))`, binds: [name, JSON.stringify(classification_results)], }); results.next(); out[name] = results.getColumnValue(1).split('\n'); } // return number of tags applied per table return out; $$;
使用要分类的架构的名称和临时表的名称作为实参来保存每个表的 EXTRACT_SEMANTIC_CATEGORY 结果,从而调用
classify_schema
存储过程:call classify_schema('my_db.my_schema','my_temporary_classification_table');
查看临时表中的结果,并根据需要进行修改。
对结果感到满意后,调用
associate_tag_batch
存储过程将分类系统标签分配给表列:call associate_tag_batch('my_temporary_classification_table');
对数据库中的所有表进行分类¶
此示例演示如何使用两个存储过程对数据库中的所有表进行分类:
classify_database
:对数据库中每个架构内的所有表进行分类,并返回分类的表数和架构数。associate_tag_batch
:执行 对架构中的所有表进行分类 (本主题内容)中定义的相同操作。
创建
classify_database
存储过程:create or replace procedure classify_database(database_name string, result_table string) returns Object language JavaScript as $$ // get list of schemas in database const schema_names = snowflake.execute({ sqlText: `show terse schemas in database identifier(?)`, binds: [DATABASE_NAME], }); // helper function for quoted schema names function quote(name) { return '"'+ name.replace(/"/g,'""') + '"'; } // counter for tables. will use result from classify_schema to increment let table_count = 0 while (schema_names.next()) { let name = schema_names.getColumnValue('name'); // skip the information schema if (name == "INFORMATION_SCHEMA") { continue; } // add database name to schema name = DATABASE_NAME + "." + quote(name); // call classify_schema on each schema. This will loop over tables in schema const results = snowflake.execute({ sqlText: `call classify_schema(?, ?)`, binds: [name, RESULT_TABLE], }); results.next(); // increment total number of tables by the number of tables in the schema table_count += results.getColumnValue(1).tables_classified ; } return { tables_classified: table_count, // subtract one from number of schemas because we skip the information schema schemas_classified: schema_names.getRowCount() - 1, }; $$;
使用要分类的数据库的名称和临时表的名称调用
classify_database
存储过程,以将结果存储在数据库的每个架构中:call classify_database('my_db','my_temporary_classification_table');
导航到每个架构并查看临时表,在必要时进行修订。
对结果感到满意时,为 每个 架构调用一次
associate_tag_batch
存储过程,以将标签应用于该架构中的表。例如,如果数据库包含三个架构,则调用存储过程三次:call associate_tag_batch('my_temporary_classification_table');