Python Script

The Python Script Component allows users to write and execute custom Python scripts directly within a pipeline. It can also run scripts exported from a DSLab Notebook or pulled from a Version Control System (VCS). This component enables advanced transformations, custom integrations, and direct interactions with Kafka events.

Key Capabilities

  • Write custom Python scripts with function-based execution.

  • Reuse scripts exported from DSLab notebooks.

  • Manage scripts with Push/Pull from VCS for versioning.

  • Use the Custom Kafka Producer to send data directly to Kafka topics or Data Sync events.

  • Use Custom Logger to generate logs visible in the pipeline logs panel.

  • Support for DataFrame or List of Dictionary input/output formats.

Configuration Overview

All Python Script configurations are organized into:

  • Basic Information

  • Meta Information

  • Resource Configuration

Configuring Meta Information

Component Name

  • Provide a unique component name.

  • Restrictions:

    • Must not contain spaces or special characters (use _ for spaces).

    • Must not be "test" or start with "test".

Start Function Name

  • Displays all function names from the script.

  • Select one as the entry point for execution.

In Event Data Type

  • Choose the expected input format:

    • DataFrame

    • List of Dictionary
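
For example, the value passed into the start function changes shape with this setting. A minimal sketch, with an illustrative function signature and column name:

def getdata(df):
    # With In Event Data Type = DataFrame, df arrives as a pandas DataFrame,
    # e.g. the equivalent of pd.DataFrame([{"Unit Price": 500, "Quantity": 2}]).
    # With In Event Data Type = List of Dictionary, df arrives as a plain list,
    # e.g. [{"Unit Price": 500, "Quantity": 2}].
    return df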

External Libraries

  • List external Python libraries required in the script.

  • Multiple libraries can be separated by commas.
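
For example, a script that imports additional packages such as requests and openpyxl (illustrative choices) would list them as:

requests, openpyxl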

Execution Type

  1. Custom Script

    • Write the script in the editor.

    • Must define at least one function.

    • Validate the script before saving.

  2. DSLab Script

    • Use scripts exported from DSLab notebooks.

    • Required fields:

      • Project Name

      • Script Name

      • Start Function

    • Script content appears in the editor and can be validated.

  3. Version Control System (VCS)

    • Pull Script – Fetch committed scripts from VCS.

    • Push Script – Commit new script versions to VCS.

    • Versions appear as V1, V2, and so on.

Input Data

  • Define key-value pairs for function parameters.

  • Keys must match parameter names in the function.
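
For example, if Input Data defines a key min_price with value 450 (an illustrative name and value), the start function must declare a parameter with the same name:

def getdata(df, min_price):
    # df arrives from the in-event; min_price is injected from Input Data
    cond = df['Unit Price'] > min_price
    return df[cond]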

Guidelines for Writing Python Scripts

  • Wrap all logic inside functions.

  • Functions can call other functions, but follow ordering rules:

    • Define an outer (called) function above the function that calls it.

    • Define an inner (nested) function above the statement that calls it.

  • Use 4 spaces per indentation level.

  • Do not use Python keywords or built-in names (e.g., type) as parameter names.

  • Return output as either:

    • DataFrame

    • List of Dictionary

  • Explicitly import all required packages at the top of the script.

  • Input arguments must be mapped correctly in Meta Information → Input Data.
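
Putting these rules together, a minimal script might look like the sketch below (function, parameter, and column names are illustrative):

import pandas as pd

def filter_prices(df, min_price):
    # Called function, defined above the function that calls it
    return df[df['Unit Price'] > min_price]

def getdata(df, min_price):
    # Start function selected under Start Function Name;
    # min_price is mapped through Meta Information → Input Data
    if isinstance(df, list):
        # Convert a List of Dictionary input to a DataFrame
        df = pd.DataFrame(df)
    return filter_prices(df, min_price)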

Custom Kafka Producer

Send data directly from the script to Kafka or Data Sync events.

kaf_obj.kafka_produce(df, "Event_name", "failure_message")

  • df – Data (DataFrame or List of Dict).

  • Event_name – Event identifier (use event name, not display name).

    • Use @EVENT.OUTEVENT for the connected out-event.

    • Use @EVENT.FAILEVENT for the configured failover event.

  • failure_message – Optional message appended to output data.

Note:

  • The component must run in Real-Time mode when using the custom Kafka producer.

  • In Batch mode, using actual event names may prevent the next component from triggering.
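
A minimal sketch of a start function using the producer, assuming kaf_obj is available in the script scope as shown above (the filter condition and messages are illustrative):

def getdata(df):
    try:
        result = df[df['Unit Price'] > 450]
        # Send processed rows to the connected out-event
        kaf_obj.kafka_produce(result, "@EVENT.OUTEVENT", "")
        return result
    except Exception:
        # Route the original data to the configured failover event
        kaf_obj.kafka_produce(df, "@EVENT.FAILEVENT", "Unit Price filter failed")
        return df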

Custom Logger

Log custom messages from Python scripts:

log_obj.info("Executing logging info")
log_obj.info(f"Executing logging info - {key1}, {key2}, {key3}")
  • key1, key2, key3 – Parameters passed from the Input Data field.
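
A minimal sketch of a start function using the logger, assuming log_obj is available in the script scope and key1, key2, key3 are keys defined under Input Data (names are illustrative):

def getdata(df, key1, key2, key3):
    log_obj.info("Executing logging info")
    log_obj.info(f"Executing logging info - {key1}, {key2}, {key3}")
    log_obj.info(f"Incoming record count - {len(df)}")
    return df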

Important Restrictions

  • Logging environment variables (e.g., @ENV.HOST, @ENV.PASSWORD, @ENV.USERNAME) is prohibited.

  • Attempting this will raise:

    Cannot log secrets to pipeline

Sample Usage

1. As Reader Component

Used without an input event; the script fetches data and returns a dataset directly.

import json
import requests
import pandas as pd

def getmovies_result():
    # Call the OMDb API and parse the JSON response
    data = requests.get("http://www.omdbapi.com/?s=water&apikey=demo_key")
    loaded_json = json.loads(data.content)
    # Build a DataFrame from the 'Search' results and return it
    df = pd.DataFrame.from_dict(loaded_json['Search'], orient='columns')
    return df

2. As Transformation Component

Filters the incoming DataFrame received from the previous event.

def getdata(df):
    # Keep only the rows where 'Unit Price' exceeds 450
    cond = df['Unit Price'] > 450
    return df[cond]

3. With Custom Arguments

Uses a custom argument value supplied through Meta Information → Input Data.

def getdata(df, min_price):
    # min_price is passed in from Meta Information → Input Data
    cond = df['Unit Price'] > min_price
    return df[cond]
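
Here, min_price must be defined as a key in Meta Information → Input Data (for instance, min_price = 450, an illustrative value) so its value is passed into the function.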

Limitations & Notes

  • The component must run in Real-Time mode when using the custom Kafka producer.

  • Scripts must return DataFrame or List of Dictionary.

  • Cannot log environment variables using log_obj.info().

  • Data produced to failover events via custom producer will not appear as failed records in Failure Analysis but as processed records in Data Metrics.