Set up the Openflow Connector for Kafka

Note

This connector is subject to the Snowflake Connector Terms.

Prerequisites

  1. Ensure that you have reviewed Snowflake Openflow Connector for Kafka.

  2. Ensure that you have completed Set up Openflow - BYOC or Set up Openflow - Snowflake Deployments.

  3. If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the Kafka connector. The connector must be able to connect to all Kafka brokers in the cluster.

Set up your Snowflake account

As a Snowflake account administrator, perform the following tasks:

  1. Create a new Snowflake service user with the type SERVICE.

  2. Create a new role or use an existing role, and grant it the database privileges listed below.

    The connector requires a user to create the destination table. Make sure the user has the required privileges for managing Snowflake objects:

    Object      Privileges          Notes
    Database    USAGE
    Schema      USAGE, OWNERSHIP    Required for the connector to ingest data into a table.

    Snowflake recommends creating a separate user and role for each Kafka cluster for better access control.

    You can use the following script to create and configure a custom role (requires SECURITYADMIN or an equivalent role):

    USE ROLE securityadmin;
    CREATE ROLE openflow_kafka_connector_role_1;
    
    GRANT USAGE ON DATABASE kafka_db TO ROLE openflow_kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE openflow_kafka_connector_role_1;
    

    Note

    Privileges must be granted directly to the connector role and cannot be inherited.
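    The service user from step 1 can then be linked to the role. The following is a minimal sketch, assuming key-pair authentication; the user name openflow_kafka_user and the <public_key> placeholder are illustrative, not part of the original guide:

```sql
-- Sketch only: the user name and key placeholder are illustrative
USE ROLE securityadmin;
CREATE USER openflow_kafka_user TYPE = SERVICE DEFAULT_ROLE = openflow_kafka_connector_role_1;
ALTER USER openflow_kafka_user SET RSA_PUBLIC_KEY = '<public_key>';
GRANT ROLE openflow_kafka_connector_role_1 TO USER openflow_kafka_user;
```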

  3. Configure the destination table

    Snowflake highly recommends using server-side schema evolution for schema changes and an error table for DML error logging. The following example shows how to create a table and add proper OWNERSHIP permissions.

    USE ROLE openflow_kafka_connector_role_1;
    
    CREATE TABLE kafka_db.kafka_schema.<DESTINATION_TABLE_NAME> (
      kafkaMetadata VARIANT
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;
    
    -- To ingest into a table that already exists, transfer its ownership
    -- to the connector role instead:
    USE ROLE securityadmin;
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE openflow_kafka_connector_role_1;
    

    The connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector. It automatically maps the record content's first-level keys to table columns matching by name (case-insensitive).

    With schema evolution enabled, Snowflake can automatically expand the destination table by adding new columns that are detected in the incoming stream and dropping NOT NULL constraints to accommodate new data patterns. For more information, see Table schema evolution.

    If ENABLE_SCHEMA_EVOLUTION isn't enabled, you must create the schema manually by extending the table definition. The connector tries to match the record content's first-level keys to the table columns by name. If keys from the JSON don't match the table columns, the connector ignores the keys.
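    For example, a manually managed schema can be extended ahead of time with ALTER TABLE; the table and column names below are illustrative:

```sql
-- Hypothetical example: add a column so records carrying a "customer_email" key are captured
ALTER TABLE kafka_db.kafka_schema.orders ADD COLUMN customer_email VARCHAR;
```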

  4. (Optional) Configure a secrets manager

    Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example, AWS, Azure, or HashiCorp, and store the public and private keys in the secret store.

    1. Determine how you'll authenticate to the secrets manager after it's configured. On AWS, Snowflake recommends using the EC2 instance role associated with Openflow so no other secrets need to be persisted.

    2. Configure a Parameter Provider associated with this Secrets Manager in Openflow from the hamburger menu in the upper right. Navigate to Controller Settings > Parameter Provider and fetch your parameter values.

    3. Reference all credentials with the associated parameter paths so no sensitive values need to be persisted within Openflow.

  5. Grant access to users

    For any other Snowflake users who require access to the raw data ingested by the connector (for example, for custom processing in Snowflake), grant those users the role created in step 2.
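    For example, assuming the role created earlier and a hypothetical user analyst_user:

```sql
USE ROLE securityadmin;
-- analyst_user is an illustrative name
GRANT ROLE openflow_kafka_connector_role_1 TO USER analyst_user;
```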

Set up the connector

As a data engineer, perform the following tasks to install and configure the connector:

Install the connector

To install the connector, do the following:

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the connector and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and select Add.

    Note

    Before you install the connector, ensure that you have created a database, schema, and a table in Snowflake for the connector to store ingested data.

  4. Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  5. Authenticate to the runtime with your Snowflake account credentials.

    The Openflow canvas appears with the connector process group added to it.

Configure the connector

  1. If needed, customize the connector configuration before configuring the built-in parameters.

  2. Populate the process group parameters:

    1. Right-click on the imported process group and select Parameters.

    2. Fill out the required parameter values.

Parameters

The following list describes the parameters for the Openflow Connector for Kafka:

Kafka Auto Offset Reset

    The automatic offset reset policy applied when no previous consumer offset is found; corresponds to the Kafka auto.offset.reset property. Possible values:

      earliest: automatically reset the offset to the earliest offset.
      latest: automatically reset the offset to the latest offset.
      none: throw an exception to the consumer if no previous offset is found for the consumer group.

    Default: latest

Kafka Bootstrap Servers

    A comma-separated list of Kafka bootstrap servers. Each entry should contain a port, for example kafka-broker:9092.

Kafka Consumer Group ID

    The ID of the consumer group used by the connector. It can be any unique ID.

Kafka SASL Password

    The password used when authenticating with the SASL SCRAM-SHA-512 mechanism.

Kafka SASL Username

    The username used when authenticating with the SASL SCRAM-SHA-512 mechanism.

Kafka Topic Format

    One of names or pattern. Specifies whether the value of Kafka Topics is a comma-separated list of names or a single regular expression.

Kafka Topics

    A comma-separated list of Kafka topics, or a regular expression.

Snowflake Destination Database

    The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

Snowflake Destination Schema

    The schema where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase. See the following examples:

    CREATE SCHEMA SCHEMA_NAME or CREATE SCHEMA schema_name: use SCHEMA_NAME.

    CREATE SCHEMA "schema_name" or CREATE SCHEMA "SCHEMA_NAME": use schema_name or SCHEMA_NAME, respectively.

Snowflake Destination Table

    The table where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.
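You can confirm the exact case-sensitive names to supply for these destination parameters; a sketch using the example objects from the account setup (kafka_db, kafka_schema):

```sql
-- List the schemas and tables as Snowflake stores them (case-sensitive names)
SHOW SCHEMAS IN DATABASE kafka_db;
SHOW TABLES IN SCHEMA kafka_db.kafka_schema;
```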

Start the connector

  1. Right-click on the canvas and select Enable all Controller Services.

  2. Right-click on the canvas and select Start. The connector starts data ingestion.

Understanding the KAFKAMETADATA column

The connector populates the KAFKAMETADATA structure with metadata about the Kafka record. The structure contains the following information:

topic (STRING)

    The name of the Kafka topic that the record came from.

partition (NUMBER)

    The number of the partition within the topic. Note that this is the Kafka partition, not the Snowflake micro-partition.

offset (NUMBER)

    The offset in that partition.

timestamp (NUMBER)

    The timestamp when the record was added to Kafka.

key (STRING)

    If the message is a Kafka KeyedMessage, the key for that message. For the connector to store the key in KAFKAMETADATA, the key.converter parameter in the Kafka configuration properties must be set to org.apache.kafka.connect.storage.StringConverter; otherwise, the connector ignores keys.

headers (OBJECT)

    A header is a user-defined key-value pair associated with the record. Each record can have zero, one, or multiple headers.
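For example, the metadata fields can be unpacked with path expressions; a sketch assuming an illustrative destination table kafka_db.kafka_schema.orders and Kafka timestamps expressed in milliseconds since the epoch:

```sql
SELECT
  kafkaMetadata:topic::STRING                      AS topic,
  kafkaMetadata:partition::NUMBER                  AS kafka_partition,
  kafkaMetadata:offset::NUMBER                     AS kafka_offset,
  -- Scale 3 interprets the numeric value as milliseconds
  TO_TIMESTAMP(kafkaMetadata:timestamp::NUMBER, 3) AS produced_at
FROM kafka_db.kafka_schema.orders;
```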

Measuring ingestion latency

For change tracking, incremental processing, and time-travel queries based on row modification time, the ROW_TIMESTAMP feature can be used.

Enable it by running the following command on your destination table:

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

After row timestamps are enabled, tables expose the METADATA$ROW_LAST_COMMIT_TIME column, which returns the timestamp when each row was last modified.

For more information, see METADATA$ROW_LAST_COMMIT_TIME.
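Combining the Kafka timestamp in KAFKAMETADATA with METADATA$ROW_LAST_COMMIT_TIME gives a per-row latency estimate; a sketch assuming an illustrative table name and millisecond Kafka timestamps:

```sql
-- Average lag between production into Kafka and commit into Snowflake
SELECT
  AVG(DATEDIFF(
    'millisecond',
    TO_TIMESTAMP(kafkaMetadata:timestamp::NUMBER, 3),
    METADATA$ROW_LAST_COMMIT_TIME
  )) AS avg_ingestion_latency_ms
FROM kafka_db.kafka_schema.orders;
```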

Note

Row timestamp isn't available for interactive tables. For more information, see Snowflake interactive tables and interactive warehouses.

Using the connector with Apache Iceberg™ tables

The connector can ingest data into Snowflake-managed Apache Iceberg™ tables, provided that the following requirements are met:

  • You must have been granted the USAGE privilege on the external volume associated with your Apache Iceberg™ table.

  • You must create an Apache Iceberg™ table before running the connector.

Grant usage on an external volume

For example, if your Iceberg table uses the kafka_external_volume external volume and the connector uses the role openflow_kafka_connector_role, run the following statement:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE openflow_kafka_connector_role;

Create an Apache Iceberg™ table for ingestion

The connector doesn't create Iceberg tables automatically and doesn't support schema evolution. Before you run the connector, you must create an Iceberg table manually.

When you create an Iceberg table, you can use Iceberg data types (including VARIANT) or compatible Snowflake types.

For example, consider the following message:

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

To create an Iceberg table for the example message, use the following statement:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kafkaMetadata OBJECT(
    topic STRING,
    partition INTEGER,
    offset INTEGER,
    key STRING,
    headers VARIANT,
    timestamp INTEGER
  ),
  id INT,
  name STRING,
  body_temperature FLOAT,
  approved_coffee_types ARRAY(STRING),
  animals_possessed VARIANT,
  date_added DATE,
  options OBJECT(can_walk BOOLEAN, can_talk BOOLEAN)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

Using the connector with Interactive Tables

Interactive tables are a special type of Snowflake table optimized for low-latency, high-concurrency queries. For more information, see Snowflake interactive tables and interactive warehouses.

  1. Create an interactive table:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

Important considerations:

  • Interactive tables have specific limitations and query restrictions. Review Snowflake interactive tables and interactive warehouses before using them with the connector.

  • For interactive tables, any required transformations must be handled in the table definition.

  • Interactive warehouses are required to query interactive tables efficiently.

Using the connector with a customer-defined schema for the destination table

The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, suppose you have a Kafka topic whose message content is structured like the following JSON:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

Because the destination table is created with ENABLE_SCHEMA_EVOLUTION = TRUE, you don't have to specify all of the JSON fields up front. However, if you prefer a static schema, you can create it by running:

CREATE TABLE ORDERS (
  kafkaMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);

Using the connector with a customer-defined PIPE

If you choose to create your own pipe, you can define the data transformation logic in the pipe's COPY INTO statement. You can rename columns as required and cast the data types as needed. For example:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS_PIPE AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::STRING,
    $1:customer_name::STRING,
    $1:order_total::STRING,
    $1:isPaid::STRING
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

When you define your own pipe, your destination table columns don't need to match the JSON keys. You can rename the columns to your desired names and cast the data types if required.

To adjust the connector to work with a custom pipe, perform the following tasks:

  1. Right-click on the PublishSnowpipeStreaming processor used in your Kafka ingestion flow in the Openflow canvas.

  2. Select Configure from the context menu.

  3. Navigate to the Properties tab.

  4. In the Destination type field, pick Pipe.

  5. In the Pipe field, type the name of your PIPE.

  6. Select Apply to save the configuration.

Customizing error handling

Error handling is split between Openflow-side failures and server-side failures within the Snowpipe Streaming service.

  • Openflow Errors (Client-Side Failures): Errors such as unparseable payloads or custom transformation failures occur before records reach Snowflake. By default, these records are discarded. To process these errors in Openflow, use FlowFiles from the parse failure relationship of the ConsumeKafka processor.

  • Snowpipe Streaming Errors (Server-Side Failures): Errors for records that successfully reach Snowflake but are incompatible with the destination table's schema (for example, type mismatches) are captured by the Snowflake infrastructure. When error logging is enabled on the destination table (error_logging = true), these failed rows are automatically ingested into the destination Error table.
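If the destination table was created without error logging, it can presumably be enabled afterwards with ALTER TABLE, mirroring the ERROR_LOGGING = TRUE option shown earlier in this guide. The table name below is illustrative; verify in your account that the parameter can be set via ALTER:

```sql
-- Assumption: ERROR_LOGGING is settable like other table parameters
ALTER TABLE kafka_db.kafka_schema.orders SET ERROR_LOGGING = TRUE;
```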

Performance tuning

Performance tuning for the Openflow Connector for Kafka