使用 Snowflake Connector for Kafka 加载 Protobuf 数据¶
本主题提供有关在 Snowflake Connector for Kafka(“Kafka Connector”)中安装和配置协议缓冲区 (protobuf) 支持的说明。支持 protobuf 需要 Kafka Connector 1.5.0(或更高版本)。
Kafka Connector 支持以下版本的 protobuf 转换器:
- Confluent 版本:
此版本仅受 Kafka 的 Confluent 包版本支持。
- 社区版:
此版本受开源软件 (OSS) Apache Kafka 包支持。Kafka 的 Confluent 包版本也支持此版本;但是,为了便于使用,我们建议改用 Confluent 版本。
只能安装其中一个 protobuf 转换器。
先决条件:安装 Snowflake Connector for Kafka¶
按照:doc:/user-guide/kafka-connector-install 中的说明安装 Kafka Connector。
配置 Protobuf 转换器的 Confluent 版本¶
备注
Protobuf 转换器的 Confluent 版本随 Confluent 版本 5.5.0(或更高版本)提供。
在文本编辑器中打开 Kafka 配置文件(例如:file:
<kafka_dir>/config/connect-distributed.properties)。在该文件中配置转换器属性。有关 Kafka Connector 属性的一般信息,请参阅 Kafka 配置属性。
例如:
保存文件。
使用 Confluent 控制台 protobuf 生成器、源 protobuf 生成器或 Python 生成器通过 Kafka 生成 protobuf 数据。
GitHub (https://github.com/snowflakedb/snowflake-kafka-connector/blob/3bb3e0491d932cdbc58fba3efc0f5c71fa341429/test/test_suit/test_confluent_protobuf_protobuf.py) 中的示例 Python 代码演示了如何通过 Kafka 生成 protobuf 数据。
配置 Protobuf 转换器的社区版本¶
本节提供有关安装和配置 protobuf 转换器社区版本的说明。
第 1 步:安装 Protobuf 转换器社区版本¶
在终端窗口中,切换到要存储 protobuf 转换器存储库 GitHub 的克隆的目录。
执行以下命令以克隆 GitHub 存储库 (https://github.com/blueapron/kafka-connect-protobuf-converter):
执行以下命令以使用 Apache Maven (https://maven.apache.org/) 构建转换器的 3.1.0 版本。请注意,Kafka Connector 支持转换器的 2.3.0、3.0.0 和 3.1.0 版本:
备注
Maven 必须已安装在本地计算机上。
Maven 会在当前文件夹中构建一个名为
kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar的文件。这是转换器 JAR 文件。将编译后的
kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar文件复制到 Kafka 包版本的目录:- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools- Apache Kafka:
<apache_kafka_dir>/libs
第 2 步:编译 .proto 文件¶
将定义消息的 protobuf .proto 文件编译为 java 文件。
例如,假设消息在名为 sensor.proto 的文件中定义。在终端窗口中,执行以下命令以编译协议缓冲区文件。指定应用程序源代码的源目录、目标目录(适用于 .java 文件)和 .proto 文件的路径:
以下网址提供了示例 .proto 文件:https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto (https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto)。
该命令会在指定目标目录中生成一个名为 SensorReadingImpl.java 的文件。
有关更多信息,请参阅 Google 开发者文档 (https://developers.google.com/protocol-buffers/docs/javatutorial)
第 3 步:编译 SensorReadingImpl.Java 文件¶
编译 第 2 步:编译 .proto 文件 中生成的 SensorReadingImpl.java 文件以及 protobuf 项目结构的项目对象模型。
在文本编辑器中打开 第 2 步:编译 .proto 文件 中的
.pom文件。创建一个采用以下结构的空目录:
其中
src/main/java下的目录结会构镜像.proto文件(第 3 行)中的包名称。将 第 2 步:编译 .proto 文件 中生成的
SensorReadingImpl.java文件复制到目录结构的底部文件夹中。在
protobuf_folder目录的根目录中创建一个名为pom.xml的文件。在文本编辑器中打开空白
pom.xml文件。将以下示例项目模型复制到该文件中并对其进行修改:其中:
<group_id>对
.proto文件中指定的包名称的 ID 段进行分组。例如,如果包名称为com.foo.bar.buz,则组 ID 为com.foo。<artifact_id>您选择的包的构件 ID。构件 ID 可以随机选择。
<version>您选择的包的版本。版本可以随机选择。
<java_version>本地计算机上安装的 Java 运行时环境 (JRE) 的版本。
例如:
在终端窗口中,切换到目录的
protobuf_folder根目录。执行以下命令,从目录中的文件编译 protobuf 数据 JAR 文件:Maven 会在
protobuf_folder/target文件夹中生成一个名为<artifact_id>-<version>-jar-with-dependencies.jar的文件(例如kafka-test-protobuf-1.0.0-jar-with-dependencies.jar)。将编译的
kafka-test-protobuf-1.0.0-jar-with-dependencies.jar文件复制到 Kafka 包版本的目录:- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools- Apache Kafka:
将文件复制到环境变量中的
$CLASSPATH目录。
第 4 步:配置 Kafka Connector¶
在文本编辑器中打开 Kafka 配置文件(例如:file:
<kafka_dir>/config/connect-distributed.properties)。将
value.converter.protoClassName属性添加到文件中。此属性会指定用于反序列化消息的协议缓冲区类(例如com.google.protobuf.Int32Value)。备注
嵌套类必须使用
$表示法(例如com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto)指定。例如:
有关 Kafka Connector 属性的一般信息,请参阅 Kafka 配置属性。
有关协议缓冲区类的更多信息,请参阅本主题前面引用的 Google 开发者文档。
保存文件。