设置 Openflow Connector for Kafka

备注

使用该连接器需遵守 连接器条款

先决条件

  1. 确保您已查看 Openflow Connector for Kafka

  2. 确保您已 设置 Openflow

连接器类型

Openflow Connector for Kafka 有三种不同的配置可供选择,每种配置都针对特定用例进行了优化。您可以从连接器库下载这些连接器定义:

适用于 JSON 数据格式的 Apache Kafka

适用于 JSON 消息引入的简化连接器,具有架构演化和主题到表的映射功能

适用于 AVRO 数据格式的 Apache Kafka

适用于 AVRO 消息引入的简化连接器,具有架构演化和主题到表的映射功能

Apache Kafka with DLQ and metadata

功能齐全的连接器,支持死信队列 (DLQ)、元数据处理,功能与 Snowflake Connector for Kafka 相同

有关特定连接器类型的详细配置,请参阅:

您应该选择哪个连接器?

选择最符合您的数据格式、操作要求和功能需求的连接器变体:

在以下情况下,请选择 Apache Kafka for JSON 或 AVRO 数据格式

  • 您的 Kafka 消息采用 JSON 或 AVRO 格式

  • 您需要基本的架构演化功能

  • 您想要一个配置最少的简单设置

  • 您不需要高级错误处理或死信队列功能

  • 您要设置新的集成,想要快速开始

特定于格式的注意事项:

  • **JSON 格式:**更灵活适应多种数据结构,便于调试和检查

  • **AVRO 格式:**强类型数据,内置架构注册表集成,更适用于结构化数据管道

在以下情况下,请选择 Apache Kafka with DLQ and metadata

  • 您要从 Snowflake Connector for Kafka 迁移,并且需要与兼容功能相同的功能

  • 您需要强大的错误处理功能以及对失败消息的死信队列支持。

  • 您需要有关消息引入的详细元数据(时间戳、偏移量、标头)

迁移注意事项

如果您目前正在使用 Snowflake Connector for Kafka,请选择 Apache Kafka with DLQ and metadata 连接器,以获得具有功能兼容性的无缝迁移体验。

字段名称处理差异:Openflow Connector for Kafka 处理字段名称中特殊字符的方式与 Snowflake Connector for Kafka 不同。迁移后,由于这些命名惯例的差异,Openflow Connector for Kafka 可能会创建名称不同的新 Snowflake 列。有关如何转换字段名称的详细信息,请参阅 字段名称映射和特殊字符处理

性能注意事项

  • JSON 和 AVRO 格式连接器因其简化的设计,可为简单的用例提供更好的性能

  • DLQ and metadata 连接器提供更全面的监控和错误处理功能,但代价是资源使用量略高

设置 Snowflake 账户

作为 Snowflake 账户管理员,请执行以下任务:

  1. 创建类型为 SERVICE 的新 Snowflake 服务用户。

  2. 创建新角色或使用现有角色并授予 数据库权限

    由于连接器能够自动创建目标表(如果目标表尚不存在),请确保用户具有创建和管理 Snowflake 对象所需的权限:

    对象

    权限

    备注

    数据库

    USAGE

    架构

    USAGE . CREATE TABLE .

    创建架构级对象后,可以撤销 CREATE object 权限。

    OWNERSHIP

    仅在使用 Kafka Connector 将数据引入到 现有 表时需要。. 如果连接器为来自 Kafka 主题的记录创建新的目标表,则配置中指定的用户的默认角色将成为表所有者。

    Snowflake 建议为每个 Kafka 实例创建单独的用户和角色,以便更好地进行访问控制。

    您可以使用以下脚本创建和配置自定义角色(需要 SECURITYADMIN 或等效角色):

    USE ROLE securityadmin;
    
    CREATE ROLE kafka_connector_role_1;
    GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    
    -- Only for existing tables
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
    
    Copy

    请注意,权限必须直接授予连接器角色且不能继承。

  3. 向该 Snowflake 服务用户授予您在前面步骤中创建的角色。

    角色应分配为用户的默认角色:

    GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
    ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
    
    Copy
  4. 为第 1 步中创建的 Snowflake SERVICE 用户配置 密钥对身份验证

  5. Snowflake 强烈建议执行此步骤。配置 Openflow 支持的密钥管理器(例如 AWS、Azure 和 Hashicorp),并将公钥和私钥存储在密钥存储库中。

    备注

    如果您出于任何原因不希望使用密钥管理器,则您有责任根据组织的安全策略保护用于密钥对身份验证的公钥和私钥文件。

    1. 配置密钥管理器后,确定如何对其进行身份验证。在 AWS 中,建议您使用与 Openflow 关联的 EC2 实例角色,因为这样就无需保留其他密钥。

    2. 在 Openflow 中,从右上角的汉堡菜单中配置与此密钥管理器关联的参数提供商。导航到 Controller Settings » Parameter Provider,然后提取您的参数值。

    3. 此时,可以使用关联的参数路径引用所有凭据,无需在 Openflow 中保留敏感值。

  6. 如果任何其他 Snowflake 用户需要访问连接器引入的原始文档和表(例如,在 Snowflake 中进行自定义处理),则授予这些用户在步骤 1 中创建的角色。

设置连接器

作为数据工程师,执行以下任务以安装和配置连接器:

安装连接器

  1. 导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors

  2. 在 Openflow 连接器页面上,找到连接器并选择 Add to runtime

  3. Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。

  4. 选择 Add

    备注

    在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。

  5. 使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。

  6. 使用您的 Snowflake 账户凭据进行运行时身份验证。

此时将显示 Openflow 画布,其中添加了连接器进程组。

配置连接器

  1. 填充进程组参数

    1. 右键点击导入的进程组并选择 Parameters

    2. 按照 常用参数 中所述填充所需的参数值。

常用参数

所有 Kafka 连接器变体都共享用于基本连接和身份验证的通用参数上下文。

Snowflake 目标参数

参数

描述

必填

目标数据库

用于永久保存数据的数据库。它必须已经存在于 Snowflake 中

目标架构

用于永久保存数据的架构。它必须已经存在于 Snowflake 中。此参数区分大小写。请以大写形式提供此参数,除非架构是使用带双引号名称创建的(此时需确保大小写匹配),但不要包含两侧的双引号。请参阅以下示例:

  • CREATE SCHEMA SCHEMA_NAMECREATE SCHEMA schema_name – 使用 SCHEMA_NAME

  • CREATE SCHEMA "schema_name"CREATE SCHEMA "SCHEMA_NAME" – 分别使用 schema_nameSCHEMA_NAME

Snowflake 账户标识符

Snowflake 账户名称格式为 [organization-name]-[account-name],数据永久保存在其中

Snowflake 身份验证策略

对 Snowflake 进行身份验证的策略。可能的值:SNOWFLAKE_SESSION_TOKEN – 当我们在 SPCS 上运行流时;KEY_PAIR – 当我们想使用私钥设置访问权限时

Snowflake 私钥

用于身份验证的 RSA 私钥。RSA 密钥必须按照 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。请注意,必须定义 Snowflake 私钥文件或 Snowflake 私钥

Snowflake 私钥文件

该文件包含用于对 Snowflake 进行身份验证的 RSA 私钥,该私钥根据 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。头标记行以 -----BEGIN PRIVATE 开头。选中 Reference asset 复选框,上传私钥文件。

Snowflake 私钥密码

与 Snowflake 私钥文件关联的密码

Snowflake 角色

查询执行期间使用的 Snowflake 角色

Snowflake 用户名

用于连接到 Snowflake 实例的用户名

Kafka 源参数(SASL 身份验证)

参数

描述

必填

Kafka 安全协议

用于与代理通信的安全协议。与 Kafka 客户端的“security.protocol”属性相对应。其中之一:SASL_PLAINTEXT/SASL_SSL

Kafka SASL 机制

用于身份验证的 SASL 机制。与 Kafka 客户端的“sasl.mechanism”属性相对应。其中之一:PLAIN/SCRAM-SHA-256/SCRAM-SHA-512

Kafka SASL 用户名

向 Kafka 进行身份验证的用户名

Kafka SASL 密码

向 Kafka 进行身份验证的密码

Kafka 启动服务器

要从中提取数据的 Kafka 代理列表(以逗号分隔)应包含端口,例如 kafka-broker:9092。DLQ 主题使用相同的实例。

Kafka 引入参数

参数

描述

必填

Kafka 主题格式

其中之一:names/pattern。指定提供的“Kafka Topics”是以逗号分隔的名称列表还是单个正则表达式。

Kafka 主题

以逗号分隔的 Kafka 主题列表或正则表达式。

Kafka 组 ID

连接器使用的使用者组的 ID。可以是任意唯一 ID。

Kafka 自动偏移重置

如果未找到与 Kafka auto.offset.reset 属性对应的先前使用者偏移量,将应用自动偏移量配置。其中之一:earliest/latest。 默认值:latest

主题到表的映射

此可选参数允许用户指定哪些主题应映射到哪些表。每个主题及其表名称要用冒号隔开(请参阅下面的示例)。表名称必须是有效的 Snowflake 无引号标识符。正则表达式不能含糊不清,任何匹配的主题只能匹配单个目标表。如果为空或未找到匹配项,则主题名称将用作表名。注意:映射不能在逗号后面包含空格。

Topic To Table Map 示例值:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*:destination_table - maps all topics to the destination_table

配置特定于变体的设置

配置常用参数后,需要配置特定于所选连接器变体的设置:

对于 适用于 JSON 数据格式的 Apache Kafka适用于 AVRO 数据格式的 Apache Kafka 连接器:

请参阅 适用于 JSON/AVRO 数据格式的 Apache Kafka,了解特定于 JSON/AVRO 的参数。

对于 Apache Kafka with DLQ and metadata 连接器:

请参阅 带有 DLQ 和元数据的 Apache Kafka,了解高级参数,其中包括 DLQ 配置、架构化设置、Iceberg 表支持和消息格式选项。

身份验证

所有连接器变体都支持通过参数上下文配置的 SASL 身份验证,如 Kafka 源参数(SASL 身份验证) 中所述。

有关其他身份验证方法(包括 mTLS 和 AWS MSK IAM),请参阅 为 Openflow Connector for Kafka 配置其他身份验证方法

运行流

  1. 右键点击平面,然后点击 Enable all Controller Services

  2. 右键点击平面,然后点击 Start。连接器开始数据引入。

语言: 中文