Python Script

Python script component allows users to run custom Python code on their data. The Python Script component may take data as input, which are then processed by the user's Python script.

The Python Script component works as a normal Python compile.

Please Note: The Python Script component can be used as a Reader, an API Data Ingestion component, a Transformation, or a writer component. The component can only return either in pandas df or a list of dictionaries.

Check out the given demonstration to understand the configuration steps involved in the Python Script.

Configuring Python Script Component

All component configurations are classified broadly into 3 section

Steps to configure Python Script (Custom Python Script)

  • Drag and drop the Python Script to the Workflow Editor.

  • The Python Script runner component will require an Event to pass on the data in a Pipeline Workflow.

  • Click the dragged Python Script component to get the component properties tabs.

Basic Information Tab

It is the default tab to open for the Python Script component while configuring the component.

  • Invocation Type: Select an Invocation Type from the drop-down menu to confirm the running mode of the reader component. The supported invocation type for this component is Real-time.

  • Deployment Type: It displays the deployment type for the component. This field comes pre-selected.

  • Batch Size: Provide the maximum number of records to be processed in one execution cycle (Min limit for this field is 10).

  • Failover Event: Select a failover Event from the drop-down menu.

  • Container Image Version: It displays the image version for the docker container. This field comes pre-selected.

  • Intelligent Scaling: By selecting the Real-Time as invocation type the Intelligent Scaling option appears. By enabling this option helps the component to scale up to the max number of instances by automatically reducing the data processing.

  • Description: Provide description of the component.

Meta Information Tab

Open the Meta Information Tab to open the fields and configure them.

  • Component Name: Provide a name for the Python Script component.

Please Note: The component name should be without space and special characters. Use the underscore symbol to show space in between words.

Please Note: Do not provide 'test' as a component name or the component name should not starts with 'test' in component name field in the Meta information of Python Script component. The word 'test' is being used at the backend for some development process.

  • Component Name: Provide a name to the component.

  • Start Function Name: It displays all the function names used in the python script in a drop-down menu. Select one function name with which you want to start.

  • In Event Data Type: Provide input data type as a Data Frame or List.

  • External Libraries: Provide the external library name in this field. Insert multiple library names separated by commas.

  • Script: The user can write their Python script in this space. Insert the Python script containing at least one function. The function should not have an argument, data frame argument, or custom argument. The user can also keep different versions of the script using the version control options.

    • It allows you to keep different versions of the script in version control systems. Anytime you need the older version of the script you can get from VCS which will replace the existing script with the committed script.

      • Pull script from VCS: It allows the user to pull desired committed script from the VCS.

      • Push script to VCS: It allow the user to commit different versions of a script to the VCS.

Please Note:

  • If the script in the component is same as the committed script it won't commit again. You can push any number of different scripts by giving different commit message.

  • The version of the committed message will be listed as V1,V2, and so on.

  • The user can verify the written script by using the verification icon provided for it.

  • A success notification message appears if the script is correct.

  • The user can use the Save Component icon to save the Python Script component.

  • Input Data: Use custom argument names as keys and provide the required value to configure the Script component.

Saving the Component Configuration

  • Click the Save Component in Storage icon.

  • Click the Update Pipeline icon to save the Pipeline workflow. (After getting the success message).

  • Activate the Pipeline workflow.

  • The user gets notified once the Logs start loading. Open the Logs section to see the logs.

  • The Python Script component is ready to read the data coming from the input event, it transforms the data and returns output data.

Please Note: The below-given instructions should be followed while writing a Python script in the Data Pipeline:

  • The Python script needs to be written inside a valid Python function. E.g., The entire code body should be inside the proper indentation of the function (Use 4 spaces per indentation level).

  • The Python script should have at least one main function. Multiple functions are acceptable, and one function can call another function.

    • It should be written above the calling function body (if the called function is an outer function).

    • It should be written above the calling statement (if called function is an inner function)·

  • Spaces are the preferred indentation method.

  • Do not use 'type' as the function argument as it is a predefined keyword.

  • The code in the core Python distribution should always use UTF-8.

  • Single-quoted strings and double-quoted strings are considered the same in Python.

  • All the packages used in the function need to import explicitly before writing the function.

  • The Python script should return data in the form of a DataFrame or List only. The form of data should be defined while writing the function.

  • If the user uses some Kafka event data for transformation, then the first argument of the function should be a DataFrame or List.

  • If the user needs to use some external library, the user needs to mention the library name in the external libraries field. If the user wants to use multiple external libraries, the library names should be separated by a comma.

  • If the user needs to pass some external input in your main function, then you can use the input data field. The key name should be the same according to the variable's name and value that is put as per the requirement.

  • The user can use that component as a reader, transformation, and writer.

Sending Data to Kafka Event or Data Sync using custom Kafka Producer by Python Script

Using custom Kafka producer, the data can be sent to the Kafka event as well Data Sync Event.. Here is the code for custom Kafka producer:

kaf_obj.kafka_produce(df, "Event_name", "failure_message")

df: It is the data Either in the form of DataFrame or List of dictionary.

Event_name: Enter the Kafka or Data Sync event name (please do not use Display name) where the data has to be produced. The event name should be inside quotes. The user can send data to out event either giving the event name in this field or writing @EVENT.OUTEVENT in this field. For eg:

kaf_obj.kafka_produce(df, "@EVENT.OUTEVENT", "failed")

kaf_obj.kafka_produce(df, "@EVENT.FAILEVENT", "failed")

Please Note:

  • If using @EVENT.OUTEVENT as an Event_name then the Python script component must be connected with the Kafka Event to send the data to the connected event.

  • Python Script component must be used in real time when using Custom Kafka Producer to send the data to the Kafka topic. Using it in batch mode can result in improper functionality of the monitoring page and potential WebSocket issues.

@EVENT.FAILEVENT: It will send the data to connected failover event with the Python script component.

Failure_message: This field is optional. Enter the message which needs to be send.

Custom Logger

Python Component has custom logger feature which will be used in script for sending the user's custom logs in the logs panel. Please refer the below code for custom logger:

log_obj.info("Executing logging info")

log_obj.info(f"Executing logging info-{key1}, {key2}, {key3}")

Please Note: Using this feature, the user cannot get the logs which contain environment variables.

Sample Python code to produce data using custom producer and custom logger:

Here,

df : is in-event data from kafka event connected to python component.

key1 , key2 , key3: Any parameter passed to the function from Input Data section of metadata info of python script component.

log_obj.info(): It is for custom logger and takes string message as input.

kaf_obj.kafka_produce(): It is for custom kafka producer and takes parameter:

  • df as data to produce – pandas.DataFrame and List of Dict type is supported.

  • Event name – any Kafka event name in string format has to be given.- If@EVENT.OUTEVENT is given, then it sends data to connected out event. If @EVENT.FAILEVENT is given, then it will send the data to connected failover event with the Python script component.

  • Any Failed Message - in String format can be given, to append it in output data, same message will be appended to all the rows of data. (This field is optional).

Please Note: If the data is produced to a Failover Event using custom Kafka Producer then that data will not be considered as failed data and it will not be listed on the Failure Analysis page as failed data and it will be reflected in green color as processed records on the Data Metrics page.

Python Script Examples

The Custom Python Script transform component supports 3 types of scripts in the Data Pipeline.

1. As Reader Component: If you don’t have any in Event then you can use no argument function. For Example,

import json
import requests
import pandas as pd
def getmovies_result():
    data = requests.get("http://www.omdbapi.com/?s=water&apikey=ba5d53d4")
    loaded_json = json.loads(data.content)
    data = loaded_json['Search']
    df = pd.DataFrame.from_dict(data, orient='columns')
    return df

2. As Transformation Component: If you have data to execute some operation, then use the first argument as data or a list of dictionaries. For Example,

Here the df holds the data coming from the previous event as argument to the pram of the method.

def getdata(df):
    cond1 = df['Unit Price'] > 450
    filter_df = df[cond1]
    return filter_df

3. Custom Argument with Data: If there is a custom argument with the data-frame i.e. the data is coming from the previous event and we have passed the custom argument to the parameter of the function. here df will hold the data from the previous event and the second param: arg range can be given in the input data section of the component.


def getdata(df, range):
    cond1 = df['Unit Price'] >  range
    filter_df = df[cond1]
    return filter_df

Please Note:

  • It is possible for a Data Pipeline user to keep different versions of the Python script in VCS. The user can Push a version of the Python script to VCS and Pull a version of the Python script from VCS.

  • The Custom Kafka producer in batch mode will not trigger next component, if the actual Kafka event name is given in place of @EVENT.OUTEVENT/@EVENT.FAILEVENT

Sending Data to Kafka Event or Data Sync using Custom Kafka Producer by Python Script

Using custom Kafka producer, the data can be sent to the Kafka event as well Data Sync Event.. Here is the code for custom Kafka producer:

kaf_obj.kafka_produce(df, "Event_name", "failure_message")

df: It is the data Either in the form of DataFrame or List of dictionary.

Event_name: Enter the Kafka or Data Sync event name (please do not use Display name) where the data has to be produced. The event name should be inside quotes. The user can send data to out event either giving the event name in this field or writing @EVENT.OUTEVENT in this field. For E.g.,

kaf_obj.kafka_produce(df, "@EVENT.OUTEVENT", "failed")
kaf_obj.kafka_produce(df, "@EVENT.FAILEVENT", "failed")

Please Note:

  • If using @EVENT.OUTEVENT as an Event_name then the Python script component must be connected with the Kafka Event to send the data to the connected event.

  • Custom Producer must be used in real time. Using it in batch mode can result in improper functionality of the monitoring page and potential WebSocket issues.

@EVENT.FAILEVENT: It will send the data to connected failover event with the Python script component.

Failure_message: This field is optional. Enter the message which needs to be send.

Custom Logger

Python Component has custom logger feature which will be used in script for sending the user's custom logs in the logs panel. Please refer the below code for custom logger:

log_obj.info("Executing logging info")
log_obj.info(f"Executing logging info-{key1}, {key2}, {key3}")

Please Note: Using this feature, the user cannot get the logs which contain environment variables.

Sample Python code to produce data using custom producer and custom logger:

Here,

df : is in-event data from kafka event connected to python component.

key1 , key2 , key3: Any parameter passed to the function from Input Data section of metadata info of python script component.

log_obj.info(): It is for custom logger and takes string message as input.

kaf_obj.kafka_produce(): It is for custom kafka producer and takes parameter:

  • df as data to produce – pandas.DataFrame and List of Dict type is supported.

  • Event name – any Kafka event name in string format has to be given.- If@EVENT.OUTEVENT is given, then it sends data to connected out event. If @EVENT.FAILEVENT is given, then it will send the data to connected failover event with the Python script component.

  • Any Failed Message - in String format can be given, to append it in output data, same message will be appended to all the rows of data. (This field is optional).

Please Note: If the data is produced to a Failover Event using custom Kafka Producer then that data will not be considered as failed data and it will not be listed on the Failure Analysis page as failed data and it will be reflected in green color as processed records on the Data Metrics page.

Python Script Examples

The Custom Python Script transform component supports 3 types of scripts in the Data Pipeline.

1. As Reader Component: If you don’t have any in Event then you can use no argument function. For Example,

import json
import requests
import pandas as pd
def getmovies_result():
    data = requests.get("http://www.omdbapi.com/?s=water&apikey=ba5d53d4")
    loaded_json = json.loads(data.content)
    data = loaded_json['Search']
    df = pd.DataFrame.from_dict(data, orient='columns')
    return df

2. As Transformation Component: If you have data to execute some operation, then use the first argument as data or a list of dictionaries. For Example,

Here the df holds the data coming from the previous event as argument to the pram of the method.

def getdata(df):
    cond1 = df['Unit Price'] > 450
    filter_df = df[cond1]
    return filter_df

3. Custom Argument with Data: If there is a custom argument with the data-frame i.e. the data is coming from the previous event and we have passed the custom argument to the parameter of the function. here df will hold the data from the previous event and the second param: arg range can be given in the input data section of the component.


def getdata(df, range):
    cond1 = df['Unit Price'] >  range
    filter_df = df[cond1]
    return filter_df

Please Note:

  • It is possible for a Data Pipeline user to keep different versions of the Python script in VCS. The user can Push a version of the Python script to VCS and Pull a version of the Python script from VCS.

  • The Custom Kafka producer in batch mode will not trigger next component, if the actual Kafka event name is given in place of @EVENT.OUTEVENT/@EVENT.FAILEVENT

Last updated