安装和配置 Kafka Connector¶
Kafka Connector 以 JAR(Java 可执行文件)文件的形式提供。
Snowflake 提供两个版本的连接器:
一个版本为 Kafka 的 Confluent 包版本 (https://www.confluent.io/hub/snowflakeinc/snowflake-kafka-connector/)。
一个版本为 开源软件 (OSS) Apache Kafka 包 (https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/)。
备注
Kafka Connector 受 Connector 条款 的约束。
本主题中的说明指明了哪些步骤仅适用于这两个版本中的特定版本。
本主题内容:
配置 Snowflake 对象的访问控制¶
所需权限¶
创建和管理 Kafka Connecter 使用的 Snowflake 对象需要具有以下最低权限的角色:
对象 |
权限 |
备注 |
---|---|---|
数据库 |
USAGE |
|
架构 |
USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE |
创建架构级对象后,可以撤消 CREATE |
表 |
OWNERSHIP |
仅在使用 Kafka Connector 将数据引入到 现有 表时需要。. 如果连接器为来自 Kafka 主题的记录创建了一个新的目标表,则 Kafka 配置文件中指定的用户的默认角色将成为表所有者(即拥有表的 OWNERSHIP 权限)。 |
暂存区 |
READ . WRITE |
仅在使用 Kafka Connector 将 Kafka 中的数据文件暂存到 现有 内部暂存区时才需要(不推荐)。. 如果连接器创建了一个新的暂存区来临时存储从 Kafka 主题消耗的数据文件,那么 Kafka 配置文件中指定的用户的默认角色将成为暂存区所有者(即具有暂存区的 OWNERSHIP 权限)。 |
Snowflake 建议您为每个 Kafka 实例创建单独的用户(使用 CREATE USER)和角色(使用 CREATE ROLE),以便在需要时单独撤消访问权限。角色应分配为用户的默认角色。
创建角色以使用 Kafka Connector¶
以下脚本可用于创建一个自定义角色供 Kafka Connector 使用(例如 KAFKA_CONNECTOR_ROLE_1)。任何可以授予权限的角色(例如 SECURITYADMIN 或任何具有 MANAGE GRANTS 权限的角色)都可以将此自定义角色授予任何用户,以允许 Kafka Connector 创建所需的 Snowflake 对象并将数据插入表中。该脚本引用了特定的 现有 数据库和架构 (kafka_db.kafka_schema
) 以及用户 (kafka_connector_user_1
):
-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;
-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;
-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;
-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
有关创建自定义角色和角色层次结构的更多信息,请参阅 配置访问控制。
安装先决条件¶
Kafka Connector 支持以下包版本:
包
Snowflake Kafka Connector 版本
包支持(由 Snowflake 测试)
Apache Kafka
2.0.0(或更高版本)
Apache Kafka 2.5.1、2.8.1、3.2.1
Confluent
2.0.0(或更高版本)
Confluent 6.2.6、7.2.1
Kafka Connector 专为与 Kafka Connect API 3.2.3 搭配使用而构建。任何较新版本的Kafka Connect API 均未经过测试。任何早于 3.2.3 的版本都与该连接器兼容。有关更多信息,请参阅 Kafka 兼容性 (https://kafka.apache.org/protocol.html#protocol_compatibility)。
当环境中同时拥有 Kafka Connector 和 JDBC 驱动程序 jar 文件时,请确保您的 JDBC 版本与您想要的 Kafka Connector 版本的
pom.xml
文件中指定的snowflake-jdbc
版本相匹配。您可以转到首选的 Kafka Connector 发布版本,例如:v2.0.1 (https://github.com/snowflakedb/snowflake-kafka-connector/releases/tag/v2.0.1)。然后浏览pom.xml
文件以找出snowflake-jdbc
的版本。如果使用 Avro 格式引入数据:
使用 Avro 解析器版本 1.8.2(或更高版本),可从以下网址获取:https://mvnrepository.com/artifact/org.apache.avro (https://mvnrepository.com/artifact/org.apache.avro)。
如果使用 Avro 的架构注册功能,请使用 Kafka Connect Avro Converter 版本 5.0.0(或更高版本),可从以下网址获取:https://mvnrepository.com/artifact/io.confluent (https://mvnrepository.com/artifact/io.confluent)。
请注意,OSS Apache Kafka 包不提供架构注册功能。
根据所需的数据保留时间和/或存储限制配置 Kafka。
安装并配置Kafka Connect 集群。
每个 Kafka Connect 集群节点都应为 Kafka Connector 提供足够的 RAM。建议每个 Kafka 分区至少使用 5 MB。这还不包括 Kafka Connect 执行其他工作所需的 RAM。
我们建议在 Kafka Broker 和 Kafka Connect Runtime 上使用相同的版本。
我们强烈建议在与 Snowflake 账户相同的云提供商 区域 运行 Kafka Connect 实例。这并非严格要求的,但通常会提高吞吐量。
有关 Snowflake 客户端支持的操作系统列表,请参阅 操作系统支持。
安装连接器¶
本节说明了如何安装和配置 Kafka Connector for Confluent。有关 Kafka Connector 版本的信息,请参阅下表:
版本系列 |
状态 |
备注 |
---|---|---|
2.xx |
官方支持 |
最新版本,强烈推荐。 |
1.9.x |
官方支持 |
建议升级。 |
1.8.x |
不支持 |
切勿使用此版本系列。 |
1.7.x |
不支持 |
切勿使用此版本系列。 |
安装 Connector for Confluent¶
下载 Kafka Connector 文件¶
从以下任一位置 Kafka Connector JAR 文件:
- Confluent Hub:
https://www.confluent.io/hub/ (https://www.confluent.io/hub/)
该包包含使用加密或未加密私钥进行密钥对身份身份验证所需的所有依赖项。有关更多信息,请参阅 `使用密钥对身份验证和密钥轮换 `_ (本主题内容)。
- Maven Central Repository:
https://mvnrepository.com/artifact/com.snowflake (https://mvnrepository.com/artifact/com.snowflake)
JAR 文件不需要任何额外的依赖项,即可使用 未加密 私钥进行密钥对身份验证。要使用加密私钥,请下载 Bouncy Castle (https://www.bouncycastle.org/) 加密库(一个 JAR 文件)。Snowflake 使用Bouncy Castle解密用于登录的加密 RSA 私钥:
https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1 (https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1)
https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3 (https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3)
将这些文件下载到与 Kafka Connector JAR 文件相同的本地文件夹中。
连接器的源代码可从以下网址获取:https://github.com/snowflakedb/snowflake-kafka-connector (https://github.com/snowflakedb/snowflake-kafka-connector)。
安装 Kafka Connector¶
使用安装其他连接器的说明安装 Kafka Connector:
https://docs.confluent.io/current/connect/userguide.html (https://docs.confluent.io/current/connect/userguide.html)
为开源 Apache Kafka 安装连接器¶
本节说明了如何为开源 Apache Kafka 安装和配置 Kafka Connector。
安装 Apache Kafka¶
从 Kafka 官网下载 Kafka 包:https://kafka.apache.org/downloads (https://kafka.apache.org/downloads)。
在终端窗口中,切换为下载包文件的目录。
执行以下命令解压:file:kafka_<scala_version>-<kafka_version>.tgz 文件:
tar xzvf kafka_<scala_version>-<kafka_version>.tgz
安装 JDK¶
安装并配置 Java Development Kit (JDK)。使用 JDK 的 Standard Edition (SE) 测试 Snowflake。Enterprise Edition (EE) 预计会兼容,但尚未进行测试。
如果您已经完成此步骤,则可以跳过此部分。
从 https://www.oracle.com/technetwork/java/javase/downloads/index.html (https://www.oracle.com/technetwork/java/javase/downloads/index.html) 下载 JDK。
安装或解压 JDK。
按照适用于您操作系统的说明,设置环境变量 JAVA_HOME,使其指向包含 JDK 的目录。
下载 Kafka Connector JAR 文件¶
从 Maven Central Repository 下载 Kafka Connector JAR 文件:
https://mvnrepository.com/artifact/com.snowflake (https://mvnrepository.com/artifact/com.snowflake)
JAR 文件不需要任何额外的依赖项,即可使用 未加密 私钥进行密钥对身份验证。要使用加密私钥,请下载 Bouncy Castle (https://www.bouncycastle.org/) 加密库(一个 JAR 文件)。Snowflake 使用Bouncy Castle解密用于登录的加密 RSA 私钥:
https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1 (https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1)
https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3 (https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3)
如果您的 Kafka 数据是以 Apache Avro (https://avro.apache.org/) 格式流式传输的,那么请下载 Avro JAR 文件:
https://mvnrepository.com/artifact/org.apache.avro/avro (https://mvnrepository.com/artifact/org.apache.avro/avro)
连接器的源代码可从以下网址获取:https://github.com/snowflakedb/snowflake-kafka-connector (https://github.com/snowflakedb/snowflake-kafka-connector)。
安装 Kafka Connector¶
将您在 下载 Kafka Connector JAR 文件 中下载的 JAR 文件复制到 <kafka_dir>/libs
文件夹中。
配置 Kafka Connector¶
该连接器通过创建一个文件进行配置,该文件中指定了诸如 Snowflake 登录凭据、主题名称、Snowflake 表名称等参数。
重要
Kafka Connect 框架会将 Kafka Connector 的配置设置从主节点广播到工作节点。配置设置包括敏感信息(特别是 Snowflake 用户名和私钥)。确保确保 Kafka Connect 节点之间的通信渠道的安全。有关说明,请参阅 Apache Kafka 软件的文档。
每个配置文件会为数据库和该数据库中的架构指定主题和相应的表。请注意,连接器可以从任意数量的主题中引入消息,但相应的表必须全部存储在同一数据库和架构中。
本部分提供了分布式模式和独立模式的说明。
有关配置字段的描述,请参阅 Kafka 配置属性。
重要
由于配置文件通常包含安全相关信息(例如私钥),因此请适当设置文件的读/写权限,以限制访问。
此外,可考虑将配置文件存储在安全的外部位置或密钥管理服务中。有关更多信息,请参阅 外部化密钥 (本主题内容)。
分布式模式¶
创建 Kafka 配置文件,例如:file:<path>/<config_file>.json。使用所有连接器配置信息填充该文件。该文件应为 JSON 格式。
配置文件示例
{
"name":"XYZCompanySensorData",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"topic1,topic2",
"snowflake.topic2table.map": "topic1:table1,topic2:table2",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"myorganization-myaccount.snowflakecomputing.cn:443",
"snowflake.user.name":"jane.smith",
"snowflake.private.key":"xyz123",
"snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
"snowflake.database.name":"mydb",
"snowflake.schema.name":"myschema",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"value.converter.basic.auth.credentials.source":"USER_INFO",
"value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
}
}
独立模式¶
创建配置文件,例如:file:<kafka_dir>/config/SF_connect.properties。使用所有连接器配置信息填充该文件。
配置文件示例
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myorganization-myaccount.snowflakecomputing.cn:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword
Kafka 配置属性¶
可在 Kafka 配置文件中为分布式模式或独立模式设置以下属性:
所需属性¶
name
应用程序名称。该名称在客户使用的所有 Kafka Connector 中必须是唯一的。此名称必须是有效的 Snowflake 无引号标识符。有关有效标识符的信息,请参阅 标识符要求。
connector.class
com.snowflake.kafka.connector.SnowflakeSinkConnector
topics
以逗号分隔的主题列表。默认情况下,Snowflake 会假定表名称与主题名称相同。如果表名称与主题名称不一致,则使用可选的
topic2table.map
参数(如下)指定从主题名称到表名称的映射。表名称必须是有效的 Snowflake 无引号标识符。有关有效表名称的信息,请参阅 标识符要求。备注
需要使用
topics
或topics.regex
中的 任一项 ;而不是两者。topics.regex
这是一个正则表达式(“regex”),用于指定包含要加载到 Snowflake 表中的消息的主题。连接器会从与 regex 匹配的任何主题名称中加载数据。regex 必须遵循 Java 正则表达式的规则(即与 java.util.regex.Pattern 兼容)。配置文件应包含
topics
或topics.regex
中的 任一项,而不是两者。snowflake.url.name
URL 用于访问您的 Snowflake 账户。URL 必须包括您的 :doc:` 账户标识符 </user-guide/admin-account-identifier>` 。请注意,协议 (
https://
) 和端口号是可选的。snowflake.user.name
Snowflake 账户的用户登录名。
snowflake.private.key
用于验证用户身份的私钥。仅包含密钥,不包含标头或页脚。如果密钥分为多行,请删除换行符。您可以提供未加密的密钥,也可以提供加密的密钥并提供
snowflake.private.key.passphrase
参数,允许 Snowflake 解密密钥。当且仅当snowflake.private.key
参数值已加密时才使用此参数。这将解密按照 `使用密钥对身份验证和密钥轮换 `_ (本主题内容)中的说明加密的私钥。备注
此外,请参阅 可选属性 (本主题内容)中的
snowflake.private.key.passphrase
。snowflake.database.name
包含要插入行的表的数据库名称。
snowflake.schema.name
包含要插入行的表的架构名称。
header.converter
仅当记录采用 Avro 格式且包含标头时才需要。其值为
"org.apache.kafka.connect.storage.StringConverter"
。key.converter
这是 Kafka 记录的密钥转换器(例如
"org.apache.kafka.connect.storage.StringConverter"
)。Kafka Connector 不使用这个转换器,但 Kafka Connect 平台需要它。有关当前限制的信息,请参阅 Kafka Connector 限制。
value.converter
如果记录采用 JSON 格式,此值应为
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
。备注
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
按原样反序列化记录。每个 json 字段都被视为一个记录字段,不会对架构或包含元数据的任何其他字段进行特殊处理。如果记录采用 Avro 格式并使用 Kafka 的架构注册表服务,此值应为
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter"
。如果记录采用 Avro 格式并包含架构(因此不需要 Kafka 的架构注册表服务),此值应为
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry"
。如果记录采用纯文本格式,此值应为
"org.apache.kafka.connect.storage.StringConverter"
。有关当前限制的信息,请参阅 Kafka Connector 限制。
可选属性¶
snowflake.private.key.passphrase
如果此参数的值不为空,那么 Kafka 会使用此短语来尝试解密私钥。
tasks.max
任务数,通常与Kafka Connect集群中工作节点的 CPU 内核数相同。这个数字可以设置得更低或更高;但是,Snowflake 不建议将其设置得更高。
snowflake.topic2table.map
此可选参数允许用户指定哪些主题应映射到哪些表。每个主题及其表名称要用冒号隔开(请参阅下面的示例)。表名称必须是有效的 Snowflake 无引号标识符。有关有效表名称的信息,请参阅 标识符要求。
buffer.count.records
在引入到 Snowflake 之前,每个 Kafka 分区在内存中缓冲的记录数。默认值为
10000
条记录。buffer.flush.time
缓冲区刷新之间的间隔秒数,其中刷新是从 Kafka 的内存缓存到内部暂存区。默认值为
120
秒。buffer.size.bytes
记录作为数据文件引入到 Snowflake 之前,每个 Kafka 分区内存中缓冲记录的累计大小(以字节为单位)。默认值为
5000000
(5 MB)。记录在写入数据文件时被压缩。因此,缓冲区中记录的大小可能大于从记录创建的数据文件的大小。
value.converter.schema.registry.url
如果采用 Avro 格式并且您正在使用架构注册表服务,则此值应为架构注册服务的 URL。否则该字段应为空。
value.converter.break.on.schema.registry.error
如果从架构注册表服务加载 Avro 数据,此属性确定 Kafka Connector 在获取架构 ID 时遇到错误时是否应停止使用记录。默认值为
false
。将值设为true
可启用此行为。受 Kafka Connector 版本 1.4.2(及更高版本)支持。
jvm.proxy.host
要使 Snowflake Kafka Connector 能够通过代理服务器访问 Snowflake,请设置此参数以指定代理服务器的主机。
jvm.proxy.port
要使 Snowflake Kafka Connector 能够通过代理服务器访问 Snowflake,请设置此参数以指定代理服务器的端口。
jvm.proxy.username
通过代理服务器进行身份验证的用户名。
受 Kafka Connector 版本 1.4.4(及更高版本)支持。
jvm.proxy.password
通过代理服务器进行身份验证的用户名的密码。
受 Kafka Connector 版本 1.4.4(及更高版本)支持。
value.converter.basic.auth.credentials.source
如果使用 Avro 数据格式并需要安全访问 Kafka架构注册表,请将此参数设置为字符串“USER_INFO”,并设置如下所述的
value.converter.basic.auth.user.info
参数。否则,省略该参数。value.converter.basic.auth.user.info
如果使用 Avro 数据格式并需要安全访问 Kafka架构注册表,请将此参数设置为字符串“<user_ID>:<password>”,并设置如上所述的 value.converter.basic.auth.credentials.source 参数。否则,省略该参数。
snowflake.metadata.createtime
如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略
CreateTime
属性值。默认值为 TRUE。受 Kafka Connector 版本 1.2.0(及更高版本)支持。
snowflake.metadata.topic
如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略
topic
属性值。默认值为 TRUE。受 Kafka Connector 版本 1.2.0(及更高版本)支持。
snowflake.metadata.offset.and.partition
如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略
Offset
和Partition
属性值。默认值为 TRUE。受 Kafka Connector 版本 1.2.0(及更高版本)支持。
snowflake.metadata.all
如果值设置为 FALSE, RECORD_METADATA 列中的元数据将完全为空。默认值为 TRUE。
受 Kafka Connector 版本 1.2.0(及更高版本)支持。
transforms
指定跳过 Kafka Connector 遇到的逻辑删除记录,不将它们加载到目标表中。逻辑删除记录定义为整个值字段为 null 的记录。
将属性值设置为
"tombstoneHandlerExample"
。备注
此属性仅用于 Kafka 社区转换器(即
value.converter
属性值)(例如org.apache.kafka.connect.json.JsonConverter
或org.apache.kafka.connect.json.AvroConverter
)。要使用 Snowflake 转换器管理逻辑删除记录处理,请改为使用behavior.on.null.values
属性。transforms.tombstoneHandlerExample.type
设置
transforms
属性时需要。将属性值设置为
"io.confluent.connect.transforms.TombstoneHandler"
behavior.on.null.values
指定 Kafka Connector 应如何处理逻辑删除记录。逻辑删除记录定义为整个值字段为 null 的记录。 对于 Snowpipe,Kafka Connector 版本 1.5.5 及更高版本支持此属性。对于 Snowpipe Streaming,Kafka Connector 版本 2.1.0 及更高版本支持此属性。
此属性支持以下值:
DEFAULT
当 Kafka Connector 遇到逻辑删除记录时,它会在内容列中插入一个空的 JSON 的字符串。
IGNORE
Kafka Connector 会跳过逻辑删除记录,并且不会为这些记录插入行。
默认值为
DEFAULT
。备注
逻辑删除记录的引入因引入方法而异:
对于 Snowpipe,Kafka Connector 仅使用 Snowflake 转换器。要使用 Kafka 社区转换器管理逻辑删除记录处理,请改为使用
transform
和transforms.tombstoneHandlerExample.type
属性。对于 Snowpipe Streaming,Kafka Connector 仅使用社区转换器。
发送到 Kafka Broker 的记录不得为 NULL,因为这些记录会被 Kafka Connector 删除,从而导致偏移丢失。在特定用例中,丢失的偏移会破坏 Kafka Connector。建议您使用逻辑删除记录而不是 NULL 记录。
使用密钥对身份验证和密钥轮换¶
Kafka Connector 依赖于密钥对身份验证,而不是基本身份验证(即用户名和密码)。此身份验证方法需要 2048 位(最低)RSA 密钥对。使用 OpenSSL 生成公钥-私钥对。公钥分配给配置文件中定义的 Snowflake 用户。
完成本页上的密钥身份验证说明和 密钥对轮换 说明后,评估 外部化密钥 建议(本主题内容)。
要配置公钥/私钥对,请执行以下操作:
在终端窗口中的命令行生成私钥。
您可以生成加密版或未加密版的私钥。
备注
Kafka Connector 支持经过验证且符合联邦信息处理标准 (140-2)(即 FIPS 140-2)的加密算法要求。有关更多信息,请参阅 FIPS 140-2 (https://csrc.nist.gov/publications/detail/fips/140/2/final)。
要生成未加密的版本,请使用以下命令:
$ openssl genrsa -out rsa_key.pem 2048
要生成加密的版本,请使用以下命令:
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
其中
<algorithm>
是符合 FIPS 140-2 标准的加密算法。例如,指定 AES 256 作为加密算法:
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
如果生成加密版的私钥,请记录加密短语。稍后,您将在 Kafka 配置文件
snowflake.private.key.passphrase
属性中指定该加密短语。示例 PEM 私钥
-----BEGIN ENCRYPTED PRIVATE KEY----- MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1 wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL ... -----END ENCRYPTED PRIVATE KEY-----
从命令行中,通过引用私钥生成公钥:
假设私钥已加密并包含在名为
rsa_key.p8
的文件中,使用以下命令:$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
示例 PEM 公钥
-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4 zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk ... -----END PUBLIC KEY-----
将公钥和私钥文件复制到本地目录进行存储。记录文件的路径。请注意,私钥使用 PKCS#8(公钥加密标准)格式存储,并使用您在上一步中指定的加密短语进行加密;但仍应使用操作系统提供的文件权限机制保护文件,防止未经授权的访问。您有责任在不使用文件时保护该文件。
登录 Snowflake。使用 ALTER USER 将公钥分配给 Snowflake 用户。例如:
ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
备注
只有安全管理员(即具有 SECURITYADMIN 角色的用户)或更高级别的用户才能更改用户。
在 SQL 语句中排除公钥标头和页脚。
使用 DESCRIBE USER 验证用户的公钥指纹:
DESC USER jsmith; +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+ | property | value | default | description | |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------| | NAME | JSMITH | null | Name | ... ... | RSA_PUBLIC_KEY_FP | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null | Fingerprint of user's RSA public key. | | RSA_PUBLIC_KEY_2_FP | null | null | Fingerprint of user's second RSA public key. | +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
备注
RSA_PUBLIC_KEY_2_FP
属性在 配置密钥对轮换 中作了说明。将整个私钥复制并粘贴到配置文件中的
snowflake.private.key
字段。保存文件。
外部化密钥¶
Snowflake 强烈建议将私钥等密钥外部化,并以加密形式存储或存储在密钥管理服务中,例如 AWS Key Management Service (KMS)、Microsoft Azure Key Vault,或 HashiCorp Vault。这可以通过在Kafka Connect集群上使用 ConfigProvider
来完成。
有关更多信息,请参阅 Confluent 对此 服务 (https://docs.confluent.io/current/connect/security.html#externalizing-secrets) 的描述。
启动 Kafka¶
使用第三方 Confluent 或 Apache Kafka 文档中提供的说明启动 Kafka。
启动 Kafka Connector¶
您可以在分布式模式或独立模式下启动 Kafka Connector。每种模式的说明如下所示:
分布式模式¶
在终端窗口中,执行以下命令:
curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
独立模式¶
在终端窗口中,执行以下命令:
<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
(Apache Kafka 或 Confluent Kafka 的默认安装应该已经包含该 connect-standalone.properties
文件。)
测试和使用 Kafka Connector¶
我们建议在生产系统中使用 Kafka Connector 之前,先用少量数据对其进行测试。测试过程与正常使用连接器的过程相同:
验证 Kafka 和Kafka Connect是否正在运行。
验证是否已创建适当的 Kafka 主题。
创建(或使用现有的)消息发布器。确保发布到主题的消息具有正确的格式(JSON、Avro 或纯文本)。
创建一个配置文件,指定要订阅的主题和要写入的 Snowflake 表。有关说明,请参阅 配置 Kafka Connector (本主题内容)。
(可选)创建要写入数据的表。此步骤是可选的;如果您不创建表,Kafka Connector 会为您创建表。如果不打算使用连接器将数据添加到现有的非空表,那么我们建议让连接器为您创建表,以尽量减少架构不匹配的可能性。
将 Snowflake 对象(数据库、架构、目标表等)所需的最低权限授予将用于引入数据的角色。
向配置的 Kafka 主题发布一组示例数据。
等待几分钟,让数据在系统中传播,然后检查 Snowflake 表以验证记录是否已插入。
小技巧
在测试和生产环境中将数据加载到 Snowflake 之前,请考虑使用 SnowCD 验证您与 Snowflake 的网络连接。