设置 Openflow Connector for Kinesis

备注

使用该连接器需遵守 连接器条款

本主题介绍设置 Openflow Connector for Kinesis 的步骤。

先决条件

  1. 确保您已查看 关于 Openflow Connector for Kinesis

  2. 确保您已 设置 Openflow

设置 Kinesis 流

作为 AWS 管理员,在您的 AWS 账户中执行以下操作:

  1. 请确保您拥有一个具有 IAM 权限的 AWS 账户,以便访问 Kinesis Streams 和 DynamoDB (https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)。

  2. (可选)创建死信队列 (DLQ) Kinesis Stream。无法成功解析的消息可以重定向到指定的 DLQ。

设置 Snowflake 账户

作为 Snowflake 账户管理员,请执行以下任务:

  1. 创建新角色或使用现有角色并授予 数据库权限

  2. 创建目标数据库和目标架构,以创建用于存储数据的目标表。

    1. 如果您计划使用连接器的功能自动创建目标表(如果目标表尚不存在),请确保用户具有创建和管理 Snowflake 对象所需的权限:

      对象

      权限

      备注

      数据库

      USAGE

      架构

      USAGE . CREATE TABLE .

      创建架构级对象后,可以撤销 CREATE object 权限。

      OWNERSHIP

      仅在使用 Kinesis 连接器将数据引入到 现有 表时需要。. 如果连接器为来自 Kinesis 流的记录创建新的目标表,则配置中指定的用户的默认角色将成为表所有者。

      您可以使用以下脚本创建和配置自定义角色(需要 SECURITYADMIN 或等效角色):

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role_1;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1;
      
      -- Only for existing tables
      GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
      
      Copy
  3. 创建一个类型为 SERVICE 的新 Snowflake 服务用户。

  4. 向该 Snowflake 服务用户授予您在前面步骤中创建的角色。

    GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1;
    ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
    
    Copy
  5. 为第 3 步中的 Snowflake SERVICE 用户配置 密钥对认证

  6. Snowflake 强烈建议执行此步骤。配置 Openflow 支持的密钥管理器(例如 AWS、Azure 和 Hashicorp),并将公钥和私钥存储在密钥存储库中。

    备注

    如果您出于任何原因不希望使用密钥管理器,则您有责任根据组织的安全策略保护用于密钥对身份验证的公钥和私钥文件。

    1. 配置密钥管理器后,确定如何对其进行身份验证。在 AWS 中,建议您使用与 Openflow 关联的 EC2 实例角色,因为这样就无需保留其他密钥。

    2. 在 Openflow 中,从右上角的汉堡菜单中配置与此密钥管理器关联的参数提供商。导航到 Controller Settings » Parameter Provider,然后提取您的参数值。

    3. 此时,可以使用关联的参数路径引用所有凭据,无需在 Openflow 中保留敏感值。

  7. 如果任何其他 Snowflake 用户需要访问连接器引入的原始文档和表(例如,在 Snowflake 中进行自定义处理),则授予这些用户在步骤 1 中创建的角色。

  8. 指定一个仓库供连接器使用。从最小的仓库规模开始,然后根据要复制的表数量和传输的数据量来测试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。

设置连接器

作为数据工程师,执行以下任务以安装和配置连接器:

安装连接器

  1. 导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors

  2. 在 Openflow 连接器页面上,找到连接器并选择 Add to runtime

  3. Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。

  4. 选择 Add

    备注

    在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。

  5. 使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。

  6. 使用您的 Snowflake 账户凭据进行运行时身份验证。

此时将显示 Openflow 画布,其中添加了连接器进程组。

配置连接器

  1. 右键点击导入的进程组并选择 Parameters

  2. 按照 流参数 中所述填充所需的参数值。

流参数

本节介绍您可以根据以下参数上下文配置的流参数:

Kinesis 源参数

参数

描述

AWS 区域代码

您的 Kinesis Stream 所在的 AWS 区域,例如 us-west-2

AWS 访问密钥 ID

用于连接您的 Kinesis Stream 和 DynamoDB 的 AWS 访问密钥 ID。

AWS 私密访问密钥

用于连接您的 Kinesis Stream 和 DynamoDB 的 AWS 私密访问密钥。

架构注册表 URL

AVRO 架构注册表的 URL。如果将 AVRO 架构访问策略参数设置为 schema-reference-reader,则这是必需的。

架构注册表身份验证类型

AVRO 架构注册表使用的身份验证类型。如果将 AVRO 架构访问策略参数设置为 schema-reference-reader,则这是必需的。

可能的值:
  • NONE:未使用身份验证

  • BASIC:使用的用户名/密码身份验证方法

架构注册表用户名

用于对 AVRO 架构注册表进行 BASIC 身份验证的用户名。如果将架构注册表身份验证类型参数设置为 BASIC,则这是必需的。

架构注册表密码

用于对 AVRO 架构注册表进行 BASIC 身份验证的密码。如果将架构注册表身份验证类型参数设置为 BASIC,则这是必需的。

Kinesis 目的地参数

参数

描述

目标数据库

用于永久保存数据的数据库。它必须已经存在于 Snowflake 中。

目标架构

用于永久保存数据的架构。它必须已经存在于 Snowflake 中。此参数区分大小写。

Snowflake 账户标识符

Snowflake 账户名称格式为 [organization-name]-[account-name],数据将永久保存在其中。

Snowflake 身份验证策略

对 Snowflake 进行身份验证的策略。可能的值:当您在 SPCS 上运行流时为 SNOWFLAKE_SESSION_TOKEN,当您想使用私钥设置访问权限时为 KEY_PAIR

Snowflake 私钥

用于身份验证的 RSA 私钥。RSA 密钥必须按照 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。请注意,必须定义 Snowflake 私钥文件或 Snowflake 私钥。

Snowflake 私钥文件

该文件包含用于对 Snowflake 进行身份验证的 RSA 私钥,该私钥根据 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。头标记行以 -----BEGIN PRIVATE 开头。选中 Reference asset 复选框,上传私钥文件。

Snowflake 私钥密码

与 Snowflake 私钥文件关联的密码。

Snowflake 角色

在查询执行期间使用的 Snowflake 角色。

Snowflake 用户名

用于连接到 Snowflake 实例的用户名。

Snowflake 仓库

用于运行查询的 Snowflake 仓库。此参数区分大小写。

Kinesis 引入参数

参数

描述

Kinesis 应用程序名称

该名称用作 DynamoDB 表名,用于跟踪应用程序在 Kinesis Stream 使用方面的进度。

Kinesis Stream 名称

用于使用数据的 AWS Kinesis Stream 名称。

Kinesis 初始流位置

数据开始复制的初始流位置。

可能的值:
  • LATEST:最新存储的记录

  • TRIM_HORIZON:最早存储的记录

Kinesis DLQ Stream 名称

发送所有处理失败记录的流名称。如果未添加此参数,则可以预计 Openflow 画布上连接器的 DLQ 相关部分会出现警告标志。

消息格式

Kinesis 中的消息格式。

可能的值:
  • JSON:JSON 是一种人类可读的消息格式,其架构可以从消息本身推断出来。

  • AVRO:AVRO 是一种消息格式,需要架构才能访问消息中的数据。

AVRO 架构访问策略

要访问 AVRO 消息格式中的数据,需要架构。此参数定义访问特定消息的 AVRO 架构的策略。如果将消息格式参数设置为 AVRO,则使用架构。

可能的值如下:
  • embedded-avro-schema:架构嵌入在记录本身中

  • schema-reference-reader:架构存储在 Confluent 架构注册表中

Kinesis Stream 到表映射

此可选参数允许用户指定哪些流应映射到哪些表。每个流及其表名应用冒号分隔。表名称必须是有效的 Snowflake 无引号标识符。正则表达式不能含糊不清,任何匹配的流只能匹配单个目标表。如果匹配项为空或未找到匹配项,则使用流名称作为表名。

示例:
  • stream1:low_range,stream2:low_range,stream5:high_range,stream6:high_range

  • stream[0-4]:low_range,stream[5-9]:high_range

已启用 Iceberg

指定处理器是否将数据引入 Iceberg 表。如果此属性与实际表类型不匹配,则处理器将失败。

可能的值如下:
  • true

  • false

运行流

  1. 右键点击“飞机”图标并选择 Enable all Controller Services

  2. 右键点击导入的进程组并选择 Start

连接器开始数据引入。

架构

连接器加载的 Snowflake 表包含以 Kinesis 消息的密钥命名的列。以下是此类表格的示例。

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

ABC123

ZTEST

BUY

3572

2

XYZ789

ZABZX

SELL

3024

3

XYZ789

ZTEST

SELL

799

4

ABC123

ZABZX

BUY

2033

5

ABC123

ZTEST

BUY

1558

架构演化

当前,当 Iceberg Enabled 设置为 false 时。如果连接器创建了目标表,在默认情况下启用架构演化。如果要在现有表上启用或禁用架构演化,请使用 ALTER TABLE 命令来设置 ENABLE_SCHEMA_EVOLUTION 参数。您还必须使用对该表具有 OWNERSHIP 权限的角色。有关更多信息,请参阅 表模式演化

但是,如果对现有表禁用了架构演化,则连接器将尝试将架构不匹配的行发送到配置的死信队列 (DLQ)。

有关 Iceberg Enabled 设置为 true 的情况,请参阅段落 Apache Iceberg™ 表的架构演化

将 Openflow Connector for Kinesis 与 Apache Iceberg™ 表一起使用

Openflow Connector for Kinesis 可以将数据引入到 Snowflake 管理的 Apache Iceberg™ 表 中。

要求和限制

在为 Iceberg 表引入配置连接器之前,请注意以下要求和限制:

  • 在运行连接器之前,必须创建 Iceberg 表。

  • 确保用户有权将数据插入到创建的表中。

  • Iceberg 表不支持架构演化。

配置和设置

要配置连接器以引入 Iceberg 表,请按照 配置连接器 中的说明进行操作,但存在以下部分中描述的一些差异。

启用向 Iceberg 表中引入数据

要启用向 Iceberg 表中引入功能,必须将 Iceberg Enabled 参数设置为 true

创建用于引入的 Iceberg 表

在运行连接器之前,必须创建 Iceberg 表。由于不支持架构演化,因此必须创建包含 Kinesis 消息中的所有字段的表。

创建 Iceberg 表时,您可以使用 Iceberg 数据类型或 兼容的 Snowflake 类型。半结构化 VARIANT 类型不受支持。相反,使用 结构化的 OBJECT 或 MAP

例如,请考虑以下消息:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

要为示例消息创建 Iceberg 表,请使用以下语句:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

备注

嵌套结构(如 dogscats)内的字段名称区分大小写。

Apache Iceberg™ 表的架构演化

当前,该连接器不支持对 Apache Iceberg™ 表的架构进行演化。

已知问题

  • 连接器的进程组有一个名为“Upload Failure”的输出端口。它可以用来处理未成功上传到 Snowflake 的 FlowFiles。如果此端口未连接到连接器的进程组之外,它将显示一个可以忽略的警告标志。

  • 所有处理器停止后,可以下令运行一次。由于内部架构,ConsumeKinesisStream 处理器在下令运行一次后将无法完成任何有意义的工作。要使处理器开始工作,它必须启动并运行大约两分钟。

语言: 中文