# Kafka Consumer

The **Kafka Consumer** component subscribes to a **Kafka topic** and consumes messages in supported formats such as **CSV, JSON, XML, and Avro**.

It can consume data from:

* **Internal environments** (default bootstrap servers).
* **External environments** (custom bootstrap servers with SSL or Plaintext).

Kafka Consumer is widely used for **real-time event-driven pipelines**, **stream processing**, and **log aggregation**.

> **Note**:
>
> * Currently supports **SSL** and **Plaintext** security types.
> * **Host Aliases** are supported with both SSL and Plaintext.

### Configuration Sections

All component configurations are classified into the following sections:

* **Basic Information**
* **Meta Information**
* **Resource Configuration**

### Basic Information Tab

The *Basic Information* tab defines execution and scaling behavior.

| Field                       | Description                                                                                                      | Required |
| --------------------------- | ---------------------------------------------------------------------------------------------------------------- | -------- |
| **Invocation Type**         | Select the execution mode. Supported: **Real-Time**.                                                             | Yes      |
| **Deployment Type**         | Displays the deployment type of the component (pre-selected).                                                    | Yes      |
| **Container Image Version** | Displays the Docker image version used (pre-selected).                                                           | Yes      |
| **Failover Event**          | Select an event to handle failures.                                                                              | Optional |
| **Batch Size**              | Maximum number of records per execution cycle (minimum: 10).                                                     | Yes      |
| **Enable Auto-Scaling**     | When enabled, the component pod scales automatically if lag exceeds 60%, up to the configured maximum instances. | Optional |

### Meta Information Tab

The *Meta Information* tab configures the Kafka topic, offsets, record type, and security.

| Field                              | Description                                                                               | Required    |
| ---------------------------------- | ----------------------------------------------------------------------------------------- | ----------- |
| **Topic Name**                     | Name of the Kafka topic to consume messages from.                                         | Yes         |
| **Start From**                     | Starting offset for consumption: **Processed**, **Beginning**, **Latest**, **Timestamp**. | Yes         |
| **Is External**                    | Enable to consume from an external Kafka cluster.                                         | Optional    |
| **Bootstrap Server**               | Required if *Is External* is enabled. Provide the external broker connection string.      | Conditional |
| **Config**                         | Provide additional configuration for external clusters.                                   | Conditional |
| **Input Record Type**              | Select message format: **CSV**, **JSON**, **XML**, **Avro**.                              | Yes         |
| **Header**                         | For CSV input: specify column headers.                                                    | Conditional |
| **Separator**                      | For CSV input: specify separator (e.g., `,`).                                             | Conditional |
| **Security Type**                  | Select: **Plaintext** or **SSL**.                                                         | Yes         |
| **Trust Store Location**           | For SSL: provide trust store path.                                                        | Conditional |
| **Trust Store Password**           | For SSL: provide trust store password.                                                    | Conditional |
| **Key Store Location**             | For SSL: provide key store path.                                                          | Conditional |
| **Key Store Password**             | For SSL: provide key store password.                                                      | Conditional |
| **SSL Key Password**               | For SSL: provide SSL key password.                                                        | Conditional |
| **Host Aliases (IP / Host Names)** | Map IP addresses to hostnames. Supported with both SSL and Plaintext.                     | Optional    |

### Offset Starting Options

The **Start From** field controls where the consumer begins reading messages:

| Option        | Behavior                                                                             | Example                                                                   |
| ------------- | ------------------------------------------------------------------------------------ | ------------------------------------------------------------------------- |
| **Processed** | Resumes from the last successfully processed offset.                                 | If the last processed offset = `2`, consumption resumes at offset `3`.    |
| **Beginning** | Reads from the earliest available offset in the topic.                               | Reads from offset `0` onward.                                             |
| **Latest**    | Reads only new messages produced after the consumer starts.                          | If started at `2:00 PM`, only messages after that time are read.          |
| **Timestamp** | Reads messages from a specific time range. Requires **Start Time** and **End Time**. | With Start Time `11:45 AM`, reads messages at `1:00 PM` and `2:30 PM`.    |

### Example Offset Timeline

```
Offset | Timestamp
-------|-------------------
0      | 2024-02-27 10:00 AM
1      | 2024-02-27 11:30 AM
2      | 2024-02-27 01:00 PM
3      | 2024-02-27 02:30 PM
```

* **Processed**: If last processed offset = `2`, consumer resumes at `3`.
* **Beginning**: Reads all messages from offset `0`.
* **Latest**: If started at `2:00 PM`, only reads offset `3`.
* **Timestamp (Start Time `11:45 AM`)**: Reads offsets `2` and `3` (messages at `1:00 PM` and `2:30 PM`), provided the End Time does not exclude them.
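
The four outcomes above can be reproduced with a short simulation over the example timeline. This is a sketch of the selection logic only, not the component's implementation: `offsets_to_read` is a hypothetical function, and it assumes the Start Time is an inclusive lower bound and the End Time, when given, an inclusive upper bound.

```python
from datetime import datetime

# The example timeline as (offset, timestamp) pairs.
TIMELINE = [
    (0, datetime(2024, 2, 27, 10, 0)),
    (1, datetime(2024, 2, 27, 11, 30)),
    (2, datetime(2024, 2, 27, 13, 0)),
    (3, datetime(2024, 2, 27, 14, 30)),
]


def offsets_to_read(start_from, *, last_processed=None, started_at=None,
                    start_time=None, end_time=None):
    """Return the offsets in TIMELINE that a Start From option would read."""
    if start_from == "Processed":
        if last_processed is None:
            raise ValueError("Processed needs the last processed offset")
        return [o for o, _ in TIMELINE if o > last_processed]
    if start_from == "Beginning":
        return [o for o, _ in TIMELINE]
    if start_from == "Latest":
        # Only messages produced after the consumer started.
        return [o for o, ts in TIMELINE if ts > started_at]
    if start_from == "Timestamp":
        # Assumption: both bounds are inclusive; no End Time means no upper bound.
        return [o for o, ts in TIMELINE
                if ts >= start_time and (end_time is None or ts <= end_time)]
    raise ValueError(f"Unknown Start From option: {start_from}")


print(offsets_to_read("Processed", last_processed=2))
print(offsets_to_read("Beginning"))
print(offsets_to_read("Latest", started_at=datetime(2024, 2, 27, 14, 0)))
print(offsets_to_read("Timestamp", start_time=datetime(2024, 2, 27, 11, 45)))
```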

### Saving the Configuration

1. Drag the **Kafka Consumer** into the workflow editor.
2. Configure **Basic Information** and **Meta Information** fields.
3. Save the component (Storage icon).
4. Activate the pipeline to start consuming messages.

### Example Workflow

1. Configure Kafka Consumer with:
   * **Topic Name**: `transactions`
   * **Start From**: `Beginning`
   * **Input Record Type**: `JSON`
   * **Security Type**: `SSL`
   * **Bootstrap Server**: `broker1.external:9093,broker2.external:9093`
2. Save and activate the pipeline.
3. Messages from the `transactions` topic are consumed and passed downstream for real-time fraud detection.
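
For readers who know the standard Kafka client settings, the workflow above roughly corresponds to the properties built in the sketch below. The property names (`bootstrap.servers`, `security.protocol`, `auto.offset.reset`, `ssl.truststore.location`, and so on) are standard Kafka consumer configuration keys, but the mapping from this component's fields to those keys is an assumption made for illustration, and `to_kafka_properties` is a hypothetical helper. **Processed** and **Timestamp** have no single-property equivalent and are left unmapped here.

```python
# Assumed mapping from "Start From" to standard auto.offset.reset values.
OFFSET_RESET = {"Beginning": "earliest", "Latest": "latest"}

# Assumed mapping from the SSL form fields to standard Kafka SSL properties.
SSL_PROPERTIES = {
    "Trust Store Location": "ssl.truststore.location",
    "Trust Store Password": "ssl.truststore.password",
    "Key Store Location": "ssl.keystore.location",
    "Key Store Password": "ssl.keystore.password",
    "SSL Key Password": "ssl.key.password",
}


def to_kafka_properties(meta):
    """Translate form-style settings into standard Kafka consumer properties."""
    props = {
        "bootstrap.servers": meta["Bootstrap Server"],
        "security.protocol": meta["Security Type"].upper(),  # PLAINTEXT or SSL
    }
    if meta["Start From"] in OFFSET_RESET:
        props["auto.offset.reset"] = OFFSET_RESET[meta["Start From"]]
    if meta["Security Type"] == "SSL":
        for field, prop in SSL_PROPERTIES.items():
            if meta.get(field):  # copy only the SSL fields that were filled in
                props[prop] = meta[field]
    return props


workflow = {
    "Topic Name": "transactions",
    "Start From": "Beginning",
    "Input Record Type": "JSON",
    "Security Type": "SSL",
    "Bootstrap Server": "broker1.external:9093,broker2.external:9093",
}
for key, value in to_kafka_properties(workflow).items():
    print(f"{key}={value}")
```

In a plain Kafka client, `auto.offset.reset` only takes effect when the consumer group has no committed offset, so the correspondence with **Beginning** and **Latest** is approximate.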

