Python Script
The Python Script Component allows users to write and execute custom Python scripts directly within a pipeline. It can also run scripts exported from a DSLab Notebook or pulled from a Version Control System (VCS). This component enables advanced transformations, custom integrations, and direct interactions with Kafka events.
Key Capabilities
Write custom Python scripts with function-based execution.
Reuse scripts exported from DSLab notebooks.
Manage scripts with Push/Pull from VCS for versioning.
Use the Custom Kafka Producer to produce data directly to Kafka events or Data Sync events.
Use Custom Logger to generate logs visible in the pipeline logs panel.
Support DataFrame and List of Dictionaries input/output formats.
Configuration Overview
All Python Script configurations are organized into:
Basic Information
Meta Information
Resource Configuration
Configuring Meta Information
Component Name
Provide a unique component name.
Restrictions:
Must not contain spaces or special characters (use _ in place of spaces).
Must not be "test" or start with "test".
Start Function Name
Displays all function names from the script.
Select one as the entry point for execution.
In Event Data Type
Choose the expected input format (see the sketch below):
DataFrame
List of Dictionary
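As a minimal illustration of the two shapes (column and key names are arbitrary):
import pandas as pd

# DataFrame input: tabular data with named columns
df = pd.DataFrame([{"id": 1, "Unit Price": 500}, {"id": 2, "Unit Price": 300}])

# List of Dictionary input: one dict per record
records = [{"id": 1, "Unit Price": 500}, {"id": 2, "Unit Price": 300}]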
External Libraries
List the external Python libraries required by the script.
Separate multiple libraries with commas (for example: requests, openpyxl).
Execution Type
Custom Script
Write the script in the editor.
Must define at least one function.
Validate the script before saving.
DSLab Script
Use scripts exported from DSLab notebooks.
Required fields:
Project Name
Script Name
Start Function
Script content appears in the editor and can be validated.
Version Control System (VCS)
Pull Script – Fetch committed scripts from VCS.
Push Script – Commit new script versions to VCS.
Versions appear as V1, V2, and so on.
Input Data
Define key-value pairs for function parameters.
Keys must match the parameter names in the function (see the sketch below).
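A minimal sketch, assuming a hypothetical parameter named threshold; the Input Data grid would then need a key with that exact name:
def filter_rows(df, threshold):
    # 'threshold' is filled from the Input Data key of the same name
    # cast before comparing, in case the value arrives as a string (an assumption)
    return df[df['Unit Price'] > float(threshold)]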
Guidelines for Writing Python Scripts
Wrap all logic inside functions.
Functions can call other functions, but definition order matters: define each function above the function or statement that calls it.
Use 4 spaces per indentation.
Do not use reserved keywords (e.g., type) as parameter names.
Return output as either:
DataFrame
List of Dictionary
Explicitly import all required packages at the top of the script.
Input arguments must be mapped correctly in Meta Information → Input Data.
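A minimal sketch that follows these guidelines (function and column names are illustrative):
import pandas as pd

def add_tax(df):
    # Defined above its caller, per the ordering rule
    df['Total'] = df['Unit Price'] * 1.2
    return df

def start(df):
    # Entry point selected as the Start Function Name
    return add_tax(df)  # returns a DataFrame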
Custom Kafka Producer
Send data directly from the script to Kafka or Data Sync events.
kaf_obj.kafka_produce(df, "Event_name", "failure_message")
df – Data (DataFrame or List of Dict).
Event_name – Event identifier (use event name, not display name).
Use @EVENT.OUTEVENT for the connected out-event.
Use @EVENT.FAILEVENT for the configured failover event.
failure_message – Optional message appended to the output data.
Note:
The pipeline must run in Real-Time mode when using the custom Kafka producer.
In Batch mode, using actual event names may prevent the next component from triggering.
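A minimal sketch of a start function that routes records with the producer (the filtering logic and failure message are illustrative; kaf_obj is provided by the platform at runtime):
def route_records(df):
    valid = df[df['Unit Price'] > 0]
    invalid = df[df['Unit Price'] <= 0]
    # Produce valid rows to the connected out-event
    # (empty string stands in for no failure message, an assumption)
    kaf_obj.kafka_produce(valid, "@EVENT.OUTEVENT", "")
    # Produce invalid rows to the failover event with a message
    kaf_obj.kafka_produce(invalid, "@EVENT.FAILEVENT", "non-positive unit price")
    return valid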
Custom Logger
Log custom messages from Python scripts:
log_obj.info("Executing logging info")
log_obj.info(f"Executing logging info - {key1}, {key2}, {key3}")
key1, key2, key3 – Parameters passed from the Input Data field.
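For example, a function receiving key1 from Input Data might log as follows (a sketch; log_obj is injected by the platform):
def start(df, key1):
    log_obj.info("Starting transformation")
    log_obj.info(f"Received parameter - {key1}")
    return df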
Important Restrictions
Logging environment variables (e.g., @ENV.HOST, @ENV.PASSWORD, @ENV.USERNAME) is prohibited. Attempting this raises:
Cannot log secrets to pipeline
Sample Usage
1. As Reader Component
No input event is required; the function returns a dataset directly.
import json
import requests
import pandas as pd

def getmovies_result():
    # Fetch search results from the OMDb API
    data = requests.get("http://www.omdbapi.com/?s=water&apikey=demo_key")
    loaded_json = json.loads(data.content)
    # Convert the 'Search' list of records into a DataFrame
    df = pd.DataFrame.from_dict(loaded_json['Search'], orient='columns')
    return df
2. As Transformation Component
Filter the incoming DataFrame from the previous event.
def getdata(df):
    # Keep only rows where Unit Price exceeds 450
    cond = df['Unit Price'] > 450
    return df[cond]
3. With Custom Arguments
Use input data values from Meta Information; here an Input Data key named threshold supplies the second argument.
def getdata(df, threshold):
    # 'threshold' comes from the matching key in Input Data
    cond = df['Unit Price'] > threshold
    return df[cond]
Limitations & Notes
Must be run in Real-Time mode when using custom Kafka producer.
Scripts must return DataFrame or List of Dictionary.
Cannot log environment variables using log_obj.info().
Data produced to failover events via the custom producer will not appear as failed records in Failure Analysis, but as processed records in Data Metrics.