Implementation Steps

Step 1: Database Connection Setup

The pipeline uses psycopg2 to connect to PostgreSQL. Before running it, verify the following (a minimal connection sketch follows this checklist):

  • The database is accessible from your execution environment.

  • It contains the required tables and views.

  • The database user has SELECT permissions on the target tables.
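
As a quick sanity check, here is a minimal psycopg2 connection sketch. All connection parameters are placeholders for your environment, not values from the pipeline's configuration:

import psycopg2

# Hypothetical connection parameters; replace with your environment's values.
conn = psycopg2.connect(
    host="your-db-host",
    port=5432,
    dbname="your_database",
    user="pipeline_user",
    password="your_password",
)

# A trivial SELECT confirms connectivity and permissions before the pipeline runs.
with conn.cursor() as cur:
    cur.execute("SELECT 1")
    assert cur.fetchone() == (1,)

conn.close()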

Step 2: Query Configuration

Define your queries in the queries list within the main() function:

queries = [
    {
        "label": "table_name",  # Used for S3 file naming
        "sql": """
            SELECT column1, column2, column3
            FROM your_schema.your_table
            WHERE date_column::date BETWEEN '{start_date}' AND '{end_date}'
        """
    }
]

Important Query Guidelines:

  • Write queries with {start_date} and {end_date} placeholders so the pipeline can substitute the resolved dates at runtime.

  • Include proper date filtering so incremental loads only pull the requested window.

  • Use meaningful labels, since they become the S3 file prefixes.

  • Keep queries performant, e.g. by selecting only the columns you need.

  • Handle NULL values appropriately in transformations (see the sketch after this list).
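
To make the placeholder and NULL-handling guidelines concrete, here is a sketch of filling in and running one query. The example table, columns, and the str.format substitution are assumptions for illustration; the pipeline's exact mechanism may differ:

# A hypothetical query entry; COALESCE is one way to handle NULLs in the query itself.
query = {
    "label": "orders",
    "sql": """
        SELECT order_id, COALESCE(total, 0) AS total, created_at
        FROM sales.orders
        WHERE created_at::date BETWEEN '{start_date}' AND '{end_date}'
    """,
}

# Substitute the resolved dates into the placeholders (str.format assumed).
sql = query["sql"].format(start_date="2024-01-01", end_date="2024-01-31")

with conn.cursor() as cur:  # conn from the Step 1 sketch
    cur.execute(sql)
    rows = cur.fetchall()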

Step 3: S3 Structure Planning

The pipeline creates the following S3 structure:

s3://bucket-name/
└── s3-prefix/
    └── table-label/
        └── insertion_date=YYYY-MM-DD/
            └── table-label-YYYYMMDD_HHMMSS.parquet
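
This layout can be reproduced with a small key-building helper. The build_s3_key function and the boto3 upload below are illustrative assumptions, not the pipeline's actual internals:

from datetime import datetime, timezone

import boto3

def build_s3_key(prefix: str, label: str, insertion_date: str) -> str:
    # Mirrors the tree above: prefix/label/insertion_date=YYYY-MM-DD/label-YYYYMMDD_HHMMSS.parquet
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return f"{prefix}/{label}/insertion_date={insertion_date}/{label}-{ts}.parquet"

# Example: upload a locally written Parquet file to the expected location.
key = build_s3_key("s3-prefix", "table-label", "2024-01-01")
boto3.client("s3").upload_file("table-label.parquet", "bucket-name", key)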

Step 4: Date Parameter Configuration

The pipeline supports three date modes:

  1. Relative Date (using the day parameter):

main(..., day=1)  # Yesterday's data

  2. Date Range:

main(..., start_date='2024-01-01', end_date='2024-01-31')

  3. Single Date:

main(..., start_date='2024-01-01')  # end_date defaults to start_date
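
To see how the three modes interact, here is a hypothetical resolve_dates helper; it mirrors the behavior described above but is not the pipeline's actual code:

from datetime import date, timedelta

def resolve_dates(day=None, start_date=None, end_date=None):
    # Mode 1: relative date -- day=1 means yesterday, day=2 two days ago, etc.
    if day is not None:
        target = (date.today() - timedelta(days=day)).isoformat()
        return target, target
    # Mode 3: single date -- end_date defaults to start_date.
    if start_date and end_date is None:
        return start_date, start_date
    # Mode 2: explicit date range.
    return start_date, end_date

print(resolve_dates(day=1))  # yesterday's date, twice
print(resolve_dates(start_date='2024-01-01', end_date='2024-01-31'))
print(resolve_dates(start_date='2024-01-01'))  # ('2024-01-01', '2024-01-01')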
