设置 Openflow Connector for Snowflake to Kafka

备注

使用该连接器需遵守 连接器条款

本主题介绍设置 Openflow Connector for Snowflake to Kafka 的步骤。

先决条件

  1. 确保您已查看 关于 Openflow Connector for Snowflake to Kafka

  2. Ensure that you have 设置 Openflow - BYOC or 设置 Openflow - Snowflake 部署 - 任务概述.

  3. 创建 Snowflake 流,用于查询变更信息。

  4. 创建 Kafka 主题,用于接收来自 Snowflake 流的 CDC 消息。

设置 Snowflake 账户

作为 Snowflake 账户管理员,请执行以下任务:

  1. 创建连接器用于读取 CDC 事件的数据库、源表和流对象。例如:

    create database stream_db;
    use database stream_db;
    create table stream_source (user_id varchar, data varchar);
    create stream stream_on_table on table stream_source;
    
    Copy
  2. 创建新角色或使用现有角色,并为流和流的源对象授予 SELECT 权限。连接器还需要对包含流和流的源对象的数据库和架构具有 USAGE 权限。例如:

    create role stream_reader;
    grant usage on database stream_db to role stream_reader;
    grant usage on schema stream_db.public to role stream_reader;
    grant select on stream_source to role stream_reader;
    grant select on stream_on_table to role stream_reader;
    
    Copy
  3. 创建类型为 SERVICE 的新 Snowflake 服务用户。例如:

    create user stream_user type = service;
    
    Copy
  4. 向该 Snowflake 服务用户授予您在前面步骤中创建的角色。例如:

    grant role stream_reader to user stream_user;
    
    Copy
  5. 为第 3 步中的 Snowflake SERVICE 用户配置 密钥对认证

  6. Snowflake 强烈建议执行此步骤。配置 Openflow 支持的密钥管理器(例如 AWS、Azure 和 Hashicorp),并将公钥和私钥存储在密钥存储库中。但是,请注意,第 4 步中生成的私钥可以直接用作连接器配置的配置参数。在这种情况下,私钥存储在 Openflow 运行时配置中。

    备注

    如果您出于任何原因不希望使用密钥管理器,则您有责任根据组织的安全策略保护用于密钥对身份验证的公钥和私钥文件。

    1. 配置密钥管理器后,确定如何对其进行身份验证。在 AWS 中,建议您使用与 Openflow 关联的 EC2 实例角色,因为这样就无需保留其他密钥。

    2. In Openflow, configure a Parameter Provider associated with this Secrets Manager, from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider and then fetch your parameter values.

    3. 此时,可以使用关联的参数路径引用所有凭据,无需在 Openflow 中保留敏感值。

  7. 指定一个仓库供连接器使用。一个连接器可以将单个表复制到单个 Kafka 主题。对于这种处理,您可以选择最小的仓库。

设置连接器

作为数据工程师,执行以下任务以安装和配置连接器:

  1. 导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors

  2. 在 Openflow 连接器页面上,根据连接器应与哪种 Kafka 代理实例进行通信,查找并选择连接器。

    • mTLS 版本:如果您在使用 SSL(相互 TLS)安全协议,或者如果您在使用 SASL_SSL 协议并连接到使用自签名证书的代理,请选择此连接器。

    • SASL 版本:如果您在使用任何其他安全协议,请选择此连接器

  3. 选择 Add to runtime

  4. Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。

  5. 选择 Add

  6. 使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。

  7. 使用您的 Snowflake 账户凭据进行运行时身份验证。

    此时将显示 Openflow 画布,其中添加了连接器进程组。

  8. 右键点击导入的进程组并选择 Parameters

  9. 按照 流参数 中所述填充所需的参数值。

流参数

本部分介绍您可以根据以下参数上下文配置的流参数:

Kafka Sink 源参数

参数

描述

必填

Snowflake 账户标识符

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data will be persisted.

Snowflake 身份验证策略

使用以下方式时:

  • Snowflake Openflow 部署:使用 SNOWFLAKE_SESSION_TOKEN。此令牌由 Snowflake 自动管理。

  • BYOC:使用 KEY_PAIR 作为身份验证策略的值。

源数据库

Source database. This database should contain the Snowflake Stream object that will be consumed.

Snowflake 私钥密码

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR:提供与 Snowflake 私钥文件关联的密码。

Snowflake 角色

使用以下方式时:

  • 会话令牌身份验证策略:使用运行时角色。您可以在 Openflow UI 中,通过导航至您的运行时的 View Details 来找到您的运行时角色。

  • KEY_PAIR 身份验证策略:使用为您的服务用户配置的有效角色。

Snowflake 用户名

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Provide the user name used to connect to the Snowflake instance.

Snowflake 私钥

Leave this blank when using Session Token for your Authentication Strategy. When using KEY_PAIR, provide the RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined.

Snowflake 私钥文件

Leave this blank when using Session Token for your Authentication Strategy. When using KEY_PAIR, upload the file that contains the RSA Private Key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line begins with -----BEGIN PRIVATE. Select the Reference asset checkbox to upload the private key file.

源架构

源架构。该架构应包含将被使用的 Snowflake Stream 对象。

Snowflake 仓库

用于运行查询的 Snowflake 仓库

Kafka Sink 目标参数

参数

描述

必填

Kafka 启动服务器

用于向其发送数据的 Kafka 代理列表(以逗号分隔)。

Kafka SASL 机制

用于身份验证的 SASL 机制。对应于 Kafka 客户端 sasl.mechanism 属性。可能的值如下:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • AWS_MSK_IAM

Kafka SASL 用户名

向 Kafka 进行身份验证的用户名

Kafka SASL 密码

向 Kafka 进行身份验证的密码

Kafka 安全协议

用于与代理通信的安全协议。对应于 Kafka 客户端 security.protocol 属性。可能的值如下:

  • PLAINTEXT

  • SASL_PLAINTEXT

  • SASL_SSL

  • SSL

Kafka 主题

Kafka 主题,Snowflake 流中 CDCs 的发送目标

Kafka 消息密钥字段

指定将用作 Kafka 消息密钥的数据库列名称。如果未指定,则不会设置消息密钥。如果指定,则此列的值将用作消息密钥。此参数的值区分大小写。

Kafka 密钥库文件名

密钥库的完整路径,该密钥库存储了用于 mTLS 身份验证方法的客户端密钥和证书。mTLS 身份验证以及使用 SSL 安全协议时需要。

Kafka 密钥库类型

密钥库的类型。身份验证时需要 mTLS。可能的值如下:

  • PKCS12

  • JKS

  • BCFKS

Kafka 密钥库密码

用于保护密钥库文件的密码。

Kafka 密钥密码

存储在密钥库中的私钥的密码。身份验证时需要 mTLS。

Kafka 信任库文件名

存储代理证书的信任库的完整路径。客户将使用该信任库中的证书来验证代理身份。

Kafka 信任库类型

信任库文件的类型。可能的值如下:

  • PKCS12

  • JKS

  • BCFKS

Kafka 信任库密码

信任库文件的密码。

Kafka Sink 引入参数

参数

描述

必填

Snowflake FQN 流名称

完全限定的 Snowflake 流名称。

运行流

  1. 右键点击“飞机”图标并选择 Enable all Controller Services

  2. 右键点击导入的进程组并选择 Start。连接器开始数据引入。

语言: 中文