Kafka Consumer

The Kafka Consumer component subscribes to a Kafka topic and consumes messages in the supported formats: CSV, JSON, XML, and Avro.

It can consume data from:

  • Internal environments (default bootstrap servers).

  • External environments (custom bootstrap servers with SSL or Plaintext).

Kafka Consumer is widely used for real-time event-driven pipelines, stream processing, and log aggregation.

Note:

  • Currently supports SSL and Plaintext security types.

  • Host Aliases are supported with both SSL and Plaintext.
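
As a rough illustration of the behavior this component automates, the sketch below shows a plain Java Kafka client consuming from a topic over Plaintext. It is a minimal sketch only: the broker address, topic name, and consumer group are placeholder assumptions, not values supplied by the platform, and the component's internal implementation may differ.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PlaintextConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder internal broker; the platform supplies its own default bootstrap servers.
        props.put("bootstrap.servers", "internal-broker:9092");
        props.put("group.id", "example-consumer-group");   // hypothetical group id
        props.put("security.protocol", "PLAINTEXT");        // internal / Plaintext mode
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // hypothetical topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // Each record value would be parsed downstream as CSV, JSON, XML, or Avro.
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```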

Configuration Sections

The component's configuration is organized into the following sections:

  • Basic Information

  • Meta Information

  • Resource Configuration

Basic Information Tab

The Basic Information tab defines execution and scaling behavior.

Field | Description | Required
------|-------------|---------
Invocation Type | Select the execution mode. Supported: Real-Time. | Yes
Deployment Type | Displays the deployment type of the component (pre-selected). | Yes
Container Image Version | Displays the Docker image version used (pre-selected). | Yes
Failover Event | Select an event to handle failures. | Optional
Batch Size | Maximum number of records per execution cycle (minimum: 10). | Yes
Enable Auto-Scaling | When enabled, the component pod scales automatically if lag exceeds 60%, up to the configured maximum instances. | Optional
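
The exact mechanics of Batch Size are internal to the component. Assuming it acts as a per-cycle cap on the number of records fetched, the closest analogue in a plain Kafka client is the max.poll.records setting, sketched below; this mapping is an assumption, not documented behavior.

```java
import java.util.Properties;

// Assumption: Batch Size behaves like a per-cycle cap on fetched records,
// analogous to the standard consumer setting max.poll.records.
public class BatchSizeSketch {
    static Properties batchLimitedProps() {
        Properties props = new Properties();
        props.put("max.poll.records", "10"); // mirrors the documented minimum Batch Size of 10
        return props;
    }
}
```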

Meta Information Tab

The Meta Information tab configures the Kafka topic, offsets, record type, and security.

Field | Description | Required
------|-------------|---------
Topic Name | Name of the Kafka topic to consume messages from. | Yes
Start From | Starting offset for consumption: Processed, Beginning, Latest, or Timestamp. | Yes
Is External | Enable to consume from an external Kafka cluster. | Optional
Bootstrap Server | External broker connection string. Required if Is External is enabled. | Conditional
Config | Additional configuration for external clusters. | Conditional
Input Record Type | Message format: CSV, JSON, XML, or Avro. | Yes
Header | Column headers for CSV input. | Conditional
Separator | Field separator for CSV input (e.g., a comma). | Conditional
Security Type | Plaintext or SSL. | Yes
Trust Store Location | Trust store path (SSL only). | Conditional
Trust Store Password | Trust store password (SSL only). | Conditional
Key Store Location | Key store path (SSL only). | Conditional
Key Store Password | Key store password (SSL only). | Conditional
SSL Key Password | SSL key password (SSL only). | Conditional
Host Aliases (IP / Host Names) | Map IP addresses to hostnames. Supported with both SSL and Plaintext. | Optional
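
For reference, the SSL-related fields above map closely to the standard Kafka client security settings. The sketch below shows how an equivalent configuration might look for a plain Java client; the broker addresses, file paths, and passwords are placeholders, and this is not the component's internal implementation. Host Aliases have no client-property equivalent in this sketch; they provide the IP-to-hostname mapping described above.

```java
import java.util.Properties;

public class SslConfigSketch {
    static Properties externalSslProps() {
        Properties props = new Properties();
        // Bootstrap Server (Is External enabled) -- placeholder broker addresses
        props.put("bootstrap.servers", "broker1.external:9093,broker2.external:9093");
        // Security Type: SSL
        props.put("security.protocol", "SSL");
        // Trust Store Location / Trust Store Password -- placeholder values
        props.put("ssl.truststore.location", "/path/to/truststore.jks");
        props.put("ssl.truststore.password", "changeit");
        // Key Store Location / Key Store Password -- placeholder values
        props.put("ssl.keystore.location", "/path/to/keystore.jks");
        props.put("ssl.keystore.password", "changeit");
        // SSL Key Password -- placeholder value
        props.put("ssl.key.password", "changeit");
        return props;
    }
}
```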

Offset Starting Options

The Start From field controls where the consumer begins reading messages:

Option | Behavior | Example
-------|----------|--------
Processed | Resumes from the last successfully processed offset. | If the last processed offset is 2, consumption resumes at offset 3.
Beginning | Reads from the earliest available offset in the topic. | Reads from offset 0 onward.
Latest | Reads only new messages produced after the consumer starts. | If started at 2:00 PM, only messages produced after that time are read.
Timestamp | Reads messages from a specific time range. Requires Start Time and End Time. | With a Start Time of 11:45 AM, consumes offsets 2 and 3 (timestamps 1:00 PM and 2:30 PM).

Example Offset Timeline

Offset | Timestamp
-------|-------------------
0      | 2024-02-27 10:00 AM
1      | 2024-02-27 11:30 AM
2      | 2024-02-27 01:00 PM
3      | 2024-02-27 02:30 PM

  • Processed: If the last processed offset is 2, the consumer resumes at offset 3.

  • Beginning: Reads all messages from offset 0.

  • Latest: If started at 2:00 PM, only reads offset 3.

  • Timestamp (Start Time 11:45 AM): Reads offsets 2 and 3 (messages at 1:00 PM and 2:30 PM).
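
To make these options concrete, the sketch below shows how a plain Java client positions itself for Beginning, Latest, and Timestamp starts against the timeline above. The broker address, topic, and partition are illustrative assumptions, and Processed is omitted because it depends on the platform's own offset bookkeeping. In practice only one positioning call would be used; they are shown together for comparison.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class StartFromSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "internal-broker:9092");   // placeholder broker
        props.put("group.id", "start-from-demo");                 // hypothetical group id
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("example-topic", 0); // hypothetical topic/partition
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));

            // Beginning: rewind to the earliest available offset (offset 0 in the timeline above).
            consumer.seekToBeginning(Collections.singletonList(tp));

            // Latest: jump past existing messages so only newly produced ones are read.
            consumer.seekToEnd(Collections.singletonList(tp));

            // Timestamp: find the first offset at or after 11:45 AM (offset 2 in the timeline above).
            long startTimeMillis = java.time.Instant.parse("2024-02-27T11:45:00Z").toEpochMilli();
            Map<TopicPartition, OffsetAndTimestamp> found =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, startTimeMillis));
            OffsetAndTimestamp offsetForStart = found.get(tp);
            if (offsetForStart != null) {
                consumer.seek(tp, offsetForStart.offset());
            }

            consumer.poll(Duration.ofSeconds(5)); // consumption proceeds from the chosen position
        }
    }
}
```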

Saving the Configuration

  1. Drag the Kafka Consumer into the workflow editor.

  2. Configure Basic Information and Meta Information fields.

  3. Save the component (Storage icon).

  4. Activate the pipeline to start consuming messages.

Example Workflow

  1. Configure Kafka Consumer with:

    • Topic Name: transactions

    • Start From: Beginning

    • Input Record Type: JSON

    • Security Type: SSL

    • Is External: Enabled

    • Bootstrap Server: broker1.external:9093,broker2.external:9093

  2. Save and activate the pipeline.

  3. Messages from the transactions topic are consumed and passed downstream for real-time fraud detection.
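
For comparison only, a standalone Java client configured to mirror this workflow might look roughly like the following. The group id, trust store path, and password are placeholders, and the downstream fraud-detection hand-off is represented by a simple print statement; the component handles these details internally.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TransactionsConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1.external:9093,broker2.external:9093");
        props.put("group.id", "transactions-consumer");                  // hypothetical group id
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/path/to/truststore.jks"); // placeholder
        props.put("ssl.truststore.password", "changeit");                // placeholder
        props.put("auto.offset.reset", "earliest");  // approximates Start From: Beginning for a new group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("transactions"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Input Record Type: JSON -- each value is a JSON document passed downstream.
                    System.out.println("transaction: " + record.value());
                }
            }
        }
    }
}
```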