Set up the Openflow Connector for Kinesis¶
Note
The connector is subject to the Connector Terms.
This topic describes the steps to set up the Openflow Connector for Kinesis.
Prerequisites¶
Ensure that you have reviewed About Openflow Connector for Kinesis.
Ensure that you have set up Openflow.
Set up a Kinesis stream¶
As an AWS administrator, perform the following actions in your AWS account:
Ensure that you have an AWS Account with IAM permissions to access Kinesis Streams and DynamoDB (https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html).
Optionally, create a dead-letter queue (DLQ) Kinesis Stream. Messages that cannot be successfully parsed can be redirected to a designated DLQ.
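If you prefer to script this step, the following boto3 sketch creates a source stream and an optional DLQ stream. The stream names, shard count, and region are placeholder assumptions for the example, not values required by the connector.

```python
# A minimal sketch of the AWS setup above; stream names, shard count, and region are
# hypothetical placeholders, so adjust them to your environment and naming standards.
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # assumed region

# Source stream for the connector to consume from.
kinesis.create_stream(StreamName="openflow_source_stream", ShardCount=1)

# Optional: a DLQ stream for records that cannot be parsed.
kinesis.create_stream(StreamName="openflow_dlq_stream", ShardCount=1)

# Wait until both streams are ACTIVE before configuring the connector.
waiter = kinesis.get_waiter("stream_exists")
for name in ("openflow_source_stream", "openflow_dlq_stream"):
    waiter.wait(StreamName=name)
```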
Set up Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new role or use an existing role and grant that role the required database privileges.
Create a new Snowflake service user with the type SERVICE.
Grant the Snowflake service user the role you created in the previous steps.
Configure key-pair authentication for the Snowflake SERVICE user from step 2, as shown in the sketch after this list.
Snowflake strongly recommends this step: configure a secrets manager supported by Openflow, for example AWS, Azure, or HashiCorp, and store the public and private keys in the secret store.
Note
If, for any reason, you do not want to use a secrets manager, then you are responsible for safeguarding the public key and private key files used for key-pair authentication according to the security policies of your organization.
Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it is recommended that you use the EC2 instance role associated with Openflow, because this way no other secrets have to be persisted.
In Openflow, configure a Parameter Provider associated with this secrets manager: from the hamburger menu in the upper right, navigate to Controller Settings » Parameter Provider and then fetch your parameter values.
At this point, all credentials can be referenced with the associated parameter paths, and no sensitive values need to be persisted within Openflow.
If any other Snowflake users require access to the raw documents and tables ingested by the connector (for example, for custom processing in Snowflake), grant those users the role created in step 1.
Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses than with larger warehouse sizes.
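As an illustration only, the following Python sketch strings the steps above together: it generates an RSA key pair, creates a role and a SERVICE user that authenticates with the public key, and designates a starter warehouse. The object names (OPENFLOW_KINESIS_ROLE, OPENFLOW_KINESIS_USER, OPENFLOW_WH) and the admin connection details are assumptions; run the equivalent SQL through whatever administration workflow you normally use, and store the generated keys in your secrets manager.

```python
# Sketch of the account-setup steps above, run with an administrator's privileges.
# All object names below are hypothetical examples; substitute your own and follow
# your organization's key-management policies.
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import snowflake.connector

# 1. Generate an RSA key pair for key-pair authentication (PKCS8 private key).
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
).decode()
public_pem = key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
# Snowflake expects the public key value without the PEM header and footer lines.
public_key_body = "".join(
    line for line in public_pem.splitlines() if "PUBLIC KEY" not in line
)

# Store private_pem (and public_pem) in your secrets manager rather than on disk.

# 2. Create the role, SERVICE user, and warehouse, and wire them together.
conn = snowflake.connector.connect(
    account="<organization-name>-<account-name>",
    user="<admin-user>",
    authenticator="externalbrowser",  # or whichever admin auth method you already use
)
for statement in [
    "CREATE ROLE IF NOT EXISTS OPENFLOW_KINESIS_ROLE",
    "CREATE USER IF NOT EXISTS OPENFLOW_KINESIS_USER TYPE = SERVICE "
    f"RSA_PUBLIC_KEY = '{public_key_body}' DEFAULT_ROLE = OPENFLOW_KINESIS_ROLE",
    "GRANT ROLE OPENFLOW_KINESIS_ROLE TO USER OPENFLOW_KINESIS_USER",
    "CREATE WAREHOUSE IF NOT EXISTS OPENFLOW_WH WAREHOUSE_SIZE = XSMALL",
    "GRANT USAGE ON WAREHOUSE OPENFLOW_WH TO ROLE OPENFLOW_KINESIS_ROLE",
]:
    conn.cursor().execute(statement)
conn.close()
```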
Configure the connector¶
As a data engineer, perform the following tasks to configure a connector:
Create a database and schema in Snowflake for the connector to store ingested data. A scripted sketch of this step appears after this list.
Download the connector definition file.
Import the connector definition into Openflow:
Open the Snowflake Openflow canvas.
Add a process group. To do this, drag and drop the Process Group icon from the tool palette at the top of the page onto the canvas. Once you release your pointer, a Create Process Group dialog appears.
On the Create Process Group dialog, select the connector definition file to import.
Right-click on the imported process group and select Parameters.
Populate the required parameter values as described in Flow parameters.
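A hedged sketch of the database and schema step, reusing the hypothetical names from the earlier sketch (OPENFLOW_KINESIS_ROLE, plus KINESIS_DB and KINESIS_SCHEMA as the new database and schema), might look like this; grant whatever privileges your security model requires.

```python
# Sketch: create the destination database and schema and grant the connector's role
# access to them. All names are hypothetical placeholders carried over from the
# earlier account-setup sketch.
import snowflake.connector

conn = snowflake.connector.connect(
    account="<organization-name>-<account-name>",
    user="<admin-user>",
    authenticator="externalbrowser",
)
for statement in [
    "CREATE DATABASE IF NOT EXISTS KINESIS_DB",
    "CREATE SCHEMA IF NOT EXISTS KINESIS_DB.KINESIS_SCHEMA",
    "GRANT USAGE ON DATABASE KINESIS_DB TO ROLE OPENFLOW_KINESIS_ROLE",
    "GRANT USAGE, CREATE TABLE ON SCHEMA KINESIS_DB.KINESIS_SCHEMA TO ROLE OPENFLOW_KINESIS_ROLE",
]:
    conn.cursor().execute(statement)
conn.close()
```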
Flow parameters¶
You can configure the following flow parameters:
Parameter | Description
---|---
AWS Region Code | The AWS region where your Kinesis Stream is located.
AWS Access Key ID | The AWS Access Key ID used to connect to your Kinesis Stream and DynamoDB.
AWS Secret Access Key | The AWS Secret Access Key used to connect to your Kinesis Stream and DynamoDB.
Kinesis Application Name | The name used for the DynamoDB table that tracks the application's progress in consuming the Kinesis Stream.
Kinesis Stream Name | The name of the AWS Kinesis Stream to consume data from.
Kinesis Initial Stream Position | The initial stream position from which data replication starts.
Kinesis DLQ Stream Name | The name of the stream to which all records that fail processing are sent.
Message Format | The format of messages in Kinesis. Possible values are JSON and AVRO.
AVRO Schema Access Strategy | The schema is required to access data in AVRO message format. This parameter defines the strategy used to access the AVRO schema of a particular message. Required if the Message Format parameter is set to AVRO.
Schema Registry URL | The URL of the AVRO Schema Registry. Required if the AVRO Schema Access Strategy parameter is set to a strategy that uses a schema registry.
Schema Registry Authentication Type | The authentication type used by the AVRO Schema Registry. Required if a schema registry is used.
Schema Registry Username | The username used to authenticate to the AVRO Schema Registry.
Schema Registry Password | The password used to authenticate to the AVRO Schema Registry.
Snowflake Account | The Snowflake account identifier with the organization name and account name formatted as [organization-name]-[account-name].
Snowflake User | The name of the Snowflake user that the connector uses.
Snowflake Private Key | The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined.
Snowflake Private Key File | The file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE KEY-----.
Snowflake Private Key Password | The password associated with the Snowflake Private Key File.
Snowflake Role | The name of the role used by the connector.
Snowflake Warehouse | The Snowflake warehouse used for connections. If no value is set, the default warehouse is used.
Snowflake DB | The database used to store data.
Snowflake Schema | The schema used to store data.
Kinesis Stream To Table Map | An optional parameter that lets you specify which streams map to which tables. Separate each stream from its table name with a colon. The table name must be a valid Snowflake unquoted identifier. The regular expressions cannot be ambiguous: any matched stream must match only a single target table. If the parameter is empty or no match is found, the stream name is used as the table name.
Schematization | Determines whether data is inserted into individual columns or into a single RECORD_METADATA field.
Iceberg Enabled | Specifies whether the processor ingests data into an Iceberg table. The processor fails if this property does not match the actual table type.
Run the flow¶
Right-click on the canvas and select Enable all Controller Services.
Right-click on the imported process group and select Start.
The connector starts the data ingestion.
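To verify ingestion end to end, you can push a test record onto the stream and query the destination table. The sketch below assumes the hypothetical names used earlier; by default the connector uses the stream name as the table name, so adjust the table identifier to match what the connector actually created.

```python
# Sketch: send one JSON test record and check that it arrives in Snowflake.
# Stream, warehouse, database, schema, and table names are hypothetical placeholders.
import json
import boto3
import snowflake.connector

kinesis = boto3.client("kinesis", region_name="us-east-1")
kinesis.put_record(
    StreamName="openflow_source_stream",
    Data=json.dumps({"id": 1, "status": "hello from kinesis"}).encode("utf-8"),
    PartitionKey="test",
)

conn = snowflake.connector.connect(
    account="<organization-name>-<account-name>",
    user="<your-user>",
    authenticator="externalbrowser",
    warehouse="OPENFLOW_WH",
)
# Adjust the table name (and quoting) to match the table the connector created.
rows = conn.cursor().execute(
    "SELECT * FROM KINESIS_DB.KINESIS_SCHEMA.OPENFLOW_SOURCE_STREAM LIMIT 10"
).fetchall()
print(rows)
conn.close()
```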
Schema detection and evolution¶
The connector supports schema detection and evolution. The structure of tables in Snowflake can be defined and evolved automatically to support the structure of new data loaded by the connector.
Without schema detection and evolution, the Snowflake table loaded by the connector consists of only two OBJECT columns: RECORD_CONTENT and RECORD_METADATA.
With schema detection and evolution enabled, Snowflake can detect the schema of the streaming data and load data into tables that automatically match any user-defined schema. Snowflake also lets you add new columns or drop the NOT NULL constraint from columns missing in new data files.
Schema detection with the connector is supported with or without a provided schema registry. If using a schema registry (Avro™), columns are created with the data types defined in the provided schema registry. If there is no schema registry (JSON), the data type is inferred based on the data provided.
JSON arrays are not supported for further schematization.
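As a concrete, hypothetical example of the JSON path (no schema registry), the two records below differ in shape; with Schematization enabled, the second record is the kind of change that schema evolution is designed to absorb, while a top-level JSON array would not be schematized further.

```python
# Sketch: two JSON records with different shapes, illustrating schema evolution.
# Stream and field names are invented for the example.
import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# With schematization enabled and no schema registry, column types for "id" and
# "status" are inferred from the data itself.
kinesis.put_record(
    StreamName="openflow_source_stream",
    Data=json.dumps({"id": 1, "status": "created"}).encode("utf-8"),
    PartitionKey="1",
)

# This record introduces a new "priority" field; schema evolution can add a column
# for it, and rows ingested earlier simply hold NULL in that column.
kinesis.put_record(
    StreamName="openflow_source_stream",
    Data=json.dumps({"id": 2, "status": "created", "priority": "high"}).encode("utf-8"),
    PartitionKey="2",
)
```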