使用 External OAuth 连接到 Snowflake Open Catalog¶
本主题介绍如何通过客户端应用程序,使用外部 OAuth 连接到 Snowflake Open Catalog。
本主题中的示例代码展示了如何使用 Apache Spark™ 进行连接,且代码以 PySpark 编写。
备注
如果您使用 Snowflake 查询 Open Catalog 管理的表,则可以为 Snowflake 创建使用 External OAuth 的目录集成。有关更多信息,请参阅 Snowflake 文档中的 CREATE CATALOG INTEGRATION (Snowflake Open Catalog)。
先决条件¶
在连接到 Open Catalog with External OAuth 之前,您需要在 Open Catalog 中配置 External OAuth。有关说明,请参阅 在 Snowflake Open Catalog 中配置外部 OAuth。
使用自动刷新令牌连接 Open Catalog(首选方法)¶
使用此方法可通过自动刷新令牌进行连接,从而无需手动刷新令牌。
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,<maven_coordinate>') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://<open_catalog_account_identifier>.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.warehouse','<catalog_name>') \
.config('spark.sql.catalog.opencatalog.rest.auth.type','oauth2') \
.config('spark.sql.catalog.opencatalog.oauth2-server-uri','<oauth2_server_uri>') \
.config('spark.sql.catalog.opencatalog.credential','<oauth_client_id>:<oauth_client_secret>') \
.config('spark.sql.catalog.opencatalog.scope','SESSION:ROLE:<custom_role>') \
.config('spark.sql.catalog.opencatalog.audience','https://<open_catalog_account_identifier>.snowflakecomputing.cn') \
.getOrCreate()
参数¶
参数 |
描述 |
|---|---|
|
指定要连接的目录的名称。 |
|
指定外部云存储提供商的 Maven 坐标:
|
|
指定 Open Catalog 账户的账户标识符。 |
|
您的 OAuth2 服务器 URI。 |
|
您的 OAuth2 客户端 ID。 |
|
您的 OAuth2 客户端密钥。 |
|
Open Catalog 中要向服务主体授予其权限的自定义角色的名称。 |
使用访问令牌连接 Open Catalog¶
如果需要,您可以使用访问令牌连接 Open Catalog。但是,访问令牌将过期,您需要手动刷新。或者,您可以 使用自动刷新令牌进行连接。
以下示例代码用于使用 Spark 与 Open Catalog 连接。
参数¶
参数 |
描述 |
|---|---|
|
指定要连接的目录的名称。 |
|
指定外部云存储提供商的 Maven 坐标:
|
|
指定客户端应用程序要使用的访问令牌。 |
|
指定 Open Catalog 账户的账户标识符。 |
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,<maven_coordinate>') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://<open_catalog_account_identifier>.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.token','<access_token>') \
.config('spark.sql.catalog.opencatalog.warehouse','<catalog_name>') \
.getOrCreate()
通过跨区域连接进行连接(仅限 Amazon S3)¶
以下示例代码用于在以下情况为 True 时连接到 Open Catalog:
您的 Open Catalog 账户托管在 Amazon S3 上。
您的外部存储提供商是 Amazon S3。
您的 Open Catalog 账户托管在一个 S3 区域,该区域与包含您的 Apache Tables™ 表的存储桶所在的 S3 区域不同。
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://<open_catalog_account_identifier>.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.token','<access_token>') \
.config('spark.sql.catalog.opencatalog.warehouse','<catalog_name>') \
.config('spark.sql.catalog.opencatalog.client.region','<target_s3_region>') \
.getOrCreate()
参数¶
参数 |
描述 |
|---|---|
|
指定要连接的目录的名称。 |
|
指定客户端应用程序要使用的访问令牌。 |
|
指定 Open Catalog 账户的账户标识符。根据账户所在区域和云平台的不同,此标识符可能本身就是账户定位器(例如, |
|
指定包含 Apache Iceberg 表的 S3 桶所在的区域代码。有关区域代码,请参阅 AWS 服务端点 (https://docs.aws.amazon.com/general/latest/gr/s3.html#s3_region),并参阅表中的区域列。 |
示例¶
本部分包含使用 Spark 连接到 Open Catalog 的示例:
示例 1:在以下 和 命令中,服务名称不会显式指定数据库和架构名称。连接 (S3)¶
请参阅:
使用自动刷新进行连接 (S3)¶
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
.config('spark.sql.catalog.opencatalog.rest.auth.type','oauth2') \
.config('spark.sql.catalog.opencatalog.oauth2-server-uri','your-tenant.region.auth0.com') \
.config('spark.sql.catalog.opencatalog.credential','11111111111111111111111111111111:222222222222222222222222222222222222222222222222222222222222222222') \
.config('spark.sql.catalog.opencatalog.scope','SESSION:ROLE:DATA_ENG') \
.config('spark.sql.catalog.opencatalog.audience','https://ab12345.snowflakecomputing.cn') \
.getOrCreate()
使用访问令牌进行连接 (S3)¶
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.token','0000000000000000000000000001111111111111111111111111111111111111111111') \
.config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
.getOrCreate()
示例 2:在以下 和 命令中,服务名称包括数据库和架构名称。连接 (Google Cloud Storage)¶
请参阅:
使用自动刷新进行连接 (Google Cloud Storage)¶
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-gcp-bundle:1.5.2') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
.config('spark.sql.catalog.opencatalog.rest.auth.type','oauth2') \
.config('spark.sql.catalog.opencatalog.oauth2-server-uri','your-tenant.region.auth0.com') \
.config('spark.sql.catalog.opencatalog.credential','11111111111111111111111111111111:222222222222222222222222222222222222222222222222222222222222222222') \
.config('spark.sql.catalog.opencatalog.scope','SESSION:ROLE:DATA_ENG') \
.config('spark.sql.catalog.opencatalog.audience','https://ab12345.snowflakecomputing.cn') \
.getOrCreate()
使用访问令牌进行连接 (Google Cloud Storage)¶
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-gcp-bundle:1.5.2') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.token','0000000000000000000000000001111111111111111111111111111111111111111111') \
.config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
.getOrCreate()
示例 3:连接 (Azure)¶
请参阅:
使用自动刷新进行连接 (Azure)¶
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-azure-bundle:1.5.2') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
.config('spark.sql.catalog.opencatalog.rest.auth.type','oauth2') \
.config('spark.sql.catalog.opencatalog.oauth2-server-uri','your-tenant.region.auth0.com') \
.config('spark.sql.catalog.opencatalog.credential','11111111111111111111111111111111:222222222222222222222222222222222222222222222222222222222222222222') \
.config('spark.sql.catalog.opencatalog.scope','SESSION:ROLE:DATA_ENG') \
.config('spark.sql.catalog.opencatalog.audience','https://ab12345.snowflakecomputing.cn') \
.getOrCreate()
使用访问令牌进行连接 (Azure)¶
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-azure-bundle:1.5.2') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.token','0000000000000000000000000001111111111111111111111111111111111111111111') \
.config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
.getOrCreate()
验证与 Open Catalog 的连接¶
要验证 Spark 是否已连接到 Open Catalog,应列出目录的命名空间。有关更多信息,请参阅 列出命名空间。