Appendix A: Python Scripts
This section provides the Python scripts used in our use cases.
A.1. PostgreSQL to Amazon S3
import psycopg2
import pandas as pd
import boto3
from io import BytesIO
from datetime import datetime, timedelta
import logging
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)
def extract_data_from_postgres(conn, query):
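    """Run a SQL query on an open psycopg2 connection and return the rows as a pandas DataFrame."""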
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        df = pd.DataFrame(rows, columns=columns)
        logging.info(f"Data extracted: {len(df)} records.")
        return df
    except Exception as e:
        logging.error(f"Error during data extraction: {e}")
        raise
def upload_to_s3(df, aws_access_key_id, aws_secret_access_key, s3_bucket_name, s3_prefix, file_label):
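    """Write a DataFrame to an in-memory Parquet buffer and upload it to S3 under a date-partitioned key."""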
    try:
        buffer = BytesIO()
        df.to_parquet(buffer, index=False, engine='pyarrow')
        buffer.seek(0)
        today_str = datetime.today().strftime('%Y-%m-%d')
        timestamp_str = datetime.today().strftime('%Y%m%d_%H%M%S')
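        # Key layout: <s3_prefix>/<file_label>/insertion_date=<YYYY-MM-DD>/<file_label>-<timestamp>.parquet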
        folder_path = f"{s3_prefix}/{file_label}/insertion_date={today_str}/"
        file_name = f"{file_label}-{timestamp_str}.parquet"
        s3_path = folder_path + file_name
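        # Credentials are passed in explicitly; boto3's default credential chain (or an IAM role) could be used instead.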
        s3 = boto3.client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key
        )
        s3.upload_fileobj(buffer, s3_bucket_name, s3_path)
        logging.info(f"Uploaded to s3://{s3_bucket_name}/{s3_path}")
    except Exception as e:
        logging.error(f"Error uploading to S3 for {file_label}: {e}")
        raise
def main(aws_access_key_id, aws_secret_access_key, s3_bucket_name, s3_prefix,
         host, port, database, username, password, start_date=None, end_date=None, day=None):
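    """Extract each configured query from PostgreSQL and upload the result to S3.

    If only 'day' is given, both dates resolve to the current date minus 'day' days;
    if only 'start_date' is given, 'end_date' defaults to the same date.
    """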
    if day is not None and start_date is None and end_date is None:
        # Calculate the target date as the current date minus 'day' days
        current_date = datetime.now().date()  # Keep only the date part
        calculated_date = current_date - timedelta(days=int(day))
        start_date = calculated_date
        end_date = calculated_date
    elif start_date is not None and end_date is None:
        # If only start_date is given, assume end_date is the same day
        end_date = start_date
    try:
        conn = psycopg2.connect(
            dbname=database,
            user=username,
            password=password,
            host=host,
            port=port
        )
        logging.info("Connected to PostgreSQL.")
        queries = [
            {
                "label": "sale",
                "sql": fr""" select column1,2,3..... from sale s
                WHERE s.transaction_date::date BETWEEN DATE '{start_date}' AND DATE '{end_date}' """
            },
            {
                "label": "sale_item",
                "sql": fr""" select ........... """
            },
            {
                "label": "transaction",
                "sql": f""" ..... """
            }
        ]
        for query_info in queries:
            label = query_info["label"]
            sql = query_info["sql"]
            logging.info(f"Running query for: {label}")
            df = extract_data_from_postgres(conn, sql)
            if df is None or df.empty:
                logging.warning(f"No data found for: {label}")
                continue
            upload_to_s3(df, aws_access_key_id, aws_secret_access_key, s3_bucket_name, s3_prefix, label)
    except Exception as e:
        logging.error(f"Error in main(): {e}")
        raise
    finally:
        if 'conn' in locals():
            conn.close()
logging.info("PostgreSQL connection closed.")A.2. Amazon S3 to PostgreSQL
A.2. Amazon S3 to PostgreSQL