
Python Script



The Python Script component is designed to allow users to write their own custom Python scripts and run them in the pipeline. It also enables users to directly use scripts written in a DSLab notebook.


All component configurations are classified broadly into three sections:

  • Basic Information

  • Meta Information

  • Resource Configuration

Configuring Meta Information of Python Script component:

Please Note: Do not provide 'test' as the component name, and the component name should not start with 'test' in the Component Name field of the Meta Information of the Python Script component. The word 'test' is reserved at the backend for development processes.

  • Component Name: Provide a name for the component. Please note that the component name should not contain spaces or special characters. Use the underscore symbol to separate words.

  • Start Function Name: It displays all the function names used in the Python script in a drop-down menu. Select the function name with which you want to start.

  • In Event Data Type: The user will find two options here:

    • DataFrame

    • List of Dictionary

  • External Libraries: The user can provide external Python libraries in order to use them in the script. Multiple library names can be entered, separated by commas.

  • Execution Type: Select the Type of Execution from the drop-down. There are two execution types supported:

    • Custom Script: The user can write their own custom Python script in the Script field.

      • Script: The user can write their own custom Python script in this field. Make sure the script contains at least one function. The user can also validate the script by clicking the Validate Script option in this field.

      • Start Function: Here, all the function names used in the script will be listed. Select the start function name to execute the Python script.

      • Input Data: If any parameters are used in the function, provide each parameter name as the Key and its value as the Value in this field (a sketch illustrating this mapping follows this list).

    • DSLab Script: In this execution type, the user can use a script exported from a DSLab notebook. The user needs to provide the following information if this option is selected as the Execution Type:

      • Project Name: Select (using the drop-down menu) the Project in which the Notebook has been created.

      • Script Name: This field lists the Notebook names that have been exported from the Data Science Lab module to the Data Pipeline.

      • Script: The exported script appears in this space. The user can also validate the script by clicking the Validate Script option in this field. For more information on exporting a script from the DSLab module, please refer to the Exporting a Script from DSLab page.

      • Start Function: Here, all the function names used in the script will be listed. Select the start function name to execute the Python script.

      • Input Data: If any parameters are used in the function, provide each parameter name as the Key and its value as the Value in this field.

  • Pull script from VCS: It allows the user to pull the desired committed script from the VCS.

  • Push script to VCS: It allows the user to commit different versions of a script to the VCS.
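
For illustration, a minimal custom script matching such a configuration might look like the following sketch. The function name get_filtered_data, the parameter threshold, and the column Unit Price are hypothetical; threshold would be supplied as a Key/Value pair in the Input Data field, and get_filtered_data would be selected as the Start Function.

import pandas as pd

def get_filtered_data(df, threshold):
    # df arrives from the in Event as a DataFrame or a List of Dictionary,
    # depending on the selected In Event Data Type
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)
    # threshold is supplied through the Input Data field (Key: threshold, Value: e.g. 450)
    filtered_df = df[df['Unit Price'] > float(threshold)]
    # The script must return a DataFrame or a List
    return filtered_df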

Please Note: The below-given instructions should be followed while writing a Python script in the Data Pipeline:

  • If the script in the component is the same as the committed script, it will not be committed again. You can push any number of different scripts by giving each a different commit message.

  • The committed versions will be listed as V1, V2, and so on.

  • The Python script needs to be written inside a valid Python function. E.g., the entire code body should be inside the proper indentation of the function (use 4 spaces per indentation level).

  • The Python script should have at least one main function. Multiple functions are acceptable, and one function can call another function (a short sketch illustrating this appears after this list).

    • The called function should be defined above the body of the calling function (if the called function is an outer function).

    • The called function should be defined above the calling statement (if the called function is an inner function).

  • Spaces are the preferred indentation method.

  • Do not use 'type' as a function argument, as it is a predefined (built-in) name in Python.

  • The Python code should always use UTF-8 encoding.

  • Single-quoted strings and double-quoted strings are considered the same in Python.

  • All the packages used in the function need to be imported explicitly before the function definition.

  • The Python script should return data in the form of a DataFrame or a List only. The form of the returned data should be decided while writing the function.

  • If the user needs to use an external library, the library name must be mentioned in the External Libraries field. Multiple external library names should be separated by commas.

  • If the user needs to pass external input to the main function, the Input Data field can be used. The Key name should match the function parameter's name, and the Value can be set as per the requirement.
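
As a brief illustration of the ordering and import rules above, a called (helper) function should be defined before the start function that calls it. The function names below are hypothetical.

import pandas as pd

# The called function is defined above the function that calls it
def normalize_columns(df):
    df.columns = [str(c).strip().lower() for c in df.columns]
    return df

# Start function selected in the Start Function field
def start_process(df):
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)
    return normalize_columns(df)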

Custom Kafka Producer feature in Python Script Component

This feature enables the user to send data directly to the Kafka Event or data sync event connected to the component. Below is the command to configure Custom Kafka Producer in the script:

kaf_obj.kafka_produce(df, "Event_name", "failure_message")

# Here,
# df: The data, either in the form of a DataFrame or a List of Dictionary.
# Event_name: The Kafka or Data Sync event name (please do not use the Display name)
#             where the data has to be produced. The event name should be inside quotes.
# failure_message: The user can give any message here; this argument is optional.

# The user can send data to the out event either by giving the event name in this field
# or by writing @EVENT.OUTEVENT in this field. For example:

kaf_obj.kafka_produce(df, "@EVENT.OUTEVENT", "failed")

# The user can also send the data to the failover event configured with the Python Script component:

kaf_obj.kafka_produce(df, "@EVENT.FAILEVENT", "failed")

Please Note:

  • If using @EVENT.OUTEVENT as an Event_name, the Python script component must be connected with the Kafka Event to send the data to the connected event.

  • If using a specific "Event_Name" in the custom Kafka producer, it is not mandatory to connect the Kafka event with the component. It will send data directly to that specified Kafka event.

  • The Python Script component must be used in a real-time pipeline when using the Custom Kafka Producer to send data to a Kafka topic. Using it in batch mode can result in improper functionality of the Monitoring page and potential WebSocket issues.

Custom Logger

The Python Component has a custom logger feature that allows users to write their own custom logs, which will be displayed in the logs panel. Please refer to the code below for the custom logger:

log_obj.info("Executing logging info")
log_obj.info(f"Executing logging info-{key1}, {key2}, {key3}")


# Here,
# key1, key2, key3: Any parameters passed to the function from the Input Data section
# of the Meta Information of the Python Script component.

Please Note: Using this feature, the user cannot get logs that contain environment variables.

Sample Python code to produce data using custom producer and custom logger:
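
A minimal sketch along these lines is given below. The function name produce_data is hypothetical; key1, key2, and key3 would be supplied through the Input Data section, while log_obj and kaf_obj are made available to the script by the Python Script component at runtime.

import pandas as pd

def produce_data(df, key1, key2, key3):
    # Custom logs are displayed in the Logs panel of the component
    log_obj.info("Executing logging info")
    log_obj.info(f"Executing logging info-{key1}, {key2}, {key3}")

    # df comes from the previous event as a DataFrame or a List of Dictionary
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)

    # Send the data to the connected out event using the custom Kafka producer
    # (a third, optional argument could carry a failure message)
    kaf_obj.kafka_produce(df, "@EVENT.OUTEVENT")
    return df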

Here,

  • df: Data from the previous event connected to the Python component, in the form of a List or a DataFrame.

  • key1, key2, key3: Any parameters passed to the function from the Input Data section of the Meta Information of the Python Script component.

  • log_obj.info(): It is for custom logging and takes a string message as input.

  • kaf_obj.kafka_produce(): It is for the custom Kafka producer and takes the following parameters:

    • df: Data to produce – pandas.DataFrame and List of Dict types are supported.

    • Event name: Any Kafka event name in string format. If @EVENT.OUTEVENT is given, it sends data to the connected out event. If @EVENT.FAILEVENT is given, it sends the data to the failover event configured with the Python Script component.

    • Any Failed Message: A message in string format can be given to append to the output data. The same message will be appended to all rows of data (this field is optional).

Please Note: If data is produced to a Failover Event using the custom Kafka Producer, it will not be considered failed data; it will not be listed on the Failure Analysis page and will be reflected in green as processed records on the Data Metrics page.

Python Script Examples

The Custom Python Script transform component supports 3 types of scripts in the Data Pipeline.

1. As Reader Component: If you don't have any in Event, you can use a function with no arguments. For example:

import json
import requests
import pandas as pd

def getmovies_result():
    # Fetch movie search results from the OMDb API
    data = requests.get("http://www.omdbapi.com/?s=water&apikey=ba5d53d4")
    loaded_json = json.loads(data.content)
    # Convert the 'Search' list of records into a DataFrame and return it
    data = loaded_json['Search']
    df = pd.DataFrame.from_dict(data, orient='columns')
    return df

2. As Transformation Component: If you have incoming data on which to execute some operation, use the first argument to receive the data as a DataFrame or a list of dictionaries. For example:

Here, df holds the data coming from the previous event, passed as an argument to the parameter of the function.

def getdata(df):
    # Keep only the rows where 'Unit Price' is greater than 450
    cond1 = df['Unit Price'] > 450
    filter_df = df[cond1]
    return filter_df

3. Custom Argument with Data: If there is a custom argument along with the DataFrame, i.e., the data is coming from the previous event and a custom argument is passed to a parameter of the function, then df will hold the data from the previous event, and the value of the second parameter, range, can be given in the Input Data section of the component.


def getdata(df, range):
    # 'range' is supplied through the Input Data section of the component
    cond1 = df['Unit Price'] > range
    filter_df = df[cond1]
    return filter_df

Please Note:

  • The Custom Kafka Producer in batch mode will not trigger the next component if an actual Kafka event name is given in place of @EVENT.OUTEVENT/@EVENT.FAILEVENT.



It is possible for a Data Pipeline user to keep different versions of the Python script in VCS. The user can Push a version of the Python script to the VCS and Pull a version of the Python script from the VCS.
