从 Apache Spark™ 查询 Apache Iceberg™ 表时强制执行数据保护策略¶
本主题介绍在通过 Snowflake Horizon Catalog 跨 Apache Spark™ 访问 Apache Iceberg™ 表时,如何强制执行在表上设置的数据保护策略。要强制执行数据保护策略,请安装 Spark 连接器 Snowflake Connector for Spark。有关 Spark 连接器的更多信息,请参阅 Snowflake Connector for Spark。
Spark 连接器支持通过将查询路由至 Snowflake,来查询受 Snowflake 策略保护的表,从而确保计算资源的高效使用和策略的一致执行。Spark 连接器还支持通过将写入路由至 Snowflake,对受 Snowflake 策略保护的表执行写入操作。
备注
对于没有细粒度数据保护策略的 Apache Iceberg 表,Spark 连接器还支持通过 Snowflake Horizon Catalog 使用 Spark 会话计算对其进行直接查询。
从 Spark 查询 Iceberg 表时强制执行数据保护策略的工作流程¶
要在从 Spark 查询 Iceberg 表时强制执行数据保护策略,请完成以下步骤:
使用 Snowflake Spark Connector 将 Spark 连接到 Iceberg 表,其中包括下载 Snowflake Connector for Spark 以及通过 Snowflake Horizon Catalog 将 Spark 连接到 Iceberg 表。
支持的数据保护策略¶
支持以下数据保护策略:
对受任何其他数据策略保护的表进行查询将导致错误。
先决条件¶
获取以下信息:
将查询表的 Snowflake 用户的用户名
包含要查询表的 Snowflake 数据库的名称
Snowflake 中用于策略评估的虚拟仓库的名称
检索包含要查询的 Iceberg 表的 Snowflake 的账户账户标识符。有关说明,请参阅 账户标识符。您将在 将 Spark 连接到强制执行数据访问策略的 Iceberg 表 时指定此标识符。
小技巧
要使用 SQL 获取您的账户标识符,请运行以下命令:
SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
第 1 步:配置数据保护策略¶
重要
如果您已经在要查询的 Iceberg 表上设置了数据保护策略,请继续执行下一步。
在此步骤中,您将配置数据保护策略。
第 2 步:使用 Snowflake Connector for Spark 将 Spark 连接到 Iceberg 表¶
在此步骤中,您将通过 Horizon Catalog 将 Spark 连接到 Iceberg 表。建立连接后,您可以使用 Spark 查询表,并在表上强制执行数据保护策略。
要使用 Snowflake Connector for Spark (Spark 连接器)将 Spark 连接到 Iceberg 表,请先下载 Spark 连接器,然后将 Spark 连接到 Iceberg 表。
下载 Snowflake Connector for Spark¶
要下载 3.1.6 或更高版本的 Snowflake Connector for Spark,请按照 安装和配置 Spark Connector 中的说明进行操作。
将 Spark 连接到 Iceberg 表¶
在此步骤中,您将通过 Horizon Catalog 将 Spark 连接到 Iceberg 表。此连接包含相关配置,以便您配合 Horizon Catalog 使用 Snowflake Connector for Spark 来查询受 Snowflake 数据保护策略保护的表。
备注
如果您使用的是 External OAuth 或密钥对身份验证,请参阅 使用 External OAuth 或密钥对身份验证将 Spark 连接到 Iceberg 表。
要使用编程访问令牌 (PAT) 将 Spark 连接到 Iceberg 表,请使用以下示例 PySpark 代码:
from pyspark.sql import SparkSession # Snowflake Horizon Catalog Configuration, change as per your environment CATALOG_URI = "https://<account_identifier>.snowflakecomputing.cn/polaris/api/catalog" ROLE = "<role>" HORIZON_SESSION_ROLE = f"session:role:{ROLE}" CATALOG_NAME = "<database_name>" #provide in UPPER CASE SF_URL= "<account_identifier>.snowflakecomputing.cn" SF_USER = "<user_name>" #provide in UPPER CASE SF_PASSWORD = "<user_password>" SF_SCHEMA = "<schema_name>" #provide in UPPER CASE SF_WAREHOUSE = "<warehouse_name>" #provide in UPPER CASE # Cloud Service Provider Region Configuration (where the Iceberg data is stored) REGION = "<region_name>" # Paste the External Oauth Access token that you generated in Snowflake here ACCESS_TOKEN = "<your_access_token>" # Paste the PAT you generated in Snowflake here PAT_TOKEN = "<your_PAT_token>" # Iceberg Version ICEBERG_VERSION = "1.9.1" #Snowflake Connector for Spark DRIVER_VERSION = "3.24.0" # (or above) SNOWFLAKE_CONNECTOR_VERSION = "3.1.6" try: spark.stop() except: pass spark = ( SparkSession.builder .master("local[*]") .config("spark.ui.port", "0") .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.driver.host", "127.0.0.1") .config("spark.driver.port", "0") .config("spark.blockManager.port", "0") # JAR Dependencies for Iceberg, Azure and Snowflake Connector for Spark .config( "spark.jars.packages", f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION}," f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}," # for Azure storage, use the below package and comment above azure bundle # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}" # for Snowflake Connector for Spark f"net.snowflake:snowflake-jdbc:{DRIVER_VERSION}," f"net.snowflake:spark-snowflake_2.12:{SNOWFLAKE_CONNECTOR_VERSION}" ) # Iceberg SQL Extensions .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.defaultCatalog", "horizoncatalog") .config("spark.sql.catalog.horizoncatalog", "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog") #Horizon REST Catalog Configuration .config(f"spark.sql.catalog.horizoncatalog.catalog-impl", "org.apache.iceberg.spark.SparkCatalog") .config(f"spark.sql.catalog.horizoncatalog.type", "rest") .config(f"spark.sql.catalog.horizoncatalog.uri", CATALOG_URI) .config(f"spark.sql.catalog.horizoncatalog.warehouse", CATALOG_NAME) .config(f"spark.sql.catalog.horizoncatalog.scope", HORIZON_SESSION_ROLE) .config(f"spark.sql.catalog.horizoncatalog.client.region", REGION) .config(f"spark.sql.catalog.horizoncatalog.credential", PAT_TOKEN) # for External Oauth use below and comment above configuration .token #.config(f"spark.sql.catalog.horizoncatalog.token", ACCESS_TOKEN) .config("spark.sql.catalog.horizoncatalog.io-impl","org.apache.iceberg.aws.s3.S3FileIO") # Enforcing policies using Snowflake Connector for Spark .config("spark.snowflake.sfURL", SF_URL) .config("spark.snowflake.sfUser", SF_USER) .config("spark.snowflake.sfPassword", SF_PASSWORD) # for External Oauth uncomment below and comment above configurations for user and password #.config("spark.snowflake.sfAuthenticator","oauth") #.config("spark.snowflake.sfToken",ACCESS_TOKEN) .config("spark.snowflake.sfDatabase", CATALOG_NAME) .config("spark.snowflake.sfSchema",SF_SCHEMA) # Optional .config("spark.snowflake.sfRole",ROLE) .config("spark.snowflake.sfWarehouse",SF_WAREHOUSE) # Required for vended credentials .config(f"spark.sql.catalog.horizoncatalog.header.X-Iceberg-Access-Delegation", "vended-credentials") .config("spark.sql.iceberg.vectorization.enabled", "false") .getOrCreate() ) spark.sparkContext.setLogLevel("ERROR")
其中:
<account_identifier>是包含要查询的 Iceberg 表的 Snowflake 账户的 Snowflake 账户标识符。要查找此标识符,请参阅 账户标识符。<your_access_token>是您获得的访问令牌。要获取访问令牌,请参阅 获取用于身份验证的访问令牌。备注
对于外部 OAuth,您还可以使用自动令牌刷新配置与引擎的连接,而不是指定访问令牌。
<database_name>是 Snowflake 账户中数据库的名称,该数据库包含您要查询的 Snowflake 托管的 Iceberg 表。备注
Spark 中的以下属性需要您的 Snowflake 数据库 名称,而非 Snowflake 仓库名称:
.warehouse.sfDatabase
<role>是 Snowflake 中配置为对要查询的 Iceberg 表具有访问权限的角色。例如:DATA_ENGINEER。:code:` <user_name> ` 是用于访问 Snowflake 中表的用户名。
<user_password>是访问表的用户的密码。备注
如果适用,该密码可以是您为身份验证而获取的编程访问令牌 (PAT)。
<schema_name>是 Snowflake 中存储表的架构。这是可选的。<warehouse_name>是您希望用于评估策略的 Snowflake 仓库(计算实例)名称。
重要
默认情况下,代码示例是针对存储在 Amazon S3 上的 Apache Iceberg™ 表进行设置的。如果您的 Iceberg 表存储在 Azure 存储 (ADLS) 中,请执行以下步骤:
注释以下行:
f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"取消注释以下行:
# f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"
使用 External OAuth 或密钥对身份验证将 Spark 连接到 Iceberg 表¶
前面的代码示例显示了使用编程访问令牌 (PAT) 进行连接的配置。
要使用 External OAuth 或密钥对身份验证将 Spark 连接到 Iceberg 表,请按照以下步骤修改前面的代码示例:
对于
<your_access_token>,请指定您的 External OAuth 或密钥对身份验证的访问令牌。要获取访问令牌,请参阅 第 3 步:获取用于身份验证的访问令牌。
注释以下行:
.config(f"spark.sql.catalog.{CATALOG_NAME}.credential", PAT_TOKEN)取消注释以下行:
#.config(f"spark.sql.catalog.{CATALOG_NAME}.token", ACCESS_TOKEN)
第 3 步:使用 Spark 查询 Iceberg 表¶
使用 Spark 读取受 Snowflake 数据保护策略保护的 Iceberg 表。Spark 可以自动将受 Snowflake 策略保护的表查询通过 Snowflake 进行路由,以确保策略的一致执行。
查询表¶
spark.sql("SHOW NAMESPACES").show(truncate=False)
spark.sql("USE horizoncatalog.<schema_name>")
spark.sql("SHOW TABLES").show(truncate=False)
spark.sql("Select * from <your_table_name_in_snowflake>").show(truncate=False)
监控策略评估查询¶
要监控 Snowflake 中由于策略评估而从 Spark 路由至 Snowflake 的查询活动,您可以监控您的 Snowflake 账户活动。
要监控 Snowflake 中的查询历史记录,请按照 使用 Query History 监控查询活动 中的说明进行操作。
配置数据保护策略的注意事项¶
配置数据保护策略时,请考虑以下事项:
仅当在表上设置了以下数据保护策略时,才支持对使用 Spark 查询的 Iceberg 表强制执行数据保护策略:
掩码策略
基于标签的掩码策略
行访问策略
对受所有其他策略保护的表进行查询将导致错误。