使用 Snowflake Connector for Kafka 加载 Protobuf 数据

本主题提供有关在 Snowflake Connector for Kafka(“Kafka Connector”)中安装和配置协议缓冲区 (protobuf) 支持的说明。支持 protobuf 需要 Kafka Connector 1.5.0(或更高版本)。

Kafka Connector 支持以下版本的 protobuf 转换器:

Confluent 版本:

此版本仅受 Kafka 的 Confluent 包版本支持。

社区版:

此版本受开源软件 (OSS) Apache Kafka 包支持。Kafka 的 Confluent 包版本也支持此版本;但是,为了便于使用,我们建议改用 Confluent 版本。

只能安装其中一个 protobuf 转换器。

本主题内容:

先决条件:安装 Snowflake Connector for Kafka

按照:doc:/user-guide/kafka-connector-install 中的说明安装 Kafka Connector。

配置 Protobuf 转换器的 Confluent 版本

备注

Protobuf 转换器的 Confluent 版本随 Confluent 版本 5.5.0(或更高版本)提供。

  1. 在文本编辑器中打开 Kafka 配置文件(例如:file:<kafka_dir>/config/connect-distributed.properties)。

  2. 在该文件中配置转换器属性。有关 Kafka Connector 属性的一般信息,请参阅 Kafka 配置属性

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
         "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "value.converter.schema.registry.url":"http://localhost:8081"
       }
     }
    
    Copy

    例如:

    {
      "name":"XYZCompanySensorData",
      "config":{
        "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max":"8",
        "topics":"topic1,topic2",
        "snowflake.topic2table.map": "topic1:table1,topic2:table2",
        "buffer.count.records":"10000",
        "buffer.flush.time":"60",
        "buffer.size.bytes":"5000000",
        "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.cn:443",
        "snowflake.user.name":"jane.smith",
        "snowflake.private.key":"xyz123",
        "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
        "snowflake.database.name":"mydb",
        "snowflake.schema.name":"myschema",
        "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://localhost:8081"
      }
    }
    
    Copy
  3. 保存文件。

使用 Confluent 控制台 protobuf 生成器、源 protobuf 生成器或 Python 生成器通过 Kafka 生成 protobuf 数据。

GitHub (https://github.com/snowflakedb/snowflake-kafka-connector/blob/3bb3e0491d932cdbc58fba3efc0f5c71fa341429/test/test_suit/test_confluent_protobuf_protobuf.py) 中的示例 Python 代码演示了如何通过 Kafka 生成 protobuf 数据。

配置 Protobuf 转换器的社区版本

本节提供有关安装和配置 protobuf 转换器社区版本的说明。

第 1 步:安装 Protobuf 转换器社区版本

  1. 在终端窗口中,切换到要存储 protobuf 转换器存储库 GitHub 的克隆的目录。

  2. 执行以下命令以克隆 GitHub 存储库 (https://github.com/blueapron/kafka-connect-protobuf-converter):

    git clone https://github.com/blueapron/kafka-connect-protobuf-converter
    
    Copy
  3. 执行以下命令以使用 Apache Maven (https://maven.apache.org/) 构建转换器的 3.1.0 版本。请注意,Kafka Connector 支持转换器的 2.3.0、3.0.0 和 3.1.0 版本:

    备注

    Maven 必须已安装在本地计算机上。

    cd kafka-connect-protobuf-converter
    
    git checkout tags/v3.1.0
    
    mvn clean package
    
    Copy

    Maven 会在当前文件夹中构建一个名为 kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar 的文件。这是转换器 JAR 文件。

  4. 将编译后的 kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar 文件复制到 Kafka 包版本的目录:

    Confluent:

    <confluenct_dir>/share/java/kafka-serde-tools

    Apache Kafka:

    <apache_kafka_dir>/libs

第 2 步:编译 .proto 文件

将定义消息的 protobuf .proto 文件编译为 java 文件。

例如,假设消息在名为 sensor.proto 的文件中定义。在终端窗口中,执行以下命令以编译协议缓冲区文件。指定应用程序源代码的源目录、目标目录(适用于 .java 文件)和 .proto 文件的路径:

protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Copy

以下网址提供了示例 .proto 文件:https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto (https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto)。

该命令会在指定目标目录中生成一个名为 SensorReadingImpl.java 的文件。

有关更多信息,请参阅 Google 开发者文档 (https://developers.google.com/protocol-buffers/docs/javatutorial)

第 3 步:编译 SensorReadingImpl.Java 文件

编译 第 2 步:编译 .proto 文件 中生成的 SensorReadingImpl.java 文件以及 protobuf 项目结构的项目对象模型。

  1. 在文本编辑器中打开 第 2 步:编译 .proto 文件 中的 .pom 文件。

  2. 创建一个采用以下结构的空目录:

    protobuf_folder
    ├── pom.xml
    └── src
        └── main
            └── java
                └── com
                    └── ..
    
    Copy

    其中 src / main / java 下的目录结会构镜像 .proto 文件(第 3 行)中的包名称。

  3. 第 2 步:编译 .proto 文件 中生成的 SensorReadingImpl.java 文件复制到目录结构的底部文件夹中。

  4. protobuf_folder 目录的根目录中创建一个名为 pom.xml 的文件。

  5. 在文本编辑器中打开空白 pom.xml 文件。将以下示例项目模型复制到该文件中并对其进行修改:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId><group_id></groupId>
       <artifactId><artifact_id></artifactId>
       <version><version></version>
    
       <properties>
           <java.version><java_version></java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy

    其中:

    <group_id>

    .proto 文件中指定的包名称的 ID 段进行分组。例如,如果包名称为 com.foo.bar.buz,则组 ID 为 com.foo

    <artifact_id>

    您选择的包的构件 ID。构件 ID 可以随机选择。

    <version>

    您选择的包的版本。版本可以随机选择。

    <java_version>

    本地计算机上安装的 Java 运行时环境 (JRE) 的版本。

    例如:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.snowflake</groupId>
       <artifactId>kafka-test-protobuf</artifactId>
       <version>1.0.0</version>
    
       <properties>
           <java.version>1.8</java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy
  6. 在终端窗口中,切换到目录的 protobuf_folder 根目录。执行以下命令,从目录中的文件编译 protobuf 数据 JAR 文件:

    mvn clean package
    
    Copy

    Maven 会在 protobuf_folder/target 文件夹中生成一个名为 <artifact_id>-<version>-jar-with-dependencies.jar 的文件(例如 kafka-test-protobuf-1.0.0-jar-with-dependencies.jar)。

  7. 将编译的 kafka-test-protobuf-1.0.0-jar-with-dependencies.jar 文件复制到 Kafka 包版本的目录:

    Confluent:

    <confluenct_dir>/share/java/kafka-serde-tools

    Apache Kafka:

    将文件复制到环境变量中的 $CLASSPATH 目录。

第 4 步:配置 Kafka Connector

  1. 在文本编辑器中打开 Kafka 配置文件(例如:file:<kafka_dir>/config/connect-distributed.properties)。

  2. value.converter.protoClassName 属性添加到文件中。此属性会指定用于反序列化消息的协议缓冲区类(例如 com.google.protobuf.Int32Value)。

    备注

    嵌套类必须使用 $ 表示法(例如 com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto)指定。

    例如:

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading"
       }
     }
    
    Copy

    有关 Kafka Connector 属性的一般信息,请参阅 Kafka 配置属性

    有关协议缓冲区类的更多信息,请参阅本主题前面引用的 Google 开发者文档。

  3. 保存文件。

语言: 中文