Snowflake High Performance connector for Kafka: Install and configure¶
This topic describes the steps to install and configure the Snowflake High Performance connector for Kafka.
安装 Kafka Connector¶
Kafka Connector 以 JAR(Java 可执行文件)文件的形式提供。
Snowflake 提供两个版本的连接器:
适用于 Confluent Kafka 安装 (https://www.confluent.io/hub/snowflakeinc/snowflake-kafka-connector/) 的版本。
A version for the open source software (OSS) Apache Kafka https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/ (https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/) ecosystem.
本主题中的说明指明了哪些步骤仅适用于这两个版本中的特定版本。
安装先决条件¶
The Kafka connector supports the following package versions:
包
Snowflake Kafka Connector Version
包支持(由 Snowflake 测试)
Apache Kafka
2.0.0(或更高版本)
Apache Kafka 2.8.2、3.7.2
Confluent
2.0.0(或更高版本)
Confluent 6.2.15、7.8.2
Kafka Connector 专为与 Kafka Connect API 3.9.0 搭配使用而构建。Kafka Connect API 的更高版本未经测试。3.9.0 之前的版本与该连接器兼容。有关更多信息,请参阅 Kafka 兼容性 (https://kafka.apache.org/protocol.html#protocol_compatibility)。
当您的环境中同时具有 Kafka 连接器和 JDBC 驱动程序 jar 文件时,请确保您的 JDBC 版本与您打算使用的 Kafka 连接器版本的
pom.xml文件中指定的snowflake-jdbc版本相匹配。您可以转到您首选的 Kafka 连接器版本,例如 v4.0.0-rc4 (https://github.com/snowflakedb/snowflake-kafka-connector/releases/tag/v4.0.0-rc4)。然后浏览pom.xml文件以找出snowflake-jdbc的版本。如果您使用 Avro 格式摄入数据:
使用 Avro 解析器版本 1.8.2(或更高版本),可从以下网址获取:https://mvnrepository.com/artifact/org.apache.avro (https://mvnrepository.com/artifact/org.apache.avro)。
如果使用 Avro 的架构注册功能,请使用 Kafka Connect Avro Converter 版本 5.0.0(或更高版本),可从以下网址获取:https://mvnrepository.com/artifact/io.confluent (https://mvnrepository.com/artifact/io.confluent)。
请注意,OSS Apache Kafka 包不提供架构注册功能。
根据所需的数据保留时间和/或存储限制配置 Kafka。
Install and configure the Kafka Connect cluster.
每个 Kafka Connect 集群节点都应为 Kafka Connector 提供足够的 RAM。建议每个 Kafka 分区至少使用 5 MB。这还不包括 Kafka Connect 执行其他工作所需的 RAM。
我们建议在 Kafka Broker 和 Kafka Connect Runtime 上使用相同的版本。
我们强烈建议在与 Snowflake 账户相同的云提供商 区域 运行 Kafka Connect 实例。这并非严格要求的,但通常会提高吞吐量。
有关 Snowflake 客户端支持的操作系统列表,请参阅 操作系统支持。
Installing the connector¶
本节说明了如何安装和配置 Kafka Connector for Confluent。下表说明了受支持的版本以及有关预发布版和候选发布版的信息。
版本系列 |
状态 |
备注 |
|---|---|---|
4.x.x |
公开预览版 |
抢先体验。 支持 Snowpipe Streaming High Performance Architecture https://docs.snowflake.cn/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview 目前从 3.x 和 2.x 版本的迁移必须手动完成。它不能作为早期版本的直接替代品。它具有与 3.x、2.x、1.x 版本不同的功能集 |
3.x.x |
官方支持 |
不支持 Snowpipe Streaming High Performance Architecture https://docs.snowflake.cn/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview。 |
2.xx |
官方支持 |
建议升级。不支持 Snowpipe Streaming High Performance Architecture https://docs.snowflake.cn/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview。 |
1.x.x |
不支持 |
切勿使用此版本系列。 |
Installing the connector for Confluent¶
Download the Kafka connector files¶
从以下任一位置 Kafka Connector JAR 文件:
- Confluent Hub:
https://www.confluent.io/hub/ (https://www.confluent.io/hub/)
该包包含使用加密或未加密私钥进行密钥对身份身份验证所需的所有依赖项。有关详细信息,请参阅本主题后面的 使用密钥对身份验证和密钥轮换。
- Maven Central Repository:
https://mvnrepository.com/artifact/com.snowflake (https://mvnrepository.com/artifact/com.snowflake)
使用此版本时,您需要下载 Bouncy Castle (https://www.bouncycastle.org/) 加密库(JAR 文件):
https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/2.1.0 (https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/2.1.0)
https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/2.1.8 (https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/2.1.8)
将这些文件下载到与 Kafka Connector JAR 文件相同的本地文件夹中。
连接器的源代码可从以下网址获取:https://github.com/snowflakedb/snowflake-kafka-connector (https://github.com/snowflakedb/snowflake-kafka-connector)。
Install the Kafka connector¶
使用安装其他连接器的说明安装 Kafka Connector:
https://docs.confluent.io/current/connect/userguide.html (https://docs.confluent.io/current/connect/userguide.html)
为开源 Apache Kafka 安装连接器¶
本节说明了如何为开源 Apache Kafka 安装和配置 Kafka Connector。
Install Apache Kafka¶
从 Kafka 官方网站 (https://kafka.apache.org/downloads) 下载 Kafka 软件包。
在终端窗口中,切换为下载包文件的目录。
执行以下命令解压:file:
kafka_<scala_version>-<kafka_version>.tgz文件:tar xzvf kafka_<scala_version>-<kafka_version>.tgz
安装 JDK¶
安装并配置 Java 开发工具包 (JDK) 11 或更高版本。使用 SE 的 Standard Edition (JDK) 测试 Snowflake。Enterprise Edition (EE) 预计会兼容,但尚未进行测试。
如果您之前已安装 JDK,则可以跳过此部分。
从 Oracle JDK 网站 (https://www.oracle.com/technetwork/java/javase/downloads/index.html) 下载 JDK。
安装或解压 JDK。
按照适用于您操作系统的说明,设置环境变量 JAVA_HOME,使其指向包含 JDK 的目录。
Download the Kafka connector JAR files¶
从 Maven Central Repository 下载 Kafka Connector JAR 文件:
https://mvnrepository.com/artifact/com.snowflake (https://mvnrepository.com/artifact/com.snowflake)
下载 Bouncy Castle (https://www.bouncycastle.org/) 加密库 jar 文件:
https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/2.1.0 (https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/2.1.0)
https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/2.1.8 (https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/2.1.8)
如果您的 Kafka 数据以 Apache Avro (https://avro.apache.org/) 格式流式传输,请下载 Avro JAR 文件 (1.11.4):
https://mvnrepository.com/artifact/org.apache.avro/avro (https://mvnrepository.com/artifact/org.apache.avro/avro)
连接器的源代码可从以下网址获取:https://github.com/snowflakedb/snowflake-kafka-connector (https://github.com/snowflakedb/snowflake-kafka-connector)。
Install the Kafka connector¶
将您在 为开源 Apache Kafka 安装连接器 中下载的 JAR 文件复制到 <kafka_dir>/libs 文件夹。
Configuring the Kafka connector¶
在独立模式下部署时,通过创建一个指定 Snowflake 登录凭据、主题名称、Snowflake 表名称等参数的文件来配置连接器。在分布式模式下部署时,通过调用 Kafka Connect 集群的 REST API 端点来配置连接器。
重要
The Kafka Connect framework broadcasts the configuration settings for the Kafka connector from the master node to worker nodes. Configuration settings include sensitive information, specifically, the Snowflake username and private key. Make sure to secure the communication channel between Kafka Connect nodes. For more information, see the documentation for your Apache Kafka software.
Each configuration specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be stored in a single database and schema.
本部分提供了分布式模式和独立模式的说明。
有关配置字段的描述,请参阅 连接器配置属性。
重要
由于配置文件通常包含安全相关信息(例如私钥),因此请适当设置文件的读/写权限,以限制访问。
In addition, consider storing the configuration file in a secure external location or a key management service. For more information, see Externalizing Secrets (in this topic).
分布式模式¶
创建 Kafka 配置文件,例如:file:<path>/<config_file>.json。使用所有连接器配置信息填充该文件。该文件必须为 JSON 格式。
Sample configuration file
{
"name":"XYZCompanySensorData",
"config":{
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"tasks.max": "1",
"snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
"snowflake.url.name": "myorganization-myaccount.snowflakecomputing.cn:443",
"snowflake.warehouse.name": "WH",
"snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
"snowflake.schema.name": "MY_SCHEMA",
"snowflake.database.name": "MY_DATABASE",
"snowflake.role.name": "MY_ROLE",
"snowflake.user.name": "MY_USER",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"topics": "topic1,topic2",
"value.converter.schemas.enable": "false",
"errors.tolerance": "none"
}
}
独立模式¶
创建一个配置文件,例如:file:<kafka_dir>/config/SF_connect.properties。使用所有连接器配置信息填充该文件。
Sample configuration file
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.cn:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=none
测试和原型设计的缓存注意事项¶
连接器会缓存表和管道查找检查,以提高分区重新平衡期间的性能。然而,在测试和原型设计期间,这种缓存行为可能导致连接器无法立即检测到手动创建的表或管道。
问题:当您在连接器运行时手动创建表或管道时,连接器默认可能会在长达 5 分钟的时间内继续使用缓存的存在性检查结果(这可能表明对象不存在)。这可能导致测试期间出现意外错误或行为。
测试建议: 为避免在测试和原型设计期间出现与缓存相关的问题,请将两个缓存有效期参数配置为最小值 1 毫秒,或禁用缓存:
snowflake.cache.table.exists.expire.ms=1
snowflake.cache.pipe.exists.expire.ms=1
此配置可确保连接器在每次分区重新平衡时都执行全新的存在性检查,从而允许您立即看到手动创建的表和管道的效果。
重要
这些最小缓存设置建议 仅用于测试和原型设计。在生产环境中,请使用默认的缓存有效值(5 分钟或更长),以最大程度地减少对 Snowflake 的元数据查询并提高重新平衡性能,尤其是在处理大量分区时。
连接器配置属性¶
所需属性¶
name应用程序名称。该名称在客户使用的所有 Kafka Connector 中必须是唯一的。此名称必须是有效的 Snowflake 无引号标识符。有关有效标识符的信息,请参阅 标识符要求。
connector.classcom.snowflake.kafka.connector.SnowflakeStreamingSinkConnectortopics以逗号分隔的主题列表。默认情况下,Snowflake 会假定表名称与主题名称相同。如果表名称与主题名称不一致,则使用可选的
topic2table.map参数(如下)指定从主题名称到表名称的映射。表名称必须是有效的 Snowflake 无引号标识符。有关有效表名称的信息,请参阅 标识符要求。备注
需要使用
topics或topics.regex中的 任一项 ;而不是两者。topics.regex这是一个正则表达式(“regex”),用于指定包含要加载到 Snowflake 表中的消息的主题。连接器会从与 regex 匹配的任何主题名称中加载数据。regex 必须遵循 Java 正则表达式的规则(即与 java.util.regex.Pattern 兼容)。配置文件应包含
topics或topics.regex中的 任一项,而不是两者。snowflake.url.nameURL 用于访问您的 Snowflake 账户。URL 必须包括您的 账户标识符。请注意,协议 (
https://) 和端口号是可选的。snowflake.user.nameSnowflake 账户的用户登录名。
snowflake.role.name连接器将用于向表中插入数据的角色名称。
snowflake.private.key用于验证用户身份的私钥。仅包含密钥,不包含标头或页脚。如果密钥分为多行,请删除换行符。您可以提供未加密的密钥,也可以提供加密的密钥并提供
snowflake.private.key.passphrase参数,允许 Snowflake 解密密钥。当且仅当snowflake.private.key参数值已加密时才使用此参数。这将解密根据 密钥对身份验证和密钥对轮换 中的说明进行加密的私钥。备注
另请参阅 可选属性 中的
snowflake.private.key.passphrase。snowflake.database.name包含要插入行的表的数据库名称。
snowflake.schema.name包含要插入行的表的架构名称。
header.converter仅当记录采用 Avro 格式且包含标头时才需要。其值为
"org.apache.kafka.connect.storage.StringConverter"。key.converter这是 Kafka 记录的密钥转换器(例如
"org.apache.kafka.connect.storage.StringConverter")。Kafka Connector 不使用这个转换器,但 Kafka Connect 平台需要它。有关当前限制的信息,请参阅 Kafka Connector 限制。
value.converter该连接器支持标准的 Kafka 社区转换器。根据您的数据格式选择合适的转换器:
对于 JSON 记录:
"org.apache.kafka.connect.json.JsonConverter"对于具有架构注册表的 Avro 记录:
"io.confluent.connect.avro.AvroConverter"
有关当前限制的信息,请参阅 Kafka Connector 限制。
可选属性¶
snowflake.private.key.passphrase如果此参数的值不为空,那么该连接器会使用此短语来尝试解密私钥。
tasks.max任务数,通常与Kafka Connect集群中工作节点的 CPU 内核数相同。为了实现最佳性能,Snowflake 建议将任务数量设置为等于 Kafka 分区的总数,但不超过 CPU 个核心的数量。任务数量高可能会导致内存使用增加和频繁的重新平衡。
snowflake.topic2table.map此可选参数允许用户指定哪些主题应映射到哪些表。每个主题及其表名称要用冒号隔开(请参阅下面的示例)。表名称必须是有效的 Snowflake 无引号标识符。有关有效表名称的信息,请参阅 标识符要求。主题配置允许使用正则表达式定义主题,就像使用
topics.regex一样。正则表达式不能含糊不清,任何匹配的主题只能匹配单个目标表。示例:
topics="topic1,topic2,topic5,topic6" snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
可以写成:
topics.regex="topic[0-9]" snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
value.converter.schema.registry.url如果采用 Avro 格式并且您正在使用架构注册表服务,则此值应为架构注册服务的 URL。否则该字段应为空。
value.converter.break.on.schema.registry.error如果从架构注册表服务加载 Avro 数据,此属性确定 Kafka Connector 在获取架构 ID 时遇到错误时是否应停止使用记录。默认值为
false。将值设为true可启用此行为。jvm.proxy.host要使 Snowflake Kafka Connector 能够通过代理服务器访问 Snowflake,请设置此参数以指定代理服务器的主机。
jvm.proxy.port要使 Snowflake Kafka Connector 能够通过代理服务器访问 Snowflake,请设置此参数以指定代理服务器的端口。
snowflake.streaming.max.client.lagSpecifies how often the connector flushes the data to Snowflake, in seconds.
- 值:
最小值:
1秒最大值:
600秒
- 默认值:
1秒
jvm.proxy.username通过代理服务器进行身份验证的用户名。
jvm.proxy.password通过代理服务器进行身份验证的用户名的密码。
snowflake.jdbc.map示例:
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"不会验证附加的 JDBC 属性(请参阅 JDBC 驱动程序连接参数参考)。这些附加属性不会验证,且不得覆盖或替换必需的属性,例如:
jvm.proxy.xxx、snowflake.user.name、snowflake.private.key、snowflake.schema.name等。- 指定以下任一组合:
tracing属性以及JDBC_TRACE环境变量database属性以及snowflake.database.name
将导致模糊行为,并且行为将由 JDBC 驱动程序确定。
value.converter.basic.auth.credentials.source如果使用 Avro 数据格式并需要安全访问 Kafka架构注册表,请将此参数设置为字符串“USER_INFO”,并设置如下所述的
value.converter.basic.auth.user.info参数。否则,省略该参数。value.converter.basic.auth.user.info如果使用 Avro 数据格式并需要安全访问 Kafka架构注册表,请将此参数设置为字符串“<user_ID>:<password>”,并设置如上所述的 value.converter.basic.auth.credentials.source 参数。否则,省略该参数。
snowflake.metadata.createtime如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略
CreateTime属性值。默认值为 TRUE。snowflake.metadata.topic如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略
topic属性值。默认值为 TRUE。snowflake.metadata.offset.and.partition如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略
Offset和Partition属性值。默认值为 TRUE。snowflake.metadata.all如果值设置为 FALSE, RECORD_METADATA 列中的元数据将完全为空。默认值为 TRUE。
transforms指定跳过 Kafka Connector 遇到的逻辑删除记录,不将它们加载到目标表中。逻辑删除记录定义为整个值字段为 null 的记录。
将属性值设置为
"tombstoneHandlerExample"。备注
此属性仅用于 Kafka 社区转换器(即
value.converter属性值)(例如org.apache.kafka.connect.json.JsonConverter或org.apache.kafka.connect.json.AvroConverter)。要使用 Snowflake 转换器管理逻辑删除记录处理,请改为使用behavior.on.null.values属性。transforms.tombstoneHandlerExample.type设置
transforms属性时需要。将属性值设置为
"io.confluent.connect.transforms.TombstoneHandler"behavior.on.null.values指定 Kafka Connector 应如何处理逻辑删除记录。逻辑删除记录定义为整个值字段为 null 的记录。 对于 Snowpipe,Kafka Connector 版本 1.5.5 及更高版本支持此属性。对于 Snowpipe Streaming,Kafka Connector 版本 2.1.0 及更高版本支持此属性。
此属性支持以下值:
DEFAULT当 Kafka Connector 遇到逻辑删除记录时,它会在内容列中插入一个空的 JSON 的字符串。
IGNOREKafka Connector 会跳过逻辑删除记录,并且不会为这些记录插入行。
默认值为
DEFAULT。备注
逻辑删除记录的引入因引入方法而异:
对于 Snowpipe,Kafka Connector 仅使用 Snowflake 转换器。要使用 Kafka 社区转换器管理逻辑删除记录处理,请改为使用
transform和transforms.tombstoneHandlerExample.type属性。对于 Snowpipe Streaming,Kafka Connector 仅使用社区转换器。
发送到 Kafka Broker 的记录不得为 NULL,因为这些记录会被 Kafka Connector 删除,从而导致偏移丢失。在特定用例中,缺失的偏移会破坏 Kafka Connector。建议您使用逻辑删除记录而不是 NULL 记录。
使用密钥对身份验证和密钥轮换¶
Kafka 连接器依赖于密钥对身份验证,而不是用户名和密码身份验证。RSA 使用通过 加密的公钥/私钥对签名。使用 OpenSSL 生成公钥-私钥对。公钥分配给配置文件中定义的 Snowflake 用户。
在完成此页面上的密钥对身份验证任务以及 密钥对轮换 任务后,评估本主题后面关于 外部化密钥 的建议。
要配置公钥/私钥对,请执行以下操作:
在终端窗口中的命令行生成私钥。
您可以生成加密版或未加密版的私钥。
备注
Kafka Connector 支持经过验证且符合联邦信息处理标准 (140-2)(即 FIPS 140-2)的加密算法要求。有关更多信息,请参阅 FIPS 140-2 (https://csrc.nist.gov/publications/detail/fips/140/2/final)。
要生成未加密的版本,请使用以下命令:
$ openssl genrsa -out rsa_key.pem 2048
要生成加密的版本,请使用以下命令:
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
其中
<algorithm>是符合 FIPS 140-2 标准的加密算法。例如,指定 AES 256 作为加密算法:
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
如果生成加密版的私钥,请记录加密短语。稍后,您将在 Kafka 配置文件
snowflake.private.key.passphrase属性中指定该加密短语。示例 PEM 私钥
-----BEGIN ENCRYPTED PRIVATE KEY----- MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1 wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL ... -----END ENCRYPTED PRIVATE KEY-----
从命令行中,通过引用私钥生成公钥:
假设私钥已加密并包含在名为
rsa_key.p8的文件中,使用以下命令:$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
示例 PEM 公钥
-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4 zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk ... -----END PUBLIC KEY-----
将公钥和私钥文件复制到本地目录进行存储。记下文件的路径。私钥使用 PKCS#8(公钥加密标准)格式存储,并使用您在上一步中指定的加密短语进行加密;但仍应使用操作系统提供的文件权限机制保护文件,防止未经授权的访问。不使用文件时,用户有责任保护该文件的安全。
登录 Snowflake。使用 ALTER USER 将公钥分配给 Snowflake 用户。
For example:
ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
备注
只有安全管理员(即具有 SECURITYADMIN 角色的用户)或更高级别的用户才能更改用户。
在 SQL 语句中排除公钥标头和页脚。
使用 DESCRIBE USER 验证用户的公钥指纹:
DESC USER jsmith; +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+ | property | value | default | description | |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------| | NAME | JSMITH | null | Name | ... ... | RSA_PUBLIC_KEY_FP | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null | Fingerprint of user's RSA public key. | | RSA_PUBLIC_KEY_2_FP | null | null | Fingerprint of user's second RSA public key. | +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
备注
RSA_PUBLIC_KEY_2_FP属性在 配置密钥对轮换 中作了说明。将整个私钥复制并粘贴到配置文件中的
snowflake.private.key字段。保存文件。
外部化密钥¶
Snowflake 强烈建议将私钥等密钥外部化,并以加密形式存储或存储在密钥管理服务中,例如 AWS Key Management Service (KMS)、Microsoft Azure Key Vault,或 HashiCorp Vault。这可以通过在Kafka Connect集群上使用 ConfigProvider 来完成。
有关更多信息,请参阅 Confluent 对此 服务 (https://docs.confluent.io/current/connect/security.html#externalizing-secrets) 的描述。
Starting the connector¶
使用第三方 Confluent 或 Apache Kafka 文档中提供的说明启动 Kafka。您可以在分布式模式或独立模式下启动 Kafka Connector。每种模式的说明如下所示:
分布式模式¶
在终端窗口中,执行以下命令:
curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
独立模式¶
在终端窗口中,执行以下命令:
<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
备注
Apache Kafka 或 Confluent Kafka 的默认安装应该已经包含了 connect-standalone.properties 文件。