Set up the Openflow Connector for Kafka¶
Note
This connector is subject to the Snowflake Connector Terms.
Prerequisites¶
Ensure that you have completed Set up Openflow - BYOC or Set up Openflow - Snowflake Deployments.
If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the Kafka connector. The connector must be able to connect to all Kafka brokers in the cluster.
Set up your Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new Snowflake service user of type SERVICE.
Create a new role, or use an existing role, and grant it the database privileges.
The connector requires a user to create the destination table. Make sure the user has the required privileges for managing Snowflake objects:
| Object | Privilege | Notes |
|---|---|---|
| Database | USAGE | |
| Schema | USAGE | |
| Table | OWNERSHIP | Required for the connector to ingest data into a table. |
Snowflake recommends creating a separate user and role for each Kafka cluster for better access control.
You can use the following script to create and configure a custom role (requires SECURITYADMIN or an equivalent role):
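The script itself was omitted from this page. The following is a sketch of a typical setup; the role, user, database, and schema names (`openflow_kafka_connector_role`, `openflow_kafka_user`, `kafka_db.kafka_schema`) are hypothetical placeholders for your own objects:

```sql
-- Hypothetical names; adjust to your environment.
USE ROLE SECURITYADMIN;

CREATE ROLE IF NOT EXISTS openflow_kafka_connector_role;

-- Grant the privileges listed in the table above.
GRANT USAGE ON DATABASE kafka_db TO ROLE openflow_kafka_connector_role;
GRANT USAGE ON SCHEMA kafka_db.kafka_schema TO ROLE openflow_kafka_connector_role;

-- Assign the role to the SERVICE user created earlier.
GRANT ROLE openflow_kafka_connector_role TO USER openflow_kafka_user;
ALTER USER openflow_kafka_user SET DEFAULT_ROLE = openflow_kafka_connector_role;
```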
Note
Privileges must be granted directly to the connector role and cannot be inherited.
Configure the destination table
Snowflake highly recommends using server-side schema evolution for schema changes and an error table for DML error logging. The following example shows how to create a table and add proper OWNERSHIP permissions.
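The example itself was omitted from this page. The following sketch assumes the hypothetical table `kafka_db.kafka_schema.kafka_events` and the role `openflow_kafka_connector_role` from the previous step; `ENABLE_SCHEMA_EVOLUTION` is a standard Snowflake table parameter:

```sql
-- Create the destination table with server-side schema evolution enabled.
-- The single VARIANT metadata column is an assumption; schema evolution
-- adds further columns as new data arrives.
CREATE TABLE kafka_db.kafka_schema.kafka_events (
    KAFKAMETADATA VARIANT
)
ENABLE_SCHEMA_EVOLUTION = TRUE;

-- The connector role must own the table to ingest data into it.
GRANT OWNERSHIP ON TABLE kafka_db.kafka_schema.kafka_events
  TO ROLE openflow_kafka_connector_role;
```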
The connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector. It automatically maps the record content's first-level keys to table columns matching by name (case-insensitive).
With Schema evolution enabled, Snowflake can automatically expand the destination table by adding new columns that are detected in the incoming stream and dropping NOT NULL constraints to accommodate new data patterns. For more information, see Table schema evolution.
If ENABLE_SCHEMA_EVOLUTION isn't enabled, you must create the schema manually by extending the table definition. The connector tries to match the record content's first-level keys to the table columns by name. If keys from the JSON don't match the table columns, the connector ignores the keys.
(Optional) Configure a secrets manager
Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow (for example, AWS, Azure, or HashiCorp) and store the public and private keys in the secret store.
Determine how you'll authenticate to the secrets manager after it's configured. On AWS, Snowflake recommends using the EC2 instance role associated with Openflow so no other secrets need to be persisted.
Configure a Parameter Provider associated with this Secrets Manager in Openflow from the hamburger menu in the upper right. Navigate to Controller Settings > Parameter Provider and fetch your parameter values.
Reference all credentials with the associated parameter paths so no sensitive values need to be persisted within Openflow.
Grant access to users
For any other Snowflake users who require access to the raw ingested data by the connector (for example, for custom processing in Snowflake), grant those users the role created in step 1.
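For example, assuming the role created in step 1 is named `openflow_kafka_connector_role` and `analyst_user` is a hypothetical user who needs access to the raw data:

```sql
-- Grant the connector role to any user who needs to query the ingested data.
GRANT ROLE openflow_kafka_connector_role TO USER analyst_user;
```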
Set up the connector¶
As a data engineer, perform the following tasks to install and configure the connector:
Install the connector¶
To install the connector, do the following:
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
On the Openflow connectors page, find the connector and select Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and select Add.
Note
Before you install the connector, ensure that you have created a database, schema, and a table in Snowflake for the connector to store ingested data.
Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
Authenticate to the runtime with your Snowflake account credentials.
The Openflow canvas appears with the connector process group added to it.
Configure the connector¶
If needed, customize the connector configuration before configuring the built-in parameters.
Populate the process group parameters
Right-click on the imported process group and select Parameters.
Fill out the required parameter values.
Parameters¶
The following table describes the parameters for the Openflow Connector for Kafka:
| Parameter | Description | Required |
|---|---|---|
| Kafka Auto Offset Reset | Automatic offset configuration applied when no previous consumer offset corresponding to the Kafka consumer group is found. Possible values: `earliest` (automatically reset the offset to the earliest offset), `latest` (automatically reset the offset to the latest offset), `none` (throw an exception to the consumer if no previous offset is found for the consumer group). Default: `latest` | Yes |
| Kafka Bootstrap Servers | A comma-separated list of Kafka bootstrap servers; each entry must include a port. | Yes |
| Kafka Consumer Group ID | The ID of the consumer group used by the connector. Can be any unique ID. | Yes |
| Kafka SASL Password | The password to use when the SASL SCRAM-SHA-512 mechanism is configured. | |
| Kafka SASL Username | The username to use when the SASL SCRAM-SHA-512 mechanism is configured. | |
| Kafka Topic Format | One of: `names` or `pattern`. Specifies whether the Kafka Topics value is a comma-separated list of names or a single regular expression. | Yes |
| Kafka Topics | A comma-separated list of Kafka topics, or a single regular expression. | Yes |
| Snowflake Destination Database | The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase. | Yes |
| Snowflake Destination Schema | The schema where data is persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase. | Yes |
| Snowflake Destination Table | The table where data is persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase. | Yes |
Start the connector¶
Right-click on the canvas and select Enable all Controller Services.
Right-click on the canvas and select Start. The connector starts data ingestion.
Understanding the KAFKAMETADATA column¶
The connector populates the KAFKAMETADATA structure with metadata about the Kafka record. The structure contains the following information:
| Field | Data Type | Description |
|---|---|---|
| topic | String | The name of the Kafka topic that the record came from. |
| partition | Number | The number of the partition within the topic. (Note that this is the Kafka partition, not the Snowflake micro-partition.) |
| offset | Number | The offset in that partition. |
| timestamp | Number | The timestamp when the record was added to Kafka. |
| key | String | If the message is a Kafka KeyedMessage, this is the key for that message. In order for the connector to store the key in the RECORD_METADATA, the |
| headers | Object | A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers. |
Measuring ingestion latency¶
For change tracking, incremental processing, and time-travel queries based on row modification time, the ROW_TIMESTAMP feature can be used.
Enable it by running the following command on your destination table:
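The command itself was omitted from this page. The following is only a sketch; the table name is hypothetical and the exact property name for enabling row timestamps is an assumption, so verify it against the current Snowflake documentation:

```sql
-- Assumed property name; check the Snowflake docs for the exact syntax.
ALTER TABLE kafka_db.kafka_schema.kafka_events
  SET ENABLE_ROW_TIMESTAMPS = TRUE;
```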
After row timestamps are enabled, tables expose the METADATA$ROW_LAST_COMMIT_TIME column, which returns the timestamp when each row was last modified.
For more information, see METADATA$ROW_LAST_COMMIT_TIME.
备注
Row timestamp isn't available for interactive tables. For more information, see Snowflake interactive tables and interactive warehouses.
Using the connector with Apache Iceberg™ tables¶
The connector can ingest data into Snowflake-managed Apache Iceberg™ tables, provided the following requirements are met:
You must have been granted the USAGE privilege on the external volume associated with your Apache Iceberg™ table.
You must create an Apache Iceberg™ table before running the connector.
Grant usage on an external volume¶
For example, if your Iceberg table uses the kafka_external_volume external volume and the connector uses the role openflow_kafka_connector_role, run the following statement:
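The statement itself was omitted from this page. Using the names given above, it would look like:

```sql
-- Allow the connector role to use the external volume backing the Iceberg table.
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume
  TO ROLE openflow_kafka_connector_role;
```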
Create an Apache Iceberg™ table for ingestion¶
The connector doesn't create Iceberg tables automatically and doesn't support schema evolution. Before you run the connector, you must create an Iceberg table manually.
When you create an Iceberg table, you can use Iceberg data types (including VARIANT) or compatible Snowflake types.
For example, consider the following message:
To create an Iceberg table for the example message, use one of the following statements:
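The example message and statements were omitted from this page. As an illustrative sketch, assume a hypothetical message like `{"id": 1, "name": "Alice", "created_at": "2024-01-01T00:00:00Z"}` and the external volume `kafka_external_volume` from the previous step; the table and location names are placeholders:

```sql
-- Snowflake-managed Iceberg table; column names match the message's
-- first-level keys (matching is case-insensitive).
CREATE ICEBERG TABLE kafka_db.kafka_schema.kafka_events_iceberg (
    id BIGINT,
    name STRING,
    created_at TIMESTAMP_NTZ,
    KAFKAMETADATA VARIANT  -- assumed metadata column, as a VARIANT
)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'kafka_external_volume'
BASE_LOCATION = 'kafka_events_iceberg';
```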
Using the connector with Interactive Tables¶
Interactive tables are a special type of Snowflake table optimized for low-latency, high-concurrency queries. For more information, see Snowflake interactive tables and interactive warehouses.
Create an interactive table:
Important considerations:
Interactive tables have specific limitations and query restrictions. Review Snowflake interactive tables and interactive warehouses before using them with the connector.
For interactive tables, any required transformations must be handled in the table definition.
Interactive warehouses are required to query interactive tables efficiently.
Using the connector with a customer-defined schema for the destination table¶
The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with the content of the message structured like the following JSON:
By default, you don't have to specify all fields from the JSON, thanks to the ENABLE_SCHEMA_EVOLUTION = TRUE feature. However, if you prefer a static schema, you can create it by running:
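The statement itself was omitted from this page. The following sketch assumes a hypothetical message with the first-level keys `id`, `name`, and `created_at`; adapt the column names and types to your actual message structure:

```sql
-- Static schema: one column per first-level JSON key, matched by name
-- (case-insensitive). Unmatched JSON keys are ignored by the connector.
CREATE TABLE kafka_db.kafka_schema.kafka_events (
    id NUMBER,
    name VARCHAR,
    created_at TIMESTAMP_TZ,
    KAFKAMETADATA VARIANT  -- assumed metadata column
);
```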
Using the connector with a customer-defined PIPE¶
If you choose to create your own pipe, you can define the data transformation logic in the pipe's COPY INTO statement. You can rename columns as required and cast the data types as needed. For example:
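The example itself was omitted from this page. The following sketch assumes a hypothetical destination table `kafka_db.kafka_schema.kafka_events` and the streaming data source used by Snowpipe Streaming pipes; verify the exact FROM clause against the Snowpipe Streaming documentation:

```sql
CREATE PIPE kafka_db.kafka_schema.kafka_events_pipe AS
COPY INTO kafka_db.kafka_schema.kafka_events (account_id, event_name, KAFKAMETADATA)
FROM (
    SELECT $1:id::NUMBER,      -- rename "id" to "account_id" and cast
           $1:name::VARCHAR,   -- rename "name" to "event_name" and cast
           $1:KAFKAMETADATA
    FROM TABLE (DATA_SOURCE (TYPE => 'STREAMING'))
);
```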
When you define your own pipe, your destination table columns don't need to match the JSON keys. You can rename the columns to your desired names and cast the data types if required.
To adjust the connector to work with a custom pipe, perform the following tasks:
Right-click on the PublishSnowpipeStreaming processor used in your Kafka ingestion flow in the Openflow canvas.
Select Configure from the context menu.
Navigate to the Properties tab.
In the Destination type field, pick Pipe.
In the Pipe field, type the name of your PIPE.
Select Apply to save the configuration.
Customizing error handling¶
Error handling is split between Openflow-side failures and server-side failures within the Snowpipe Streaming service.
Openflow Errors (Client-Side Failures): Errors such as unparseable payloads or custom transformation failures occur before records reach Snowflake. By default, these records are discarded. You can process these errors in Openflow by using FlowFiles from the parse failure relationship of the ConsumeKafka processor.
Snowpipe Streaming Errors (Server-Side Failures): Errors for records that successfully reach Snowflake but are incompatible with the destination table's schema (for example, type mismatches) are captured by the Snowflake infrastructure. When error logging is enabled on the destination table (error_logging = true), these failed rows are automatically ingested into the destination error table.