Enrichment Component

The Enrichment Component allows users to enhance incoming event data by joining it with master tables or collections stored in external databases. This is useful for appending contextual information (e.g., department details, customer demographics, or product metadata) to raw event data.

It supports both relational databases (RDBMS) and MongoDB, with flexible configuration for different drivers and query options.

Key Capabilities

  • Enrich pipeline data from master tables/collections in external databases.

  • Supports multiple databases:

    • MySQL

    • MS SQL

    • MongoDB

    • PostgreSQL

    • Oracle

    • ClickHouse

    • Snowflake

    • Redshift

  • Configure real-time or batch invocation.

  • Apply Spark SQL joins or MongoDB aggregation queries.

  • Support for selected columns, column aliasing, and data type assignment.

  • Automatic schema download and upload for simplified configuration.

Configuration Overview

All Enrichment configurations are organized into the following sections:

  • Basic Information

  • Meta Information

  • Resource Configuration

  • Connection Validation

Steps to Configure the Enrichment Component

  1. Add the Component

    • Drag and drop the Enrichment Component into the Workflow Editor.

  2. Create Events

    • Create two events:

      • Input Event – feeds raw or ingested data.

      • Output Event – enriched data after joining with master table.

    • Connect input and output events to the component.

  3. Access Properties

    • Click the Enrichment Component to open configuration tabs.

Basic Information Tab

  • Invocation Type – Select Real-Time or Batch.

  • Deployment Type – Pre-selected; defines deployment environment.

  • Container Image Version – Pre-selected docker image version.

  • Failover Event – Select a fallback event in case of failure.

  • Batch Size – Enter the maximum number of records per execution cycle.

Meta Information Tab

Database Connection

  • Driver (*) – Select the database type. Supported drivers: MySQL, MSSQL, Oracle, PostgreSQL, MongoDB, ClickHouse, Snowflake, Redshift.

  • Port (*) – Enter host server port number.

  • Host IP Address (*) – Enter IP address of the database host.

  • Username (*) – Provide database username.

  • Password (*) – Provide database password.

  • Database Name (*) – Enter the database name.

SSL Configuration

  • Enable SSL – Available only for MongoDB, PostgreSQL, and ClickHouse.

  • Certificate Folder – Select the folder containing certificates uploaded in Admin Settings.

MongoDB-Specific Options

  • Connection Type – Choose one:

    • Standard – Port field appears.

    • SRV – Port field does not appear.

  • Master Table Query – Enter a MongoDB aggregation query instead of Spark SQL.

Sample MongoDB Aggregation Query:

{
  $match: { state: "Uttar-Pradesh" }
},
{
  $group: {
    _id: "$District",
    totalPopulation: { $sum: "$Population" },
    totalArea: { $sum: "$Area_(km2)" },
    averageDensity: { $avg: "$Pop._Density" }
  }
},
{
  $sort: { totalPopulation: -1 }
}

Query Details (RDBMS / Spark SQL)

  • Table Name – Enter the master table name.

  • Refresh Rate in Seconds – Frequency (default = 3600 sec) to refresh the master table and fetch changes.

  • Conditions – Define condition types (Remove / Blank).

  • Master Table Query – Write a Spark SQL query to fetch data from the master table.

  • Join Query – Use inputDf as the alias for incoming event data when writing join queries.

Example Spark SQL Join Query:

SELECT * 
FROM inputDf 
LEFT JOIN departments 
ON inputDf.department_id = departments.department_id;

Selected Columns

  • Choose specific columns from the table instead of loading all.

  • For each column, configure:

    • Alias Name (optional)

    • Column Type (from drop-down menu)

Alternative Options:

  • Upload File – Upload CSV or JSON files (max size: 2 MB).

  • Download Data – Download schema structure in JSON format.

Saving the Component Configuration

  1. After completing setup, click Save Component in Storage.

  2. A notification message will confirm successful update.

Example Use Cases

  • Enrich employee data with department details from a master table.

  • Add geographic or demographic attributes to customer transactions.

  • Merge real-time IoT events with static lookup tables for device metadata.

  • Apply MongoDB aggregation queries for advanced rollups (e.g., population by district).