安装和配置 Kafka Connector

Kafka Connector 以 JAR(Java 可执行文件)文件的形式提供。

Snowflake 提供两个版本的连接器:

  • 一个版本为 Kafka 的 Confluent 包版本 (https://www.confluent.io/hub/snowflakeinc/snowflake-kafka-connector/)。

  • 一个版本为 开源软件 (OSS) Apache Kafka 包 (https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/)。

备注

Kafka Connector 受 Connector 条款 的约束。

本主题中的说明指明了哪些步骤仅适用于这两个版本中的特定版本。

本主题内容:

配置 Snowflake 对象的访问控制

所需权限

创建和管理 Kafka Connecter 使用的 Snowflake 对象需要具有以下最低权限的角色:

对象

权限

备注

数据库

USAGE

架构

USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE

创建架构级对象后,可以撤消 CREATE object 权限。

OWNERSHIP

仅在使用 Kafka Connector 将数据引入到 现有 表时需要。. 如果连接器为来自 Kafka 主题的记录创建了一个新的目标表,则 Kafka 配置文件中指定的用户的默认角色将成为表所有者(即拥有表的 OWNERSHIP 权限)。

暂存区

READ . WRITE

仅在使用 Kafka Connector 将 Kafka 中的数据文件暂存到 现有 内部暂存区时才需要(不推荐)。. 如果连接器创建了一个新的暂存区来临时存储从 Kafka 主题消耗的数据文件,那么 Kafka 配置文件中指定的用户的默认角色将成为暂存区所有者(即具有暂存区的 OWNERSHIP 权限)。

Snowflake 建议您为每个 Kafka 实例创建单独的用户(使用 CREATE USER)和角色(使用 CREATE ROLE),以便在需要时单独撤消访问权限。角色应分配为用户的默认角色。

创建角色以使用 Kafka Connector

以下脚本可用于创建一个自定义角色供 Kafka Connector 使用(例如 KAFKA_CONNECTOR_ROLE_1)。任何可以授予权限的角色(例如 SECURITYADMIN 或任何具有 MANAGE GRANTS 权限的角色)都可以将此自定义角色授予任何用户,以允许 Kafka Connector 创建所需的 Snowflake 对象并将数据插入表中。该脚本引用了特定的 现有 数据库和架构 (kafka_db.kafka_schema) 以及用户 (kafka_connector_user_1):

-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
Copy

有关创建自定义角色和角色层次结构的更多信息,请参阅 配置访问控制

安装先决条件

  • Kafka Connector 支持以下包版本:

    Snowflake Kafka Connector 版本

    包支持(由 Snowflake 测试)

    Apache Kafka

    2.0.0(或更高版本)

    Apache Kafka 2.5.1、2.8.1、3.2.1

    Confluent

    2.0.0(或更高版本)

    Confluent 6.2.6、7.2.1

  • Kafka Connector 专为与 Kafka Connect API 3.2.3 搭配使用而构建。任何较新版本的Kafka Connect API 均未经过测试。任何早于 3.2.3 的版本都与该连接器兼容。有关更多信息,请参阅 Kafka 兼容性 (https://kafka.apache.org/protocol.html#protocol_compatibility)。

  • 当环境中同时拥有 Kafka Connector 和 JDBC 驱动程序 jar 文件时,请确保您的 JDBC 版本与您想要的 Kafka Connector 版本的 pom.xml 文件中指定的 snowflake-jdbc 版本相匹配。您可以转到首选的 Kafka Connector 发布版本,例如:v2.0.1 (https://github.com/snowflakedb/snowflake-kafka-connector/releases/tag/v2.0.1)。然后浏览 pom.xml 文件以找出 snowflake-jdbc 的版本。

  • 如果使用 Avro 格式引入数据:

    • 使用 Avro 解析器版本 1.8.2(或更高版本),可从以下网址获取:https://mvnrepository.com/artifact/org.apache.avro (https://mvnrepository.com/artifact/org.apache.avro)。

    • 如果使用 Avro 的架构注册功能,请使用 Kafka Connect Avro Converter 版本 5.0.0(或更高版本),可从以下网址获取:https://mvnrepository.com/artifact/io.confluent (https://mvnrepository.com/artifact/io.confluent)。

      请注意,OSS Apache Kafka 包不提供架构注册功能。

  • 根据所需的数据保留时间和/或存储限制配置 Kafka。

  • 安装并配置Kafka Connect 集群。

    每个 Kafka Connect 集群节点都应为 Kafka Connector 提供足够的 RAM。建议每个 Kafka 分区至少使用 5 MB。这还不包括 Kafka Connect 执行其他工作所需的 RAM。

  • 我们建议在 Kafka Broker 和 Kafka Connect Runtime 上使用相同的版本。

  • 我们强烈建议在与 Snowflake 账户相同的云提供商 区域 运行 Kafka Connect 实例。这并非严格要求的,但通常会提高吞吐量。

有关 Snowflake 客户端支持的操作系统列表,请参阅 操作系统支持

安装连接器

本节说明了如何安装和配置 Kafka Connector for Confluent。有关 Kafka Connector 版本的信息,请参阅下表:

版本系列

状态

备注

2.xx

官方支持

最新版本,强烈推荐。

1.9.x

官方支持

建议升级。

1.8.x

不支持

切勿使用此版本系列。

1.7.x

不支持

切勿使用此版本系列。

安装 Connector for Confluent

下载 Kafka Connector 文件

从以下任一位置 Kafka Connector JAR 文件:

Confluent Hub:

https://www.confluent.io/hub/ (https://www.confluent.io/hub/)

该包包含使用加密或未加密私钥进行密钥对身份身份验证所需的所有依赖项。有关更多信息,请参阅 `使用密钥对身份验证和密钥轮换 `_ (本主题内容)。

Maven Central Repository:

https://mvnrepository.com/artifact/com.snowflake (https://mvnrepository.com/artifact/com.snowflake)

JAR 文件不需要任何额外的依赖项,即可使用 未加密 私钥进行密钥对身份验证。要使用加密私钥,请下载 Bouncy Castle (https://www.bouncycastle.org/) 加密库(一个 JAR 文件)。Snowflake 使用Bouncy Castle解密用于登录的加密 RSA 私钥:

  • https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1 (https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1)

  • https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3 (https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3)

将这些文件下载到与 Kafka Connector JAR 文件相同的本地文件夹中。

连接器的源代码可从以下网址获取:https://github.com/snowflakedb/snowflake-kafka-connector (https://github.com/snowflakedb/snowflake-kafka-connector)。

安装 Kafka Connector

使用安装其他连接器的说明安装 Kafka Connector:

https://docs.confluent.io/current/connect/userguide.html (https://docs.confluent.io/current/connect/userguide.html)

为开源 Apache Kafka 安装连接器

本节说明了如何为开源 Apache Kafka 安装和配置 Kafka Connector。

安装 Apache Kafka

  1. 从 Kafka 官网下载 Kafka 包:https://kafka.apache.org/downloads (https://kafka.apache.org/downloads)。

  2. 在终端窗口中,切换为下载包文件的目录。

  3. 执行以下命令解压:file:kafka_<scala_version>-<kafka_version>.tgz 文件:

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    
    Copy

安装 JDK

安装并配置 Java Development Kit (JDK)。使用 JDK 的 Standard Edition (SE) 测试 Snowflake。Enterprise Edition (EE) 预计会兼容,但尚未进行测试。

如果您已经完成此步骤,则可以跳过此部分。

  1. 从 https://www.oracle.com/technetwork/java/javase/downloads/index.html (https://www.oracle.com/technetwork/java/javase/downloads/index.html) 下载 JDK。

  2. 安装或解压 JDK。

  3. 按照适用于您操作系统的说明,设置环境变量 JAVA_HOME,使其指向包含 JDK 的目录。

下载 Kafka Connector JAR 文件

  1. 从 Maven Central Repository 下载 Kafka Connector JAR 文件:

    https://mvnrepository.com/artifact/com.snowflake (https://mvnrepository.com/artifact/com.snowflake)

  2. JAR 文件不需要任何额外的依赖项,即可使用 未加密 私钥进行密钥对身份验证。要使用加密私钥,请下载 Bouncy Castle (https://www.bouncycastle.org/) 加密库(一个 JAR 文件)。Snowflake 使用Bouncy Castle解密用于登录的加密 RSA 私钥:

    • https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1 (https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1)

    • https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3 (https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3)

  3. 如果您的 Kafka 数据是以 Apache Avro (https://avro.apache.org/) 格式流式传输的,那么请下载 Avro JAR 文件:

    https://mvnrepository.com/artifact/org.apache.avro/avro (https://mvnrepository.com/artifact/org.apache.avro/avro)

连接器的源代码可从以下网址获取:https://github.com/snowflakedb/snowflake-kafka-connector (https://github.com/snowflakedb/snowflake-kafka-connector)。

安装 Kafka Connector

将您在 下载 Kafka Connector JAR 文件 中下载的 JAR 文件复制到 <kafka_dir>/libs 文件夹中。

配置 Kafka Connector

该连接器通过创建一个文件进行配置,该文件中指定了诸如 Snowflake 登录凭据、主题名称、Snowflake 表名称等参数。

重要

Kafka Connect 框架会将 Kafka Connector 的配置设置从主节点广播到工作节点。配置设置包括敏感信息(特别是 Snowflake 用户名和私钥)。确保确保 Kafka Connect 节点之间的通信渠道的安全。有关说明,请参阅 Apache Kafka 软件的文档。

每个配置文件会为数据库和该数据库中的架构指定主题和相应的表。请注意,连接器可以从任意数量的主题中引入消息,但相应的表必须全部存储在同一数据库和架构中。

本部分提供了分布式模式和独立模式的说明。

有关配置字段的描述,请参阅 Kafka 配置属性

重要

由于配置文件通常包含安全相关信息(例如私钥),因此请适当设置文件的读/写权限,以限制访问。

此外,可考虑将配置文件存储在安全的外部位置或密钥管理服务中。有关更多信息,请参阅 外部化密钥 (本主题内容)。

分布式模式

创建 Kafka 配置文件,例如:file:<path>/<config_file>.json。使用所有连接器配置信息填充该文件。该文件应为 JSON 格式。

配置文件示例

{
  "name":"XYZCompanySensorData",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.cn:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
  }
}
Copy

独立模式

创建配置文件,例如:file:<kafka_dir>/config/SF_connect.properties。使用所有连接器配置信息填充该文件。

配置文件示例

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myorganization-myaccount.snowflakecomputing.cn:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword
Copy

Kafka 配置属性

可在 Kafka 配置文件中为分布式模式或独立模式设置以下属性:

所需属性

name

应用程序名称。该名称在客户使用的所有 Kafka Connector 中必须是唯一的。此名称必须是有效的 Snowflake 无引号标识符。有关有效标识符的信息,请参阅 标识符要求

connector.class

com.snowflake.kafka.connector.SnowflakeSinkConnector

topics

以逗号分隔的主题列表。默认情况下,Snowflake 会假定表名称与主题名称相同。如果表名称与主题名称不一致,则使用可选的 topic2table.map 参数(如下)指定从主题名称到表名称的映射。表名称必须是有效的 Snowflake 无引号标识符。有关有效表名称的信息,请参阅 标识符要求

备注

需要使用 topicstopics.regex 中的 任一项 ;而不是两者。

topics.regex

这是一个正则表达式(“regex”),用于指定包含要加载到 Snowflake 表中的消息的主题。连接器会从与 regex 匹配的任何主题名称中加载数据。regex 必须遵循 Java 正则表达式的规则(即与 java.util.regex.Pattern 兼容)。配置文件应包含 topicstopics.regex 中的 任一项,而不是两者。

snowflake.url.name

URL 用于访问您的 Snowflake 账户。URL 必须包括您的 :doc:` 账户标识符 </user-guide/admin-account-identifier>` 。请注意,协议 (https://) 和端口号是可选的。

snowflake.user.name

Snowflake 账户的用户登录名。

snowflake.private.key

用于验证用户身份的私钥。仅包含密钥,不包含标头或页脚。如果密钥分为多行,请删除换行符。您可以提供未加密的密钥,也可以提供加密的密钥并提供 snowflake.private.key.passphrase 参数,允许 Snowflake 解密密钥。当且仅当 snowflake.private.key 参数值已加密时才使用此参数。这将解密按照 `使用密钥对身份验证和密钥轮换 `_ (本主题内容)中的说明加密的私钥。

备注

此外,请参阅 可选属性 (本主题内容)中的 snowflake.private.key.passphrase

snowflake.database.name

包含要插入行的表的数据库名称。

snowflake.schema.name

包含要插入行的表的架构名称。

header.converter

仅当记录采用 Avro 格式且包含标头时才需要。其值为 "org.apache.kafka.connect.storage.StringConverter"

key.converter

这是 Kafka 记录的密钥转换器(例如 "org.apache.kafka.connect.storage.StringConverter")。Kafka Connector 不使用这个转换器,但 Kafka Connect 平台需要它。

有关当前限制的信息,请参阅 Kafka Connector 限制

value.converter

如果记录采用 JSON 格式,此值应为 "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"

备注

"com.snowflake.kafka.connector.records.SnowflakeJsonConverter" 按原样反序列化记录。每个 json 字段都被视为一个记录字段,不会对架构或包含元数据的任何其他字段进行特殊处理。

如果记录采用 Avro 格式并使用 Kafka 的架构注册表服务,此值应为 "com.snowflake.kafka.connector.records.SnowflakeAvroConverter"

如果记录采用 Avro 格式并包含架构(因此不需要 Kafka 的架构注册表服务),此值应为 "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry"

如果记录采用纯文本格式,此值应为 "org.apache.kafka.connect.storage.StringConverter"

有关当前限制的信息,请参阅 Kafka Connector 限制

可选属性

snowflake.private.key.passphrase

如果此参数的值不为空,那么 Kafka 会使用此短语来尝试解密私钥。

tasks.max

任务数,通常与Kafka Connect集群中工作节点的 CPU 内核数相同。这个数字可以设置得更低或更高;但是,Snowflake 不建议将其设置得更高。

snowflake.topic2table.map

此可选参数允许用户指定哪些主题应映射到哪些表。每个主题及其表名称要用冒号隔开(请参阅下面的示例)。表名称必须是有效的 Snowflake 无引号标识符。有关有效表名称的信息,请参阅 标识符要求

buffer.count.records

在引入到 Snowflake 之前,每个 Kafka 分区在内存中缓冲的记录数。默认值为 10000 条记录。

buffer.flush.time

缓冲区刷新之间的间隔秒数,其中刷新是从 Kafka 的内存缓存到内部暂存区。默认值为 120 秒。

buffer.size.bytes

记录作为数据文件引入到 Snowflake 之前,每个 Kafka 分区内存中缓冲记录的累计大小(以字节为单位)。默认值为 5000000 (5 MB)。

记录在写入数据文件时被压缩。因此,缓冲区中记录的大小可能大于从记录创建的数据文件的大小。

value.converter.schema.registry.url

如果采用 Avro 格式并且您正在使用架构注册表服务,则此值应为架构注册服务的 URL。否则该字段应为空。

value.converter.break.on.schema.registry.error

如果从架构注册表服务加载 Avro 数据,此属性确定 Kafka Connector 在获取架构 ID 时遇到错误时是否应停止使用记录。默认值为 false。将值设为 true 可启用此行为。

受 Kafka Connector 版本 1.4.2(及更高版本)支持。

jvm.proxy.host

要使 Snowflake Kafka Connector 能够通过代理服务器访问 Snowflake,请设置此参数以指定代理服务器的主机。

jvm.proxy.port

要使 Snowflake Kafka Connector 能够通过代理服务器访问 Snowflake,请设置此参数以指定代理服务器的端口。

jvm.proxy.username

通过代理服务器进行身份验证的用户名。

受 Kafka Connector 版本 1.4.4(及更高版本)支持。

jvm.proxy.password

通过代理服务器进行身份验证的用户名的密码。

受 Kafka Connector 版本 1.4.4(及更高版本)支持。

value.converter.basic.auth.credentials.source

如果使用 Avro 数据格式并需要安全访问 Kafka架构注册表,请将此参数设置为字符串“USER_INFO”,并设置如下所述的 value.converter.basic.auth.user.info 参数。否则,省略该参数。

value.converter.basic.auth.user.info

如果使用 Avro 数据格式并需要安全访问 Kafka架构注册表,请将此参数设置为字符串“<user_ID>:<password>”,并设置如上所述的 value.converter.basic.auth.credentials.source 参数。否则,省略该参数。

snowflake.metadata.createtime

如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略 CreateTime 属性值。默认值为 TRUE。

受 Kafka Connector 版本 1.2.0(及更高版本)支持。

snowflake.metadata.topic

如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略 topic 属性值。默认值为 TRUE。

受 Kafka Connector 版本 1.2.0(及更高版本)支持。

snowflake.metadata.offset.and.partition

如果值设置为 FALSE,从 RECORD_METADATA 列的元数据中将省略 OffsetPartition 属性值。默认值为 TRUE。

受 Kafka Connector 版本 1.2.0(及更高版本)支持。

snowflake.metadata.all

如果值设置为 FALSE, RECORD_METADATA 列中的元数据将完全为空。默认值为 TRUE。

受 Kafka Connector 版本 1.2.0(及更高版本)支持。

transforms

指定跳过 Kafka Connector 遇到的逻辑删除记录,不将它们加载到目标表中。逻辑删除记录定义为整个值字段为 null 的记录。

将属性值设置为 "tombstoneHandlerExample"

备注

此属性仅用于 Kafka 社区转换器(即 value.converter 属性值)(例如 org.apache.kafka.connect.json.JsonConverterorg.apache.kafka.connect.json.AvroConverter)。要使用 Snowflake 转换器管理逻辑删除记录处理,请改为使用 behavior.on.null.values 属性。

transforms.tombstoneHandlerExample.type

设置 transforms 属性时需要。

将属性值设置为 "io.confluent.connect.transforms.TombstoneHandler"

behavior.on.null.values

指定 Kafka Connector 应如何处理逻辑删除记录。逻辑删除记录定义为整个值字段为 null 的记录。 对于 Snowpipe,Kafka Connector 版本 1.5.5 及更高版本支持此属性。对于 Snowpipe Streaming,Kafka Connector 版本 2.1.0 及更高版本支持此属性。

此属性支持以下值:

DEFAULT

当 Kafka Connector 遇到逻辑删除记录时,它会在内容列中插入一个空的 JSON 的字符串。

IGNORE

Kafka Connector 会跳过逻辑删除记录,并且不会为这些记录插入行。

默认值为 DEFAULT

备注

逻辑删除记录的引入因引入方法而异:

  • 对于 Snowpipe,Kafka Connector 仅使用 Snowflake 转换器。要使用 Kafka 社区转换器管理逻辑删除记录处理,请改为使用 transformtransforms.tombstoneHandlerExample.type 属性。

  • 对于 Snowpipe Streaming,Kafka Connector 仅使用社区转换器。

发送到 Kafka Broker 的记录不得为 NULL,因为这些记录会被 Kafka Connector 删除,从而导致偏移丢失。在特定用例中,丢失的偏移会破坏 Kafka Connector。建议您使用逻辑删除记录而不是 NULL 记录。

使用密钥对身份验证和密钥轮换

Kafka Connector 依赖于密钥对身份验证,而不是基本身份验证(即用户名和密码)。此身份验证方法需要 2048 位(最低)RSA 密钥对。使用 OpenSSL 生成公钥-私钥对。公钥分配给配置文件中定义的 Snowflake 用户。

完成本页上的密钥身份验证说明和 密钥对轮换 说明后,评估 外部化密钥 建议(本主题内容)。

要配置公钥/私钥对,请执行以下操作:

  1. 在终端窗口中的命令行生成私钥。

    您可以生成加密版或未加密版的私钥。

    备注

    Kafka Connector 支持经过验证且符合联邦信息处理标准 (140-2)(即 FIPS 140-2)的加密算法要求。有关更多信息,请参阅 FIPS 140-2 (https://csrc.nist.gov/publications/detail/fips/140/2/final)。

    要生成未加密的版本,请使用以下命令:

    $ openssl genrsa -out rsa_key.pem 2048
    
    Copy

    要生成加密的版本,请使用以下命令:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    
    Copy

    其中 <algorithm> 是符合 FIPS 140-2 标准的加密算法。

    例如,指定 AES 256 作为加密算法:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    
    Copy

    如果生成加密版的私钥,请记录加密短语。稍后,您将在 Kafka 配置文件 snowflake.private.key.passphrase 属性中指定该加密短语。

    示例 PEM 私钥

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
    Copy
  2. 从命令行中,通过引用私钥生成公钥:

    假设私钥已加密并包含在名为 rsa_key.p8 的文件中,使用以下命令:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    
    Copy

    示例 PEM 公钥

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
    Copy
  3. 将公钥和私钥文件复制到本地目录进行存储。记录文件的路径。请注意,私钥使用 PKCS#8(公钥加密标准)格式存储,并使用您在上一步中指定的加密短语进行加密;但仍应使用操作系统提供的文件权限机制保护文件,防止未经授权的访问。您有责任在不使用文件时保护该文件。

  4. 登录 Snowflake。使用 ALTER USER 将公钥分配给 Snowflake 用户。例如:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    
    Copy

    备注

    • 只有安全管理员(即具有 SECURITYADMIN 角色的用户)或更高级别的用户才能更改用户。

    • 在 SQL 语句中排除公钥标头和页脚。

    使用 DESCRIBE USER 验证用户的公钥指纹:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    
    Copy

    备注

    RSA_PUBLIC_KEY_2_FP 属性在 配置密钥对轮换 中作了说明。

  5. 将整个私钥复制并粘贴到配置文件中的 snowflake.private.key 字段。保存文件。

外部化密钥

Snowflake 强烈建议将私钥等密钥外部化,并以加密形式存储或存储在密钥管理服务中,例如 AWS Key Management Service (KMS)、Microsoft Azure Key Vault,或 HashiCorp Vault。这可以通过在Kafka Connect集群上使用 ConfigProvider 来完成。

有关更多信息,请参阅 Confluent 对此 服务 (https://docs.confluent.io/current/connect/security.html#externalizing-secrets) 的描述。

启动 Kafka

使用第三方 Confluent 或 Apache Kafka 文档中提供的说明启动 Kafka。

启动 Kafka Connector

您可以在分布式模式或独立模式下启动 Kafka Connector。每种模式的说明如下所示:

分布式模式

在终端窗口中,执行以下命令:

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Copy

独立模式

在终端窗口中,执行以下命令:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
Copy

(Apache Kafka 或 Confluent Kafka 的默认安装应该已经包含该 connect-standalone.properties 文件。)

测试和使用 Kafka Connector

我们建议在生产系统中使用 Kafka Connector 之前,先用少量数据对其进行测试。测试过程与正常使用连接器的过程相同:

  1. 验证 Kafka 和Kafka Connect是否正在运行。

  2. 验证是否已创建适当的 Kafka 主题。

  3. 创建(或使用现有的)消息发布器。确保发布到主题的消息具有正确的格式(JSON、Avro 或纯文本)。

  4. 创建一个配置文件,指定要订阅的主题和要写入的 Snowflake 表。有关说明,请参阅 配置 Kafka Connector (本主题内容)。

  5. (可选)创建要写入数据的表。此步骤是可选的;如果您不创建表,Kafka Connector 会为您创建表。如果不打算使用连接器将数据添加到现有的非空表,那么我们建议让连接器为您创建表,以尽量减少架构不匹配的可能性。

  6. 将 Snowflake 对象(数据库、架构、目标表等)所需的最低权限授予将用于引入数据的角色。

  7. 向配置的 Kafka 主题发布一组示例数据。

  8. 等待几分钟,让数据在系统中传播,然后检查 Snowflake 表以验证记录是否已插入。

小技巧

在测试和生产环境中将数据加载到 Snowflake 之前,请考虑使用 SnowCD 验证您与 Snowflake 的网络连接。

语言: 中文