Set up Openflow Connector for Kinesis Data Streams for JSON data format¶
Note
This connector is subject to the Snowflake Connector Terms.
This topic describes the setup steps for the Openflow Connector for Kinesis Data Streams for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.
The Openflow Connector for Kinesis Data Streams for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.
Prerequisites¶
Ensure that you have completed Set up Openflow - BYOC or Set up Openflow - Snowflake Deployments.
If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the Kinesis connector.
Note
If you need support for other data formats or features, such as a dead-letter queue (DLQ), contact your Snowflake representative.
Set up the Kinesis stream¶
As an AWS administrator, perform the following actions in your AWS account:
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB (https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html).
Ensure that the AWS User has configured Access Key credentials (https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).
Set up your Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new role or use an existing role and grant the database privileges.
Create a destination database and schema that will hold the target table used to store the data.
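For example, a minimal setup might look like the following sketch, using the hypothetical names kinesis_db and kinesis_schema that also appear in the role script below:

-- Create the destination database and schema for ingested data.
CREATE DATABASE IF NOT EXISTS kinesis_db;
CREATE SCHEMA IF NOT EXISTS kinesis_db.kinesis_schema;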
If you plan to use the connector's capability to create the target table automatically when it does not already exist, ensure that the user has the following privileges required to create and manage Snowflake objects:
| Object | Privilege | Notes |
|---|---|---|
| Database | USAGE | |
| Schema | USAGE, CREATE TABLE | After the schema-level objects are created, the CREATE object privilege can be revoked. |
| Table | OWNERSHIP | Required only when using the Kinesis connector to ingest data into an existing table. If the connector creates a new target table for records from the Kinesis stream, the default role of the user specified in the configuration becomes the table owner. |
You can use the following script to create and configure a custom role (requires SECURITYADMIN or an equivalent role):
USE ROLE SECURITYADMIN;
CREATE ROLE kinesis_connector_role;
GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
-- Only for existing tables.
GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Create a new Snowflake service user of type SERVICE.
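For example, a sketch using the hypothetical user name kinesis_connector_user referenced in the next step:

-- TYPE = SERVICE creates a non-human user intended for programmatic access.
CREATE USER kinesis_connector_user TYPE = SERVICE;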
Grant the Snowflake service user the role you created in the previous steps.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Configure key-pair authentication for the Snowflake SERVICE user from step 3.
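For example, after generating an RSA key pair, you can assign the public key to the service user. This is a sketch; the key value is a truncated placeholder:

-- Paste the public key contents without the PEM header and footer lines.
ALTER USER kinesis_connector_user SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...';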
Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example, AWS, Azure, or Hashicorp, and store the public and private keys in the secret store.
Note
If, for any reason, you do not want to use a secrets manager, you are responsible for safeguarding the public key and private key files used for key-pair authentication according to the security policies of your organization.
After the secrets manager is configured, determine how you will authenticate to it. On AWS, it is recommended that you use the EC2 instance role associated with Openflow, because this way no other secrets have to be persisted.
In Openflow, configure a Parameter Provider associated with this secrets manager from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider and then fetch your parameter values.
At this point, all credentials can be referenced with the associated parameter paths, and no sensitive values need to be persisted within Openflow.
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
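For example, assuming a hypothetical analyst_user:

GRANT ROLE kinesis_connector_role TO USER analyst_user;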
Set up the connector¶
As a data engineer, perform the following tasks to install and configure the connector:
Install the connector¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
On the Openflow connectors page, find the connector and select Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
Note
Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store the ingested data.
Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted, to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
Authenticate to the runtime with your Snowflake account credentials.
The Openflow canvas appears with the connector process group added to it.
Configure the connector¶
Right-click on the imported process group and select Parameters.
Populate the required parameter values as described in the Parameters section below.
Parameters¶
This section describes all parameters for the Openflow Connector for Kinesis Data Streams for JSON data format.
The connector consists of several modules. To see them, double-click the connector process group. You can set the parameters for each module in the module's parameter context.
Snowflake destination parameters¶
| Parameter | Description | Required |
|---|---|---|
| Destination Database | The database where data will be persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase. | Yes |
| Destination Schema | The schema where data will be persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase. | Yes |
| Iceberg Enabled | Specifies whether Iceberg is enabled for table operations. One of: `true` or `false`. | Yes |
| Schema Evolution Enabled | Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables. Note that schema evolution can also be controlled at the individual table level through table-specific parameters. One of: `true` or `false`. | Yes |
| Schema Evolution For New Tables Enabled | Controls whether schema evolution is enabled when creating new tables. When set to `true`, new tables are created with the ENABLE_SCHEMA_EVOLUTION = TRUE parameter; when set to `false`, new tables are created with ENABLE_SCHEMA_EVOLUTION = FALSE. Not applicable to Iceberg tables, because they are not created automatically. This setting only affects table creation, not existing tables. One of: `true` or `false`. | Yes |
| Snowflake Account Identifier | The Snowflake account name, formatted as [organization-name]-[account-name], where data will be persisted. | Yes |
| Snowflake Authentication Strategy | The strategy for authentication to Snowflake. Possible values: `KEY_PAIR` and `SNOWFLAKE_SESSION_TOKEN`. | Yes |
| Snowflake Private Key | The RSA private key used for authentication when the authentication strategy is `KEY_PAIR`. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Either the Snowflake Private Key File or the Snowflake Private Key must be defined. | No |
| Snowflake Private Key File | The file that contains the RSA private key used for authentication to Snowflake. The file must be formatted according to PKCS8 standards and have standard PEM headers and footers, beginning with `-----BEGIN PRIVATE KEY-----`. | No |
| Snowflake Private Key Password | The password associated with the Snowflake Private Key File. | No |
| Snowflake Role | The Snowflake role used during query execution. | Yes |
| Snowflake Username | The user name used to connect to the Snowflake instance. | Yes |
Kinesis JSON Source Parameters¶
| Parameter | Description | Required |
|---|---|---|
| AWS Region Code | The AWS region where your Kinesis Stream is located, for example `us-west-2`. | Yes |
| AWS Access Key ID | The AWS Access Key ID used to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. | Yes |
| AWS Secret Access Key | The AWS Secret Access Key used to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. | Yes |
| Kinesis Application Name | The name used as the DynamoDB table name that tracks the application's progress in consuming the Kinesis Stream. | Yes |
| Kinesis Initial Stream Position | The initial stream position from which data replication starts, for example `LATEST` (start from the newest records) or `TRIM_HORIZON` (start from the oldest available records). | Yes |
| Kinesis Stream Name | The name of the AWS Kinesis Stream to consume data from. | Yes |
| Metrics Publishing | Specifies where Kinesis Client Library metrics are published. | Yes |
Run the flow¶
Right-click on the plane and select Enable all Controller Services.
Right-click on the connector's process group and select Start.
The connector starts ingesting data.
Table Schema¶
The Snowflake table loaded by the connector contains columns named after the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
Below is an example of a Snowflake table loaded by the connector:
| Row | ACCOUNT | SYMBOL | SIDE | QUANTITY | KINESISMETADATA |
|---|---|---|---|---|---|
| 1 | ABC123 | ZTEST | BUY | 3572 | { ... KINESISMETADATA object ... } |
| 2 | XYZ789 | ZABZX | SELL | 3024 | { ... KINESISMETADATA object ... } |
| 3 | XYZ789 | ZTEST | SELL | 799 | { ... KINESISMETADATA object ... } |
| 4 | ABC123 | ZABZX | BUY | 2033 | { ... KINESISMETADATA object ... } |
The KINESISMETADATA column contains an object with the following fields:
| Field Name | Field Type | Description |
|---|---|---|
| `stream` | String | The name of the Kinesis stream the record came from. |
| `shardId` | String | The identifier of the shard in the stream the record came from. |
| `approximateArrival` | String | The approximate time that the record was inserted into the stream (ISO 8601 format). |
| `partitionKey` | String | The partition key specified by the data producer for the record. |
| `sequenceNumber` | String | The unique sequence number assigned by Kinesis Data Streams to the record in the shard. |
| `subSequenceNumber` | Number | The subsequence number for the record (used for aggregated records with the same sequence number). |
| `shardedSequenceNumber` | String | A combination of the sequence number and the subsequence number for the record. |
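For example, individual metadata fields can be extracted with Snowflake's semi-structured path syntax. This is a sketch against the example table shown above; the table name is hypothetical:

SELECT
  KINESISMETADATA:stream::STRING AS stream_name,
  KINESISMETADATA:shardId::STRING AS shard_id,
  KINESISMETADATA:sequenceNumber::STRING AS sequence_number,
  ACCOUNT,
  QUANTITY
FROM my_destination_table;  -- hypothetical table name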
Schema evolution¶
This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.
Snowflake detects the schema of the incoming data and loads data into tables
that match any user-defined schema. Snowflake also allows adding
new columns or dropping the NOT NULL constraint from columns missing in new incoming records.
Schema detection with the connector infers data types based on the JSON data provided.
If the connector creates the target table, schema evolution is enabled by default.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Table schema evolution.
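For example, the following statement enables schema evolution on a hypothetical existing table:

ALTER TABLE my_table SET ENABLE_SCHEMA_EVOLUTION = TRUE;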
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis Data Streams can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
Requirements and limitations¶
Before you configure the Openflow Connector for Kinesis Data Streams for Iceberg table ingestion, note the following requirements and limitations:
You must create the Iceberg table before you run the connector.
Ensure that the user has privileges to insert data into the created table; see the example grants after this list.
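For example, a minimal set of grants might look like the following sketch, reusing the hypothetical role, schema, and table names from earlier in this topic:

GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
GRANT INSERT ON TABLE my_iceberg_table TO ROLE kinesis_connector_role;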
Configuration and setup¶
To configure the Openflow Connector for Kinesis Data Streams for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis Data Streams for JSON data format with a few differences noted in the following sections.
Enable ingestion into Iceberg tables¶
To enable ingestion into an Iceberg table, you must set the Iceberg Enabled parameter to `true`.
Create an Iceberg table for ingestion¶
Before you run the connector, you must create an Iceberg table.
The initial table schema depends on your connector Schema Evolution Enabled property settings.
With schema evolution enabled, you must create a table with a column named kinesisMetadata.
The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn't supported. Instead, use a structured OBJECT or MAP.
For example, consider the following message:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
The following statement creates a table with all fields the Kinesis message contains:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Note
The kinesisMetadata column must always be created. Field names inside nested structures, such as dogs or cats, are case-sensitive.
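For example, after data starts flowing, you might verify the nested fields with a query such as the following sketch against the table defined above. Note the case-sensitive nested field names:

SELECT
  name,
  animals_possessed:dogs AS has_dogs,
  kinesisMetadata:shardId::STRING AS shard_id
FROM my_iceberg_table;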