Snowflake High Performance connector for Kafka: Install and configure

This topic describes the steps to install and configure the Snowflake High Performance connector for Kafka.

安装 Kafka Connector

Kafka Connector 以 JAR(Java 可执行文件)文件的形式提供。

Snowflake 提供两个版本的连接器:

  • 适用于 Confluent Kafka 安装 (https://www.confluent.io/hub/snowflakeinc/snowflake-kafka-connector/) 的版本。

  • A version for the open source software (OSS) Apache Kafka https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/ (https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/) ecosystem.

本主题中的说明指明了哪些步骤仅适用于这两个版本中的特定版本。

安装先决条件

  • The Kafka connector supports the following package versions:

    Snowflake Kafka Connector Version

    包支持(由 Snowflake 测试)

    Apache Kafka

    2.0.0(或更高版本)

    Apache Kafka 2.8.2、3.7.2

    Confluent

    2.0.0(或更高版本)

    Confluent 6.2.15、7.8.2

  • Kafka Connector 专为与 Kafka Connect API 3.9.0 搭配使用而构建。Kafka Connect API 的更高版本未经测试。3.9.0 之前的版本与该连接器兼容。有关更多信息,请参阅 Kafka 兼容性 (https://kafka.apache.org/protocol.html#protocol_compatibility)。

  • 当您的环境中同时具有 Kafka 连接器和 JDBC 驱动程序 jar 文件时,请确保您的 JDBC 版本与您打算使用的 Kafka 连接器版本的 pom.xml 文件中指定的 snowflake-jdbc 版本相匹配。您可以转到您首选的 Kafka 连接器版本,例如 v4.0.0-rc4 (https://github.com/snowflakedb/snowflake-kafka-connector/releases/tag/v4.0.0-rc4)。然后浏览 pom.xml 文件以找出 snowflake-jdbc 的版本。

  • 如果您使用 Avro 格式摄入数据:

    • 使用 Avro 解析器版本 1.8.2(或更高版本),可从以下网址获取:https://mvnrepository.com/artifact/org.apache.avro (https://mvnrepository.com/artifact/org.apache.avro)。

    • 如果使用 Avro 的架构注册功能,请使用 Kafka Connect Avro Converter 版本 5.0.0(或更高版本),可从以下网址获取:https://mvnrepository.com/artifact/io.confluent (https://mvnrepository.com/artifact/io.confluent)。

      请注意,OSS Apache Kafka 包不提供架构注册功能。

  • 根据所需的数据保留时间和/或存储限制配置 Kafka。

  • Install and configure the Kafka Connect cluster.

    每个 Kafka Connect 集群节点都应为 Kafka Connector 提供足够的 RAM。建议每个 Kafka 分区至少使用 5 MB。这还不包括 Kafka Connect 执行其他工作所需的 RAM。

  • 我们建议在 Kafka Broker 和 Kafka Connect Runtime 上使用相同的版本。

  • 我们强烈建议在与 Snowflake 账户相同的云提供商 区域 运行 Kafka Connect 实例。这并非严格要求的,但通常会提高吞吐量。

有关 Snowflake 客户端支持的操作系统列表,请参阅 操作系统支持

Installing the connector

本节说明了如何安装和配置 Kafka Connector for Confluent。下表说明了受支持的版本以及有关预发布版和候选发布版的信息。

版本系列

状态

备注

4.x.x

公开预览版

抢先体验。 支持 Snowpipe Streaming High Performance Architecture https://docs.snowflake.cn/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview 目前从 3.x 和 2.x 版本的迁移必须手动完成。它不能作为早期版本的直接替代品。它具有与 3.x、2.x、1.x 版本不同的功能集

3.x.x

官方支持

不支持 Snowpipe Streaming High Performance Architecture https://docs.snowflake.cn/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview

2.xx

官方支持

建议升级。不支持 Snowpipe Streaming High Performance Architecture https://docs.snowflake.cn/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview

1.x.x

不支持

切勿使用此版本系列。

Installing the connector for Confluent

Download the Kafka connector files

从以下任一位置 Kafka Connector JAR 文件:

Confluent Hub:

https://www.confluent.io/hub/ (https://www.confluent.io/hub/)

该包包含使用加密或未加密私钥进行密钥对身份身份验证所需的所有依赖项。有关详细信息,请参阅本主题后面的 使用密钥对身份验证和密钥轮换

Maven Central Repository:

https://mvnrepository.com/artifact/com.snowflake (https://mvnrepository.com/artifact/com.snowflake)

使用此版本时,您需要下载 Bouncy Castle (https://www.bouncycastle.org/) 加密库(JAR 文件):

  • https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/2.1.0 (https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/2.1.0)

  • https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/2.1.8 (https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/2.1.8)

将这些文件下载到与 Kafka Connector JAR 文件相同的本地文件夹中。

连接器的源代码可从以下网址获取:https://github.com/snowflakedb/snowflake-kafka-connector (https://github.com/snowflakedb/snowflake-kafka-connector)。

Install the Kafka connector

使用安装其他连接器的说明安装 Kafka Connector:

https://docs.confluent.io/current/connect/userguide.html (https://docs.confluent.io/current/connect/userguide.html)

为开源 Apache Kafka 安装连接器

本节说明了如何为开源 Apache Kafka 安装和配置 Kafka Connector。

Install Apache Kafka

  1. 从 Kafka 官方网站 (https://kafka.apache.org/downloads) 下载 Kafka 软件包。

  2. 在终端窗口中,切换为下载包文件的目录。

  3. 执行以下命令解压:file:kafka_<scala_version>-<kafka_version>.tgz 文件:

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    
    Copy

安装 JDK

安装并配置 Java 开发工具包 (JDK) 11 或更高版本。使用 SE 的 Standard Edition (JDK) 测试 Snowflake。Enterprise Edition (EE) 预计会兼容,但尚未进行测试。

如果您之前已安装 JDK,则可以跳过此部分。

  1. 从 Oracle JDK 网站 (https://www.oracle.com/technetwork/java/javase/downloads/index.html) 下载 JDK。

  2. 安装或解压 JDK。

  3. 按照适用于您操作系统的说明,设置环境变量 JAVA_HOME,使其指向包含 JDK 的目录。

Download the Kafka connector JAR files

  1. 从 Maven Central Repository 下载 Kafka Connector JAR 文件:

    https://mvnrepository.com/artifact/com.snowflake (https://mvnrepository.com/artifact/com.snowflake)

  2. 下载 Bouncy Castle (https://www.bouncycastle.org/) 加密库 jar 文件:

    • https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/2.1.0 (https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/2.1.0)

    • https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/2.1.8 (https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/2.1.8)

  3. 如果您的 Kafka 数据以 Apache Avro (https://avro.apache.org/) 格式流式传输,请下载 Avro JAR 文件 (1.11.4):

    • https://mvnrepository.com/artifact/org.apache.avro/avro (https://mvnrepository.com/artifact/org.apache.avro/avro)

连接器的源代码可从以下网址获取:https://github.com/snowflakedb/snowflake-kafka-connector (https://github.com/snowflakedb/snowflake-kafka-connector)。

Install the Kafka connector

将您在 为开源 Apache Kafka 安装连接器 中下载的 JAR 文件复制到 <kafka_dir>/libs 文件夹。

Configuring the Kafka connector

在独立模式下部署时,通过创建一个指定 Snowflake 登录凭据、主题名称、Snowflake 表名称等参数的文件来配置连接器。在分布式模式下部署时,通过调用 Kafka Connect 集群的 REST API 端点来配置连接器。

重要

The Kafka Connect framework broadcasts the configuration settings for the Kafka connector from the master node to worker nodes. Configuration settings include sensitive information, specifically, the Snowflake username and private key. Make sure to secure the communication channel between Kafka Connect nodes. For more information, see the documentation for your Apache Kafka software.

Each configuration specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be stored in a single database and schema.

本部分提供了分布式模式和独立模式的说明。

有关配置字段的描述,请参阅 连接器配置属性

重要

由于配置文件通常包含安全相关信息(例如私钥),因此请适当设置文件的读/写权限,以限制访问。

In addition, consider storing the configuration file in a secure external location or a key management service. For more information, see Externalizing Secrets (in this topic).

分布式模式

创建 Kafka 配置文件,例如:file:<path>/<config_file>.json。使用所有连接器配置信息填充该文件。该文件必须为 JSON 格式。

Sample configuration file

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "tasks.max": "1",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.cn:443",
      "snowflake.warehouse.name": "WH",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "none"
      }
}
Copy

独立模式

创建一个配置文件,例如:file:<kafka_dir>/config/SF_connect.properties。使用所有连接器配置信息填充该文件。

Sample configuration file

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.cn:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=none
Copy

测试和原型设计的缓存注意事项

连接器会缓存表和管道查找检查,以提高分区重新平衡期间的性能。然而,在测试和原型设计期间,这种缓存行为可能导致连接器无法立即检测到手动创建的表或管道。

问题:当您在连接器运行时手动创建表或管道时,连接器默认可能会在长达 5 分钟的时间内继续使用缓存的存在性检查结果(这可能表明对象不存在)。这可能导致测试期间出现意外错误或行为。

测试建议: 为避免在测试和原型设计期间出现与缓存相关的问题,请将两个缓存有效期参数配置为最小值 1 毫秒,或禁用缓存:

snowflake.cache.table.exists.expire.ms=1
snowflake.cache.pipe.exists.expire.ms=1
Copy

此配置可确保连接器在每次分区重新平衡时都执行全新的存在性检查,从而允许您立即看到手动创建的表和管道的效果。

重要

这些最小缓存设置建议 仅用于测试和原型设计。在生产环境中,请使用默认的缓存有效值(5 分钟或更长),以最大程度地减少对 Snowflake 的元数据查询并提高重新平衡性能,尤其是在处理大量分区时。

连接器配置属性

所需属性

name

应用程序名称。该名称在客户使用的所有 Kafka Connector 中必须是唯一的。此名称必须是有效的 Snowflake 无引号标识符。有关有效标识符的信息,请参阅 标识符要求

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

以逗号分隔的主题列表。默认情况下,Snowflake 会假定表名称与主题名称相同。如果表名称与主题名称不一致,则使用可选的 topic2table.map 参数(如下)指定从主题名称到表名称的映射。表名称必须是有效的 Snowflake 无引号标识符。有关有效表名称的信息,请参阅 标识符要求

备注

需要使用 topicstopics.regex 中的 任一项 ;而不是两者。

topics.regex

这是一个正则表达式(“regex”),用于指定包含要加载到 Snowflake 表中的消息的主题。连接器会从与 regex 匹配的任何主题名称中加载数据。regex 必须遵循 Java 正则表达式的规则(即与 java.util.regex.Pattern 兼容)。配置文件应包含 topicstopics.regex 中的 任一项,而不是两者。

snowflake.url.name

URL 用于访问您的 Snowflake 账户。URL 必须包括您的 账户标识符。请注意,协议 (https://) 和端口号是可选的。

snowflake.user.name

Snowflake 账户的用户登录名。

snowflake.role.name

连接器将用于向表中插入数据的角色名称。

snowflake.private.key

用于验证用户身份的私钥。仅包含密钥,不包含标头或页脚。如果密钥分为多行,请删除换行符。您可以提供未加密的密钥,也可以提供加密的密钥并提供 snowflake.private.key.passphrase 参数,允许 Snowflake 解密密钥。当且仅当 snowflake.private.key 参数值已加密时才使用此参数。这将解密根据 密钥对身份验证和密钥对轮换 中的说明进行加密的私钥。

备注

另请参阅 可选属性 中的 snowflake.private.key.passphrase

snowflake.database.name

包含要插入行的表的数据库名称。

snowflake.schema.name

包含要插入行的表的架构名称。

header.converter

仅当记录采用 Avro 格式且包含标头时才需要。其值为 "org.apache.kafka.connect.storage.StringConverter"

key.converter

这是 Kafka 记录的密钥转换器(例如 "org.apache.kafka.connect.storage.StringConverter")。Kafka Connector 不使用这个转换器,但 Kafka Connect 平台需要它。

有关当前限制的信息,请参阅 Kafka Connector 限制

value.converter

该连接器支持标准的 Kafka 社区转换器。根据您的数据格式选择合适的转换器:

  • 对于 JSON 记录:"org.apache.kafka.connect.json.JsonConverter"

  • 对于具有架构注册表的 Avro 记录:"io.confluent.connect.avro.AvroConverter"

有关当前限制的信息,请参阅 Kafka Connector 限制

可选属性

snowflake.private.key.passphrase

如果此参数的值不为空,那么该连接器会使用此短语来尝试解密私钥。

tasks.max

任务数,通常与Kafka Connect集群中工作节点的 CPU 内核数相同。为了实现最佳性能,Snowflake 建议将任务数量设置为等于 Kafka 分区的总数,但不超过 CPU 个核心的数量。任务数量高可能会导致内存使用增加和频繁的重新平衡。

snowflake.topic2table.map

此可选参数允许用户指定哪些主题应映射到哪些表。每个主题及其表名称要用冒号隔开(请参阅下面的示例)。表名称必须是有效的 Snowflake 无引号标识符。有关有效表名称的信息,请参阅 标识符要求。主题配置允许使用正则表达式定义主题,就像使用 topics.regex 一样。正则表达式不能含糊不清,任何匹配的主题只能匹配单个目标表。

示例:

topics="topic1,topic2,topic5,topic6"
snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
Copy

可以写成:

topics.regex="topic[0-9]"
snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
Copy
value.converter.schema.registry.url

如果采用 Avro 格式并且您正在使用架构注册表服务,则此值应为架构注册服务的 URL。否则该字段应为空。

value.converter.break.on.schema.registry.error

如果从架构注册表服务加载 Avro 数据,此属性确定 Kafka Connector 在获取架构 ID 时遇到错误时是否应停止使用记录。默认值为 false。将值设为 true 可启用此行为。

jvm.proxy.host

要使 Snowflake Kafka Connector 能够通过代理服务器访问 Snowflake,请设置此参数以指定代理服务器的主机。

jvm.proxy.port

要使 Snowflake Kafka Connector 能够通过代理服务器访问 Snowflake,请设置此参数以指定代理服务器的端口。

snowflake.streaming.max.client.lag

Specifies how often the connector flushes the data to Snowflake, in seconds.

:
  • 最小值:1

  • 最大值:600

默认值:

1

jvm.proxy.username

通过代理服务器进行身份验证的用户名。

jvm.proxy.password

通过代理服务器进行身份验证的用户名的密码。

snowflake.jdbc.map

示例:"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

不会验证附加的 JDBC 属性(请参阅 JDBC 驱动程序连接参数参考)。这些附加属性不会验证,且不得覆盖或替换必需的属性,例如:jvm.proxy.xxxsnowflake.user.namesnowflake.private.keysnowflake.schema.name 等。

指定以下任一组合:
  • tracing 属性以及 JDBC_TRACE 环境变量

  • database 属性以及 snowflake.database.name

将导致模糊行为,并且行为将由 JDBC 驱动程序确定。

value.converter.basic.auth.credentials.source

如果使用 Avro 数据格式并需要安全访问 Kafka架构注册表,请将此参数设置为字符串“USER_INFO”,并设置如下所述的 value.converter.basic.auth.user.info 参数。否则,省略该参数。

value.converter.basic.auth.user.info

如果使用 Avro 数据格式并需要安全访问 Kafka架构注册表,请将此参数设置为字符串“<user_ID>:<password>”,并设置如上所述的 value.converter.basic.auth.credentials.source 参数。否则,省略该参数。

snowflake.metadata.createtime

如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略 CreateTime 属性值。默认值为 TRUE。

snowflake.metadata.topic

如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略 topic 属性值。默认值为 TRUE。

snowflake.metadata.offset.and.partition

如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略 OffsetPartition 属性值。默认值为 TRUE。

snowflake.metadata.all

如果值设置为 FALSE, RECORD_METADATA 列中的元数据将完全为空。默认值为 TRUE。

transforms

指定跳过 Kafka Connector 遇到的逻辑删除记录,不将它们加载到目标表中。逻辑删除记录定义为整个值字段为 null 的记录。

将属性值设置为 "tombstoneHandlerExample"

备注

此属性仅用于 Kafka 社区转换器(即 value.converter 属性值)(例如 org.apache.kafka.connect.json.JsonConverterorg.apache.kafka.connect.json.AvroConverter)。要使用 Snowflake 转换器管理逻辑删除记录处理,请改为使用 behavior.on.null.values 属性。

transforms.tombstoneHandlerExample.type

设置 transforms 属性时需要。

将属性值设置为 "io.confluent.connect.transforms.TombstoneHandler"

behavior.on.null.values

指定 Kafka Connector 应如何处理逻辑删除记录。逻辑删除记录定义为整个值字段为 null 的记录。 对于 Snowpipe,Kafka Connector 版本 1.5.5 及更高版本支持此属性。对于 Snowpipe Streaming,Kafka Connector 版本 2.1.0 及更高版本支持此属性。

此属性支持以下值:

DEFAULT

当 Kafka Connector 遇到逻辑删除记录时,它会在内容列中插入一个空的 JSON 的字符串。

IGNORE

Kafka Connector 会跳过逻辑删除记录,并且不会为这些记录插入行。

默认值为 DEFAULT

备注

逻辑删除记录的引入因引入方法而异:

  • 对于 Snowpipe,Kafka Connector 仅使用 Snowflake 转换器。要使用 Kafka 社区转换器管理逻辑删除记录处理,请改为使用 transformtransforms.tombstoneHandlerExample.type 属性。

  • 对于 Snowpipe Streaming,Kafka Connector 仅使用社区转换器。

发送到 Kafka Broker 的记录不得为 NULL,因为这些记录会被 Kafka Connector 删除,从而导致偏移丢失。在特定用例中,缺失的偏移会破坏 Kafka Connector。建议您使用逻辑删除记录而不是 NULL 记录。

使用密钥对身份验证和密钥轮换

Kafka 连接器依赖于密钥对身份验证,而不是用户名和密码身份验证。RSA 使用通过 加密的公钥/私钥对签名。使用 OpenSSL 生成公钥-私钥对。公钥分配给配置文件中定义的 Snowflake 用户。

在完成此页面上的密钥对身份验证任务以及 密钥对轮换 任务后,评估本主题后面关于 外部化密钥 的建议。

要配置公钥/私钥对,请执行以下操作:

  1. 在终端窗口中的命令行生成私钥。

    您可以生成加密版或未加密版的私钥。

    备注

    Kafka Connector 支持经过验证且符合联邦信息处理标准 (140-2)(即 FIPS 140-2)的加密算法要求。有关更多信息,请参阅 FIPS 140-2 (https://csrc.nist.gov/publications/detail/fips/140/2/final)。

    要生成未加密的版本,请使用以下命令:

    $ openssl genrsa -out rsa_key.pem 2048
    
    Copy

    要生成加密的版本,请使用以下命令:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    
    Copy

    其中 <algorithm> 是符合 FIPS 140-2 标准的加密算法。

    例如,指定 AES 256 作为加密算法:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    
    Copy

    如果生成加密版的私钥,请记录加密短语。稍后,您将在 Kafka 配置文件 snowflake.private.key.passphrase 属性中指定该加密短语。

    示例 PEM 私钥

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
    Copy
  2. 从命令行中,通过引用私钥生成公钥:

    假设私钥已加密并包含在名为 rsa_key.p8 的文件中,使用以下命令:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    
    Copy

    示例 PEM 公钥

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
    Copy
  3. 将公钥和私钥文件复制到本地目录进行存储。记下文件的路径。私钥使用 PKCS#8(公钥加密标准)格式存储,并使用您在上一步中指定的加密短语进行加密;但仍应使用操作系统提供的文件权限机制保护文件,防止未经授权的访问。不使用文件时,用户有责任保护该文件的安全。

  4. 登录 Snowflake。使用 ALTER USER 将公钥分配给 Snowflake 用户。

    For example:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    
    Copy

    备注

    • 只有安全管理员(即具有 SECURITYADMIN 角色的用户)或更高级别的用户才能更改用户。

    • 在 SQL 语句中排除公钥标头和页脚。

    使用 DESCRIBE USER 验证用户的公钥指纹:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    
    Copy

    备注

    RSA_PUBLIC_KEY_2_FP 属性在 配置密钥对轮换 中作了说明。

  5. 将整个私钥复制并粘贴到配置文件中的 snowflake.private.key 字段。保存文件。

外部化密钥

Snowflake 强烈建议将私钥等密钥外部化,并以加密形式存储或存储在密钥管理服务中,例如 AWS Key Management Service (KMS)、Microsoft Azure Key Vault,或 HashiCorp Vault。这可以通过在Kafka Connect集群上使用 ConfigProvider 来完成。

有关更多信息,请参阅 Confluent 对此 服务 (https://docs.confluent.io/current/connect/security.html#externalizing-secrets) 的描述。

Starting the connector

使用第三方 Confluent 或 Apache Kafka 文档中提供的说明启动 Kafka。您可以在分布式模式或独立模式下启动 Kafka Connector。每种模式的说明如下所示:

分布式模式

在终端窗口中,执行以下命令:

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Copy

独立模式

在终端窗口中,执行以下命令:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
Copy

备注

Apache Kafka 或 Confluent Kafka 的默认安装应该已经包含了 connect-standalone.properties 文件。

后续步骤

测试连接器

语言: 中文