使用 Snowflake Connector for Kafka 加载 Protobuf 数据¶
本主题提供有关在 Snowflake Connector for Kafka(“Kafka Connector”)中安装和配置协议缓冲区 (protobuf) 支持的说明。支持 protobuf 需要 Kafka Connector 1.5.0(或更高版本)。
Kafka Connector 支持以下版本的 protobuf 转换器:
- Confluent 版本:
此版本仅受 Kafka 的 Confluent 包版本支持。
- 社区版:
此版本受开源软件 (OSS) Apache Kafka 包支持。Kafka 的 Confluent 包版本也支持此版本;但是,为了便于使用,我们建议改用 Confluent 版本。
只能安装其中一个 protobuf 转换器。
本主题内容:
先决条件:安装 Snowflake Connector for Kafka¶
按照:doc:/user-guide/kafka-connector-install 中的说明安装 Kafka Connector。
配置 Protobuf 转换器的 Confluent 版本¶
备注
Protobuf 转换器的 Confluent 版本随 Confluent 版本 5.5.0(或更高版本)提供。
在文本编辑器中打开 Kafka 配置文件(例如:file:<kafka_dir>/config/connect-distributed.properties)。
在该文件中配置转换器属性。有关 Kafka Connector 属性的一般信息,请参阅 Kafka 配置属性。
{ "name":"XYZCompanySensorData", "config":{ .. "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
例如:
{ "name":"XYZCompanySensorData", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"topic1,topic2", "snowflake.topic2table.map": "topic1:table1,topic2:table2", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.cn:443", "snowflake.user.name":"jane.smith", "snowflake.private.key":"xyz123", "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r", "snowflake.database.name":"mydb", "snowflake.schema.name":"myschema", "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
保存文件。
使用 Confluent 控制台 protobuf 生成器、源 protobuf 生成器或 Python 生成器通过 Kafka 生成 protobuf 数据。
GitHub (https://github.com/snowflakedb/snowflake-kafka-connector/blob/3bb3e0491d932cdbc58fba3efc0f5c71fa341429/test/test_suit/test_confluent_protobuf_protobuf.py) 中的示例 Python 代码演示了如何通过 Kafka 生成 protobuf 数据。
配置 Protobuf 转换器的社区版本¶
本节提供有关安装和配置 protobuf 转换器社区版本的说明。
第 1 步:安装 Protobuf 转换器社区版本¶
在终端窗口中,切换到要存储 protobuf 转换器存储库 GitHub 的克隆的目录。
执行以下命令以克隆 GitHub 存储库 (https://github.com/blueapron/kafka-connect-protobuf-converter):
git clone https://github.com/blueapron/kafka-connect-protobuf-converter
执行以下命令以使用 Apache Maven (https://maven.apache.org/) 构建转换器的 3.1.0 版本。请注意,Kafka Connector 支持转换器的 2.3.0、3.0.0 和 3.1.0 版本:
备注
Maven 必须已安装在本地计算机上。
cd kafka-connect-protobuf-converter git checkout tags/v3.1.0 mvn clean package
Maven 会在当前文件夹中构建一个名为
kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar
的文件。这是转换器 JAR 文件。将编译后的
kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar
文件复制到 Kafka 包版本的目录:- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
<apache_kafka_dir>/libs
第 2 步:编译 .proto 文件¶
将定义消息的 protobuf .proto
文件编译为 java
文件。
例如,假设消息在名为 sensor.proto
的文件中定义。在终端窗口中,执行以下命令以编译协议缓冲区文件。指定应用程序源代码的源目录、目标目录(适用于 .java
文件)和 .proto
文件的路径:
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
以下网址提供了示例 .proto
文件:https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto (https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto)。
该命令会在指定目标目录中生成一个名为 SensorReadingImpl.java
的文件。
有关更多信息,请参阅 Google 开发者文档 (https://developers.google.com/protocol-buffers/docs/javatutorial)
第 3 步:编译 SensorReadingImpl.Java 文件¶
编译 第 2 步:编译 .proto 文件 中生成的 SensorReadingImpl.java
文件以及 protobuf 项目结构的项目对象模型。
在文本编辑器中打开 第 2 步:编译 .proto 文件 中的
.pom
文件。创建一个采用以下结构的空目录:
protobuf_folder ├── pom.xml └── src └── main └── java └── com └── ..
其中
src
/main
/java
下的目录结会构镜像.proto
文件(第 3 行)中的包名称。将 第 2 步:编译 .proto 文件 中生成的
SensorReadingImpl.java
文件复制到目录结构的底部文件夹中。在
protobuf_folder
目录的根目录中创建一个名为pom.xml
的文件。在文本编辑器中打开空白
pom.xml
文件。将以下示例项目模型复制到该文件中并对其进行修改:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId><group_id></groupId> <artifactId><artifact_id></artifactId> <version><version></version> <properties> <java.version><java_version></java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
其中:
<group_id>
对
.proto
文件中指定的包名称的 ID 段进行分组。例如,如果包名称为com.foo.bar.buz
,则组 ID 为com.foo
。<artifact_id>
您选择的包的构件 ID。构件 ID 可以随机选择。
<version>
您选择的包的版本。版本可以随机选择。
<java_version>
本地计算机上安装的 Java 运行时环境 (JRE) 的版本。
例如:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.snowflake</groupId> <artifactId>kafka-test-protobuf</artifactId> <version>1.0.0</version> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
在终端窗口中,切换到目录的
protobuf_folder
根目录。执行以下命令,从目录中的文件编译 protobuf 数据 JAR 文件:mvn clean package
Maven 会在
protobuf_folder/target
文件夹中生成一个名为<artifact_id>-<version>-jar-with-dependencies.jar
的文件(例如kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
)。将编译的
kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
文件复制到 Kafka 包版本的目录:- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
将文件复制到环境变量中的
$CLASSPATH
目录。
第 4 步:配置 Kafka Connector¶
在文本编辑器中打开 Kafka 配置文件(例如:file:<kafka_dir>/config/connect-distributed.properties)。
将
value.converter.protoClassName
属性添加到文件中。此属性会指定用于反序列化消息的协议缓冲区类(例如com.google.protobuf.Int32Value
)。备注
嵌套类必须使用
$
表示法(例如com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto
)指定。例如:
{ "name":"XYZCompanySensorData", "config":{ .. "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading" } }
有关 Kafka Connector 属性的一般信息,请参阅 Kafka 配置属性。
有关协议缓冲区类的更多信息,请参阅本主题前面引用的 Google 开发者文档。
保存文件。