Appendix A: Python Scripts

This section provides the Python scripts used in our use cases.

A.1. PostgreSQL to Amazon S3

import psycopg2
import pandas as pd
import boto3
from io import BytesIO
from datetime import datetime, timedelta
import logging

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)

def extract_data_from_postgres(conn, query):
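    """Execute the given SQL query and return the result set as a pandas DataFrame."""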
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        df = pd.DataFrame(rows, columns=columns)
        logging.info(f"Data extracted: {len(df)} records.")
        return df
    except Exception as e:
        logging.error(f"Error during data extraction: {e}")
        raise

def upload_to_s3(df, aws_access_key_id, aws_secret_access_key, s3_bucket_name, s3_prefix, file_label):
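    """Write the DataFrame to an in-memory Parquet buffer and upload it to S3 under a date-partitioned key."""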
    try:
        buffer = BytesIO()
        df.to_parquet(buffer, index=False, engine='pyarrow')
        buffer.seek(0)

        # The partition folder is keyed by insertion_date (set to the day after the
        # run date), while the file name carries a run timestamp for uniqueness
        insertion_date_str = (datetime.today() + timedelta(days=1)).strftime('%Y-%m-%d')
        timestamp_str = datetime.today().strftime('%Y%m%d_%H%M%S')

        folder_path = f"{s3_prefix}/{file_label}/insertion_date={insertion_date_str}/"
        file_name = f"{file_label}-{timestamp_str}.parquet"
        s3_path = folder_path + file_name

        s3 = boto3.client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key
        )
        s3.upload_fileobj(buffer, s3_bucket_name, s3_path)

        logging.info(f"Uploaded to s3://{s3_bucket_name}/{s3_path}")
    except Exception as e:
        logging.error(f"Error uploading to S3 for {file_label}: {e}")
        raise

def main(aws_access_key_id, aws_secret_access_key, s3_bucket_name, s3_prefix,
         host, port, database, username, password, start_date=None, end_date=None, day=None):
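    """Connect to PostgreSQL, run each extraction query, and upload the results to S3.

    If only 'day' is given, the date range defaults to (current date - day);
    if only 'start_date' is given, 'end_date' defaults to the same date.
    """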
    if day is not None and start_date is None and end_date is None:
        # Derive the target date by going back 'day' days from the current date
        current_date = datetime.now().date()  # Date part only
        calculated_date = current_date - timedelta(days=int(day))

        start_date = calculated_date
        end_date = calculated_date
    elif start_date is not None and end_date is None:
        # If only start_date is given, assume end_date is the same
        end_date = start_date
    try:
        conn = psycopg2.connect(
            dbname=database,
            user=username,
            password=password,
            host=host,
            port=port
        )
        logging.info("Connected to PostgreSQL.")

        queries = [
            {
                "label": "sale",
                "sql": f"""SELECT column1, 2, 3, ..... FROM sale s
                           WHERE s.transaction_date::date BETWEEN DATE '{start_date}' AND DATE '{end_date}' """
            },
            {
                "label": "sale_item",
                "sql": f"""SELECT ........... """
            },
            {
                "label": "transaction",
                "sql": f"""....."""
            }
        ]

        for query_info in queries:
            label = query_info["label"]
            sql = query_info["sql"]
            logging.info(f"Running query for: {label}")
            df = extract_data_from_postgres(conn, sql)
            if df is None or df.empty:
                logging.warning(f"No data found for: {label}")
                continue
            upload_to_s3(df, aws_access_key_id, aws_secret_access_key, s3_bucket_name, s3_prefix, label)

    except Exception as e:
        logging.error(f"Error in main(): {e}")
        raise
    finally:
        if 'conn' in locals():
            conn.close()
            logging.info("PostgreSQL connection closed.")

A.2. Amazon S3 to PostgreSQL
