设置 Openflow Connector for Snowflake to Kafka¶
备注
使用该连接器需遵守 连接器条款。
本主题介绍设置 Openflow Connector for Snowflake to Kafka 的步骤。
先决条件¶
确保您已 设置 Openflow。
创建 Snowflake 流,用于查询变更信息。
创建 Kafka 主题,用于接收来自 Snowflake 流的 CDC 消息。
设置 Snowflake 账户¶
作为 Snowflake 账户管理员,请执行以下任务:
创建连接器用于读取 CDC 事件的数据库、源表和流对象。例如:
create database stream_db; use database stream_db; create table stream_source (user_id varchar, data varchar); create stream stream_on_table on table stream_source;
创建新角色或使用现有角色,并为流和流的源对象授予 SELECT 权限。连接器还需要对包含流和流的源对象的数据库和架构具有 USAGE 权限。例如:
create role stream_reader; grant usage on database stream_db to role stream_reader; grant usage on schema stream_db.public to role stream_reader; grant select on stream_source to role stream_reader; grant select on stream_on_table to role stream_reader;
创建类型为 SERVICE 的新 Snowflake 服务用户。例如:
create user stream_user type = service;
向该 Snowflake 服务用户授予您在前面步骤中创建的角色。例如:
grant role stream_reader to user stream_user;
为第 3 步中的 Snowflake SERVICE 用户配置 密钥对认证。
Snowflake 强烈建议执行此步骤。配置 Openflow 支持的密钥管理器(例如 AWS、Azure 和 Hashicorp),并将公钥和私钥存储在密钥存储库中。但是,请注意,第 4 步中生成的私钥可以直接用作连接器配置的配置参数。在这种情况下,私钥存储在 Openflow 运行时配置中。
备注
如果您出于任何原因不希望使用密钥管理器,则您有责任根据组织的安全策略保护用于密钥对身份验证的公钥和私钥文件。
配置密钥管理器后,确定如何对其进行身份验证。在 AWS 中,建议您使用与 Openflow 关联的 EC2 实例角色,因为这样就无需保留其他密钥。
在 Openflow 中,从右上角的汉堡菜单中配置与此密钥管理器关联的参数提供商。导航到 Controller Settings » Parameter Provider,然后提取您的参数值。
此时,可以使用关联的参数路径引用所有凭据,无需在 Openflow 中保留敏感值。
指定一个仓库供连接器使用。一个连接器可以将单个表复制到单个 Kafka 主题。对于这种处理,您可以选择最小的仓库。
设置连接器¶
作为数据工程师,执行以下任务以安装和配置连接器:
导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors。
在 Openflow 连接器页面上,根据连接器应与哪种 Kafka 代理实例进行通信,查找并选择连接器。
mTLS 版本:如果您在使用 SSL(相互 TLS)安全协议,或者如果您在使用 SASL_SSL 协议并连接到使用自签名证书的代理,请选择此连接器。
SASL 版本:如果您在使用任何其他安全协议,请选择此连接器
选择 Add to runtime。
在 Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。
选择 Add。
使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。
使用您的 Snowflake 账户凭据进行运行时身份验证。
此时将显示 Openflow 画布,其中添加了连接器进程组。
右键点击导入的进程组并选择 Parameters。
按照 流参数 中所述填充所需的参数值。
流参数¶
本部分介绍您可以根据以下参数上下文配置的流参数:
Kafka Sink 源参数¶
参数 |
描述 |
必填 |
---|---|---|
Snowflake 账户标识符 |
Snowflake 账户名称格式为 [organization-name]-[account-name],数据将在保存在其中。示例: |
是 |
Snowflake 身份验证策略 |
对 Snowflake 进行身份验证的策略。可能的值如下:
|
是 |
源数据库 |
源数据库。该数据库应包含将被使用的 Snowflake Stream 对象。 |
是 |
Snowflake 私钥密码 |
与 Snowflake 私钥关联的密码。如果所使用的私钥没有密码保护,则不需要这样做。 |
否 |
Snowflake 角色 |
查询执行期间使用的 Snowflake 角色 |
是 |
Snowflake 用户名 |
用于连接到 Snowflake 实例的用户名 |
是 |
Snowflake 仓库 |
用于运行查询的 Snowflake 仓库 |
是 |
Snowflake 私钥 |
用于身份验证的 RSA 私钥。RSA 密钥必须按照 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。请注意,必须定义 Snowflake 私钥文件或 Snowflake 私钥。 |
是 |
Snowflake 私钥文件 |
该文件包含用于对 Snowflake 进行身份验证的 RSA 私钥,该私钥根据 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。头标记行以 |
否 |
源架构 |
源架构。该架构应包含将被使用的 Snowflake Stream 对象。 |
是 |
Kafka Sink 目标参数¶
参数 |
描述 |
必填 |
---|---|---|
Kafka 启动服务器 |
用于向其发送数据的 Kafka 代理列表(以逗号分隔)。 |
是 |
Kafka SASL 机制 |
用于身份验证的 SASL 机制。对应于 Kafka 客户端
|
是 |
Kafka SASL 用户名 |
向 Kafka 进行身份验证的用户名 |
是 |
Kafka SASL 密码 |
向 Kafka 进行身份验证的密码 |
是 |
Kafka 安全协议 |
用于与代理通信的安全协议。对应于 Kafka 客户端
|
是 |
Kafka 主题 |
Kafka 主题,Snowflake 流中 CDCs 的发送目标 |
是 |
Kafka 消息密钥字段 |
指定将用作 Kafka 消息密钥的数据库列名称。如果未指定,则不会设置消息密钥。如果指定,则此列的值将用作消息密钥。此参数的值区分大小写。 |
否 |
Kafka 密钥库文件名 |
密钥库的完整路径,该密钥库存储了用于 mTLS 身份验证方法的客户端密钥和证书。mTLS 身份验证以及使用 SSL 安全协议时需要。 |
否 |
Kafka 密钥库类型 |
密钥库的类型。身份验证时需要 mTLS。可能的值如下:
|
否 |
Kafka 密钥库密码 |
用于保护密钥库文件的密码。 |
否 |
Kafka 密钥密码 |
存储在密钥库中的私钥的密码。身份验证时需要 mTLS。 |
否 |
Kafka 信任库文件名 |
存储代理证书的信任库的完整路径。客户将使用该信任库中的证书来验证代理身份。 |
否 |
Kafka 信任库类型 |
信任库文件的类型。可能的值如下:
|
否 |
Kafka 信任库密码 |
信任库文件的密码。 |
否 |
Kafka Sink 引入参数¶
参数 |
描述 |
必填 |
---|---|---|
Snowflake FQN 流名称 |
完全限定的 Snowflake 流名称。 |
是 |
运行流¶
右键点击“飞机”图标并选择 Enable all Controller Services。
右键点击导入的进程组并选择 Start。连接器开始数据引入。