使用 Python 管理 Snowflake 集成

您可以使用 Python 来管理 Snowflake 中的不同类型的集成。

先决条件

在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root 对象以使用 Snowflake Python APIs 的代码。

例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

代码可通过生成的 Session 对象创建 Root 对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake

管理目录集成

您可以管理您账户中的 Apache Iceberg™ 的目录集成。目录集成是命名的账户级 Snowflake 对象,用于存储有关不使用 Snowflake 作为 Iceberg 目录或想要与 Snowflake Open Catalog 集成的情况下有关如何组织 Iceberg 表元数据的信息。有关更多信息,请参阅 Apache Iceberg™ 表 中的 目录集成 部分。

备注

ALTER CATALOG INTEGRATION 目前不受支持。

Snowflake Python APIs 使用两种不同类型来表示目录集成:

  • CatalogIntegration:显示目录集成的属性,例如其名称、表格格式和目录设置。

  • CatalogIntegrationResource:显示可用于获取相应 CatalogIntegration 对象和删除目录集成的方法。

创建目录集成

要创建目录集成,首先创建 CatalogIntegration 对象,然后根据 API:code:Root 对象创建 CatalogIntegrationCollection 对象。使用 CatalogIntegrationCollection.create 将新目录集成添加到 Snowflake。

您可以在您的账户中为以下类型的外部 Iceberg 目录创建目录集成。

AWS Glue

以下示例中的代码会创建一个 CatalogIntegration 对象,其表示将 AWS Glue 与所指定属性搭配使用的 Iceberg 表的名为 my_catalog_integration 的目录集成:

from snowflake.core.catalog_integration import CatalogIntegration, Glue

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = Glue(
        catalog_namespace="abcd-ns",
        glue_aws_role_arn="arn:aws:iam::123456789012:role/sqsAccess",
        glue_catalog_id="1234567",
    ),
    table_format="ICEBERG",
    enabled=True,
))
Copy

对象存储

以下示例中的代码会创建一个 CatalogIntegration 对象,其表示使用对象存储的 Iceberg 表的名为 my_catalog_integration 的目录集成:

from snowflake.core.catalog_integration import CatalogIntegration, ObjectStore

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = ObjectStore(),
    table_format="ICEBERG",
    enabled=True,
))
Copy

Snowflake Open Catalog

以下示例中的代码会创建一个 CatalogIntegration 对象,其表示将 Open Catalog 与所指定属性搭配使用的 Iceberg 表的名为 my_catalog_integration 的目录集成:

from snowflake.core.catalog_integration import CatalogIntegration, OAuth, Polaris, RestConfig

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = Polaris(
        catalog_namespace="abcd-ns",
        rest_config=RestConfig(
            catalog_uri="https://my_account.snowflakecomputing.cn/polaris/api/catalog",
            warehouse="my-warehouse",
        ),
        rest_authentication=OAuth(
            type="OAUTH",
            oauth_client_id="my_client_id",
            oauth_client_secret="my_client_secret",
            oauth_allowed_scopes=["PRINCIPAL_ROLE:ALL"],
        ),
    ),
    table_format="ICEBERG",
    enabled=True,
))
Copy

获取目录集成详细信息

您可以通过调用 CatalogIntegrationResource.fetch 方法来获取有关目录集成的信息,该方法会返回一个 CatalogIntegration 对象。

以下示例中的代码会获取有关名为 my_catalog_integration 的目录集成的信息:

my_catalog_integration = root.catalog_integrations["my_catalog_integration"].fetch()
print(my_catalog_integration.to_dict())
Copy

列出目录集成

您可以使用 CatalogIntegrationCollection.iter 方法列出目录集成,该方法会返回 CatalogIntegration 对象的 PagedIter 迭代器。

以下示例中的代码会列出名称以 my 为开头的目录集成,并打印每个目录集成的名称:

catalog_integration_iter = root.catalog_integrations.iter(like="my%")
for catalog_integration_obj in catalog_integration_iter:
  print(catalog_integration_obj.name)
Copy

删除目录集成

您可以使用 CatalogIntegrationResource 对象删除目录集成。

以下示例中的代码会获取 my_catalog_integration 目录集成资源对象,然后删除目录集成。

my_catalog_integration_res = root.catalog_integrations["my_catalog_integration"]
my_catalog_integration_res.drop()
Copy

管理通知集成

您可以管理通知集成;通知集成是一些 Snowflake 对象,用于在 Snowflake 与第三方消息传递服务(例如第三方云消息队列服务、电子邮件服务和 Webhook)之间提供接口。有关更多信息,请参阅 Snowflake 中的通知

备注

ALTER NOTIFICATION INTEGRATION 目前不受支持。

Snowflake Python APIs 使用两种不同类型来表示通知集成:

  • NotificationIntegration:显示通知集成的属性,例如其名称和通知 hook 设置。

  • NotificationIntegrationResource:显示可用于获取相应 NotificationIntegration 对象和删除通知集成的方法。

创建通知集成

要创建通知集成,首先创建 NotificationIntegration 对象,然后根据 API:code:Root 对象创建 NotificationIntegrationCollection 对象。使用 NotificationIntegrationCollection.create 将新通知集成添加到 Snowflake。

您可以为以下类型的消息服务创建通知集成。

电子邮件地址

以下示例中的代码会创建一个 NotificationIntegration 对象,其表示名为 my_email_notification_integration 且具有指定的 NotificationEmail 属性的通知集成:

from snowflake.core.notification_integration import NotificationEmail, NotificationIntegration

my_notification_integration = NotificationIntegration(
  name="my_email_notification_integration",
  notification_hook=NotificationEmail(
      allowed_recipients=["test1@snowflake.com", "test2@snowflake.com"],
      default_recipients=["test1@snowflake.com"],
      default_subject="test default subject",
  ),
  enabled=True,
)

root.notification_integrations.create(my_notification_integration)
Copy

Webhook

以下示例中的代码会创建一个 NotificationIntegration 对象,其表示名为 my_webhook_notification_integration 且具有指定的 NotificationWebhook 属性的通知集成:

from snowflake.core.notification_integration import NotificationIntegration, NotificationWebhook

my_notification_integration = NotificationIntegration(
  name="my_webhook_notification_integration",
  enabled=False,
  notification_hook=NotificationWebhook(
      webhook_url=webhook_url,
      webhook_secret=WebhookSecret(
          # This example assumes that this secret already exists
          name="mySecret".upper(), database_name=database, schema_name=schema
      ),
      webhook_body_template=webhook_template,
      webhook_headers=webhook_headers,
  ),
)

root.notification_integrations.create(my_notification_integration)
Copy

Amazon SNS 主题(出站)

以下示例中的代码会创建一个 NotificationIntegration 对象,其表示名为 my_aws_sns_outbound_notification_integration 且具有指定的 NotificationQueueAwsSnsOutbound 属性的通知集成:

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAwsSnsOutbound

my_notification_integration = NotificationIntegration(
  name="my_aws_sns_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAwsSnsOutbound(
      aws_sns_topic_arn="arn:aws:sns:us-west-1:123456789012:sns-test-topic",
      aws_sns_role_arn="arn:aws:iam::123456789012:role/sns-test-topic",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Azure 事件网格主题(出站)

以下示例中的代码会创建一个 NotificationIntegration 对象,其表示名为 my_azure_outbound_notification_integration 且具有指定的 NotificationQueueAzureEventGridOutbound 属性的通知集成:

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridOutbound

my_notification_integration = NotificationIntegration(
  name="my_azure_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAzureEventGridOutbound(
      azure_event_grid_topic_endpoint="https://fake.queue.core.windows.net/api/events",
      azure_tenant_id="fake.onmicrosoft.com",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Azure 事件网格主题(入站)

以下示例中的代码会创建一个 NotificationIntegration 对象,其表示名为 my_azure_inbound_notification_integration 且具有指定的 NotificationQueueAzureEventGridInbound 属性的通知集成:

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridInbound

my_notification_integration = NotificationIntegration(
  name="my_azure_inbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAzureEventGridInbound(
      azure_storage_queue_primary_uri="https://fake.queue.core.windows.net/snowapi_queue",
      azure_tenant_id="fake.onmicrosoft.com",
  ),
)

root.notification_integrations.create(my_notification_integration)
Copy

Google Pub/Sub 主题(出站)

以下示例中的代码会创建一个 NotificationIntegration 对象,其表示名为 my_gcp_outbound_notification_integration 且具有指定的 NotificationQueueGcpPubsubOutbound 属性的通知集成:

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubOutbound

my_notification_integration = NotificationIntegration(
  name="my_gcp_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueGcpPubsubOutbound(
      gcp_pubsub_topic_name="projects/fake-project-name/topics/pythonapi-test",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Google Pub/Sub 主题(入站)

以下示例中的代码会创建一个 NotificationIntegration 对象,其表示名为 my_gcp_inbound_notification_integration 且具有指定的 NotificationQueueGcpPubsubInbound 属性的通知集成:

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubInbound

my_notification_integration = NotificationIntegration(
  name="my_gcp_inbound_notification_integration",
  enabled=True,
  notification_hook=NotificationQueueGcpPubsubInbound(
      gcp_pubsub_subscription_name="projects/fake-project-name/subscriptions/sub-test",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

获取通知集成详细信息

您可以通过调用 NotificationIntegrationResource.fetch 方法来获取有关通知集成的信息,该方法会返回一个 NotificationIntegration 对象。

以下示例中的代码会获取有关名为 my_notification_integration 的通知集成的信息:

my_notification_integration = root.notification_integrations["my_notification_integration"].fetch()
print(my_notification_integration.to_dict())
Copy

列出通知集成

您可以使用 NotificationIntegrationCollection.iter 方法列出通知集成,该方法会返回 NotificationIntegration 对象的 PagedIter 迭代器。

以下示例中的代码会列出名称以 my 开头的通知集成,并打印每个通知集成的名称:

notification_integration_iter = root.notification_integrations.iter(like="my%")
for notification_integration_obj in notification_integration_iter:
  print(notification_integration_obj.name)
Copy

删除通知集成

您可以使用 NotificationIntegrationResource 对象删除通知集成。

以下示例中的代码会获取 my_notification_integration 通知集成资源对象,然后删除通知集成。

my_notification_integration_res = root.notification_integrations["my_notification_integration"]
my_notification_integration_res.drop()
Copy
语言: 中文