使用 Python 管理 Snowflake 集成¶
您可以使用 Python 来管理 Snowflake 中的不同类型的集成。
先决条件¶
在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root
对象以使用 Snowflake Python APIs 的代码。
例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
代码可通过生成的 Session
对象创建 Root
对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake。
管理目录集成¶
您可以管理您账户中的 Apache Iceberg™ 的目录集成。目录集成是命名的账户级 Snowflake 对象,用于存储有关不使用 Snowflake 作为 Iceberg 目录或想要与 Snowflake Open Catalog 集成的情况下有关如何组织 Iceberg 表元数据的信息。有关更多信息,请参阅 Apache Iceberg™ 表 中的 目录集成 部分。
备注
ALTER CATALOG INTEGRATION 目前不受支持。
Snowflake Python APIs 使用两种不同类型来表示目录集成:
CatalogIntegration
:显示目录集成的属性,例如其名称、表格格式和目录设置。CatalogIntegrationResource
:显示可用于获取相应CatalogIntegration
对象和删除目录集成的方法。
创建目录集成¶
要创建目录集成,首先创建 CatalogIntegration
对象,然后根据 API:code:Root
对象创建 CatalogIntegrationCollection
对象。使用 CatalogIntegrationCollection.create
将新目录集成添加到 Snowflake。
您可以在您的账户中为以下类型的外部 Iceberg 目录创建目录集成。
AWS Glue¶
以下示例中的代码会创建一个 CatalogIntegration
对象,其表示将 AWS Glue 与所指定属性搭配使用的 Iceberg 表的名为 my_catalog_integration
的目录集成:
from snowflake.core.catalog_integration import CatalogIntegration, Glue
root.catalog_integrations.create(CatalogIntegration(
name="my_catalog_integration",
catalog = Glue(
catalog_namespace="abcd-ns",
glue_aws_role_arn="arn:aws:iam::123456789012:role/sqsAccess",
glue_catalog_id="1234567",
),
table_format="ICEBERG",
enabled=True,
))
对象存储¶
以下示例中的代码会创建一个 CatalogIntegration
对象,其表示使用对象存储的 Iceberg 表的名为 my_catalog_integration
的目录集成:
from snowflake.core.catalog_integration import CatalogIntegration, ObjectStore
root.catalog_integrations.create(CatalogIntegration(
name="my_catalog_integration",
catalog = ObjectStore(),
table_format="ICEBERG",
enabled=True,
))
Snowflake Open Catalog¶
以下示例中的代码会创建一个 CatalogIntegration
对象,其表示将 Open Catalog 与所指定属性搭配使用的 Iceberg 表的名为 my_catalog_integration
的目录集成:
from snowflake.core.catalog_integration import CatalogIntegration, OAuth, Polaris, RestConfig
root.catalog_integrations.create(CatalogIntegration(
name="my_catalog_integration",
catalog = Polaris(
catalog_namespace="abcd-ns",
rest_config=RestConfig(
catalog_uri="https://my_account.snowflakecomputing.cn/polaris/api/catalog",
warehouse="my-warehouse",
),
rest_authentication=OAuth(
type="OAUTH",
oauth_client_id="my_client_id",
oauth_client_secret="my_client_secret",
oauth_allowed_scopes=["PRINCIPAL_ROLE:ALL"],
),
),
table_format="ICEBERG",
enabled=True,
))
获取目录集成详细信息¶
您可以通过调用 CatalogIntegrationResource.fetch
方法来获取有关目录集成的信息,该方法会返回一个 CatalogIntegration
对象。
以下示例中的代码会获取有关名为 my_catalog_integration
的目录集成的信息:
my_catalog_integration = root.catalog_integrations["my_catalog_integration"].fetch()
print(my_catalog_integration.to_dict())
列出目录集成¶
您可以使用 CatalogIntegrationCollection.iter
方法列出目录集成,该方法会返回 CatalogIntegration
对象的 PagedIter
迭代器。
以下示例中的代码会列出名称以 my
为开头的目录集成,并打印每个目录集成的名称:
catalog_integration_iter = root.catalog_integrations.iter(like="my%")
for catalog_integration_obj in catalog_integration_iter:
print(catalog_integration_obj.name)
删除目录集成¶
您可以使用 CatalogIntegrationResource
对象删除目录集成。
以下示例中的代码会获取 my_catalog_integration
目录集成资源对象,然后删除目录集成。
my_catalog_integration_res = root.catalog_integrations["my_catalog_integration"]
my_catalog_integration_res.drop()
管理通知集成¶
您可以管理通知集成;通知集成是一些 Snowflake 对象,用于在 Snowflake 与第三方消息传递服务(例如第三方云消息队列服务、电子邮件服务和 Webhook)之间提供接口。有关更多信息,请参阅 Snowflake 中的通知。
备注
ALTER NOTIFICATION INTEGRATION 目前不受支持。
Snowflake Python APIs 使用两种不同类型来表示通知集成:
NotificationIntegration
:显示通知集成的属性,例如其名称和通知 hook 设置。NotificationIntegrationResource
:显示可用于获取相应NotificationIntegration
对象和删除通知集成的方法。
创建通知集成¶
要创建通知集成,首先创建 NotificationIntegration
对象,然后根据 API:code:Root
对象创建 NotificationIntegrationCollection
对象。使用 NotificationIntegrationCollection.create
将新通知集成添加到 Snowflake。
您可以为以下类型的消息服务创建通知集成。
电子邮件地址¶
以下示例中的代码会创建一个 NotificationIntegration
对象,其表示名为 my_email_notification_integration
且具有指定的 NotificationEmail
属性的通知集成:
from snowflake.core.notification_integration import NotificationEmail, NotificationIntegration
my_notification_integration = NotificationIntegration(
name="my_email_notification_integration",
notification_hook=NotificationEmail(
allowed_recipients=["test1@snowflake.com", "test2@snowflake.com"],
default_recipients=["test1@snowflake.com"],
default_subject="test default subject",
),
enabled=True,
)
root.notification_integrations.create(my_notification_integration)
Webhook¶
以下示例中的代码会创建一个 NotificationIntegration
对象,其表示名为 my_webhook_notification_integration
且具有指定的 NotificationWebhook
属性的通知集成:
from snowflake.core.notification_integration import NotificationIntegration, NotificationWebhook
my_notification_integration = NotificationIntegration(
name="my_webhook_notification_integration",
enabled=False,
notification_hook=NotificationWebhook(
webhook_url=webhook_url,
webhook_secret=WebhookSecret(
# This example assumes that this secret already exists
name="mySecret".upper(), database_name=database, schema_name=schema
),
webhook_body_template=webhook_template,
webhook_headers=webhook_headers,
),
)
root.notification_integrations.create(my_notification_integration)
Amazon SNS 主题(出站)¶
以下示例中的代码会创建一个 NotificationIntegration
对象,其表示名为 my_aws_sns_outbound_notification_integration
且具有指定的 NotificationQueueAwsSnsOutbound
属性的通知集成:
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAwsSnsOutbound
my_notification_integration = NotificationIntegration(
name="my_aws_sns_outbound_notification_integration",
enabled=False,
notification_hook=NotificationQueueAwsSnsOutbound(
aws_sns_topic_arn="arn:aws:sns:us-west-1:123456789012:sns-test-topic",
aws_sns_role_arn="arn:aws:iam::123456789012:role/sns-test-topic",
)
)
root.notification_integrations.create(my_notification_integration)
Azure 事件网格主题(出站)¶
以下示例中的代码会创建一个 NotificationIntegration
对象,其表示名为 my_azure_outbound_notification_integration
且具有指定的 NotificationQueueAzureEventGridOutbound
属性的通知集成:
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridOutbound
my_notification_integration = NotificationIntegration(
name="my_azure_outbound_notification_integration",
enabled=False,
notification_hook=NotificationQueueAzureEventGridOutbound(
azure_event_grid_topic_endpoint="https://fake.queue.core.windows.net/api/events",
azure_tenant_id="fake.onmicrosoft.com",
)
)
root.notification_integrations.create(my_notification_integration)
Azure 事件网格主题(入站)¶
以下示例中的代码会创建一个 NotificationIntegration
对象,其表示名为 my_azure_inbound_notification_integration
且具有指定的 NotificationQueueAzureEventGridInbound
属性的通知集成:
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridInbound
my_notification_integration = NotificationIntegration(
name="my_azure_inbound_notification_integration",
enabled=False,
notification_hook=NotificationQueueAzureEventGridInbound(
azure_storage_queue_primary_uri="https://fake.queue.core.windows.net/snowapi_queue",
azure_tenant_id="fake.onmicrosoft.com",
),
)
root.notification_integrations.create(my_notification_integration)
Google Pub/Sub 主题(出站)¶
以下示例中的代码会创建一个 NotificationIntegration
对象,其表示名为 my_gcp_outbound_notification_integration
且具有指定的 NotificationQueueGcpPubsubOutbound
属性的通知集成:
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubOutbound
my_notification_integration = NotificationIntegration(
name="my_gcp_outbound_notification_integration",
enabled=False,
notification_hook=NotificationQueueGcpPubsubOutbound(
gcp_pubsub_topic_name="projects/fake-project-name/topics/pythonapi-test",
)
)
root.notification_integrations.create(my_notification_integration)
Google Pub/Sub 主题(入站)¶
以下示例中的代码会创建一个 NotificationIntegration
对象,其表示名为 my_gcp_inbound_notification_integration
且具有指定的 NotificationQueueGcpPubsubInbound
属性的通知集成:
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubInbound
my_notification_integration = NotificationIntegration(
name="my_gcp_inbound_notification_integration",
enabled=True,
notification_hook=NotificationQueueGcpPubsubInbound(
gcp_pubsub_subscription_name="projects/fake-project-name/subscriptions/sub-test",
)
)
root.notification_integrations.create(my_notification_integration)
获取通知集成详细信息¶
您可以通过调用 NotificationIntegrationResource.fetch
方法来获取有关通知集成的信息,该方法会返回一个 NotificationIntegration
对象。
以下示例中的代码会获取有关名为 my_notification_integration
的通知集成的信息:
my_notification_integration = root.notification_integrations["my_notification_integration"].fetch()
print(my_notification_integration.to_dict())
列出通知集成¶
您可以使用 NotificationIntegrationCollection.iter
方法列出通知集成,该方法会返回 NotificationIntegration
对象的 PagedIter
迭代器。
以下示例中的代码会列出名称以 my
开头的通知集成,并打印每个通知集成的名称:
notification_integration_iter = root.notification_integrations.iter(like="my%")
for notification_integration_obj in notification_integration_iter:
print(notification_integration_obj.name)
删除通知集成¶
您可以使用 NotificationIntegrationResource
对象删除通知集成。
以下示例中的代码会获取 my_notification_integration
通知集成资源对象,然后删除通知集成。
my_notification_integration_res = root.notification_integrations["my_notification_integration"]
my_notification_integration_res.drop()