Wed Mar 18 2026

The project consists of two parts: a flight delay model that predicts the chance of a missed connection, and a second part that, in case of a missed connection, finds an alternate route for booking. The prediction model accounts only for delays due to weather conditions. The route retrieval uses a graph algorithm to find the shortest route.

Dataset

1. Domain: Travel Disruption Management

The complete inference engine encompasses the following three micro-services.

  • Flight State Monitor: This service is responsible for handling the data and serving it to the inference model periodically. It includes the following.

    1. Registers the traveler’s flight details.
    2. Polls weather details for the second leg of the journey at the required time.
    3. Performs the TL phase of the data pipeline.
    4. Sends the combined flight details and weather report to the Risk Inference model.
  • Risk Inference

    1. Takes the flight details and weather report.
    2. Predicts the delay probability scores.
  • Re-accommodation

    1. Takes the flight details as input.
    2. Returns an alternative route for the second leg of the journey.

The following is a flowchart of the inference pipeline.

%%{init: {'graph': {'rankSpacing': 10, 'nodeSpacing': 10}}}%%
graph TB

subgraph Container
direction LR
	subgraph Mpipeline[Model Pipeline]
		ETL[ETL]
		Model1[Risk Inference]
		Model2[Route Retrieval]
	end
	subgraph DataLake["Data Lake (S3/GCS)"]
		Storage1[Raw CSVs]
		ModelWeights[Model Weights]
	end
	subgraph Serving[Serving Layer]
		Storage2[(PostgreSQL)]
		API[FastAPI Interface]
	end

	ModelWeights -.-> Model1
	ModelWeights -.-> Model2
	Storage1 --> ETL
	ETL --> Model1
	Model1 -- "p >= 0.8" --> Model2
	Model2 --> Storage2
	Model1 -- "p < 0.8" --> Storage2
	Storage2 --> API
end
subgraph Baseplate["&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Airflow&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"]
end

Container ~~~ Baseplate

style Container fill:#fff,stroke:#fff;
%%style Baseplate fill:#f9f9f9;
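
The branching in the flowchart can be sketched as a small dispatch function. This is only an illustration under stated assumptions: `handle_prediction`, `find_route`, and the layout of the returned record are hypothetical names, and the 0.8 threshold is taken from the edge labels in the diagram.

```python
from typing import Callable, Optional

RISK_THRESHOLD = 0.8  # taken from the "p >= 0.8" edge label in the flowchart


def handle_prediction(
    flight: dict,
    p_risk: float,
    find_route: Callable[[dict], list],
    threshold: float = RISK_THRESHOLD,
) -> dict:
    """Build the record that would be written to the serving store.

    `find_route` stands in for the Route Retrieval step (hypothetical callable).
    """
    alternative_route: Optional[list] = None
    if p_risk >= threshold:
        # High risk of a missed connection: ask for an alternative route.
        alternative_route = find_route(flight)
    return {"flight": flight, "p_risk": p_risk, "alternative_route": alternative_route}
```

Route retrieval runs only for high-risk predictions, so low-risk flights are stored without invoking the routing step at all.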

1.2. ETL Pipeline

The ETL pipeline is as follows.

graph TD
    subgraph etl[Production ETL Pipeline]
		direction TB
	    storage1["S3/GCS"]
	    subgraph flight[Flight ETL]
	        flight_storage_bronze[("Bronze (Flight)")]
	        flight_extract[Flight Extract]
	        val[Schema Validation]
	        clean[Clean & Deduplicate]
	        discarded["Discarded Data &<br> Audit Logs"]
	        flight_storage_silver[("Silver (Flight)")]
	        flight_transform[Flight Transform & <br>Feature Engineering]
	    end
	    
        flight_storage_garbage[("Discarded (S3/GCS)")]
        flight_storage_gold[("Gold (Flight)")]
        airport_api[airports API]
        subgraph airport_etl[Airport ETL]
	        airport_codes[Airport Codes]
	        airports[Airports Location]
        end
        
        weather_api[openmeteo API]
        subgraph weather[Weather ETL]
	        weather_extract[Weather Extract]
	        weather_storage_bronze[("Bronze (Weather)")]
	        weather_transform[Weather Clean and<br> Transform]
	    end
        weather_storage_silver[("Silver (Weather)")]
        
        flight_weather[Combine & Join]
        flight_weather_storage_gold[("Gold (Joint Flight <br>and Weather)")]
        indexing[Index Categories]
        scaling[Scaling &<br> Cyclical Encoding]
        storage2[(PostgreSQL / <br>Feature Store)]
        
        storage1 --"S3/BigQuery Select"--> flight_extract
        flight_extract --> flight_storage_bronze
        flight_storage_bronze --"File path" --> val
        weather_api --> weather_extract
        airport_api --> airports
        val -- "Pass - File path" --> clean
        clean --> flight_storage_silver
        val -- "Fail/Alert" --> discarded
        clean --> discarded
        
        flight_storage_silver --"File path"--> flight_transform
        flight_transform --> flight_storage_gold
        flight_storage_gold --> airport_codes
        discarded --> flight_storage_garbage
        airport_codes --> airports
        airports --> weather_extract
        weather_extract --> weather_storage_bronze
        weather_storage_bronze --"File path"--> weather_transform
        weather_transform --> weather_storage_silver
        weather_storage_silver --"File path"--> flight_weather
        flight_storage_gold --"File path"--> flight_weather
        flight_weather --> flight_weather_storage_gold
        
        flight_weather_storage_gold --"File path"--> indexing
        indexing --> scaling
        indexing --"Categories"--> storage2
        scaling --> storage2
    end
sequenceDiagram
	participant er as Error Log
	participant wr as Warning Log
	participant os as S3/GCS
	participant extract as Extract
	participant flight_sch_val as Schema Validation
	participant flight_clean as Clean and De-duplication
	
	os->>extract: S3/BigQuery Select
	alt File not found / Connection failed
		extract ->> er: Error occurred
	else Warning
		extract ->> wr: Columns missing
	else File found
		extract ->> os: Bronze (Flight)
		os ->> flight_sch_val: Bronze (Flight)
	end
kanban
  column1[Extract]
    task1[Input: File path]
    task2[Output: File path]
    task3[Error: <br>1. Unsupported file type<br> 2. Failed to load file <br> 3. Connection failed]

1.3. The Risk Inference Model

The predictor pipeline has the following stages: data extraction, data cleaning, data processing, model selection, model training, and deployment.

Data sources

  • A static raw dataset from Kaggle that consists of past flight details for a subset of airports in the US. The dataset contains aircraft details and journey details such as the route and the flight time. The dataset is unprocessed.
  • An airport API to find the locations (latitude and longitude) of the airports that are present in the flight dataset.
  • The Open-Meteo API to find the weather details at the airport locations around the flight time.

1.4. Routing Model

Graph

  • Vertices - IATA codes
  • Edges - DataFrame indices
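
A minimal sketch of how such a graph could be built and searched is given below. It assumes Dijkstra's algorithm with DISTANCE as the edge weight; the notes do not state which algorithm or weight is used, and `build_graph` and `shortest_route` are hypothetical names. Each edge carries the DataFrame index of the flight it represents, so a route is returned as a list of row indices.

```python
import heapq

import pandas as pd


def build_graph(flights: pd.DataFrame, weight_col: str = "DISTANCE") -> dict:
    """Adjacency list: origin IATA -> list of (destination IATA, row index, weight)."""
    graph: dict = {}
    columns = zip(
        flights.index,
        flights["ORIGIN_AIRPORT"],
        flights["DESTINATION_AIRPORT"],
        flights[weight_col],
    )
    for idx, origin, destination, weight in columns:
        graph.setdefault(origin, []).append((destination, idx, float(weight)))
    return graph


def shortest_route(graph: dict, source: str, target: str):
    """Dijkstra's algorithm; returns the row indices of the flights on the route."""
    best = {source: 0.0}
    came_from = {}  # airport -> (previous airport, row index of the flight taken)
    queue = [(0.0, source)]
    while queue:
        cost, airport = heapq.heappop(queue)
        if airport == target:
            break
        if cost > best.get(airport, float("inf")):
            continue  # stale queue entry
        for nxt, idx, weight in graph.get(airport, []):
            new_cost = cost + weight
            if new_cost < best.get(nxt, float("inf")):
                best[nxt] = new_cost
                came_from[nxt] = (airport, idx)
                heapq.heappush(queue, (new_cost, nxt))
    if target not in best:
        return None  # no route exists
    route = []
    node = target
    while node != source:
        node, idx = came_from[node]
        route.append(idx)
    return route[::-1]
```

Returning row indices instead of airport codes keeps the route tied to concrete flights, which can then be looked up with `flights.loc[route]`.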

Setting up File Directory

The file structure for the project is created as follows.

File Structure

Delay
├── config
│   ├── config.yaml
│   ├── data_dependency.yaml
│   └── schema.yaml
├── dags
├── data
│   ├── airlines.csv
│   ├── airports.csv
│   ├── bronze
│   ├── flights.csv
│   ├── gold
│   └── silver
├── docker
│   ├── Dockerfile.etl
│   └── Dockerfile.ml
├── logs
├── pyproject.toml
├── README.md
├── requirements.txt
├── src
│   ├── etl
│   │   ├── airports.py
│   │   ├── date_encoding.py
│   │   ├── flight
│   │   │   ├── clean.py
│   │   │   ├── extract.py
│   │   │   ├── __init__.py
│   │   │   ├── schema_validation.py
│   │   │   └── transform.py
│   │   ├── index_categories.py
│   │   ├── __init__.py
│   │   ├── join.py
│   │   ├── scale.py
│   │   └── weather
│   │       ├── clean.py
│   │       ├── extract.py
│   │       ├── __init__.py
│   │       └── transform.py
│   ├── models
│   │   └── __init__.py
│   ├── retriever
│   │   └── __init__.py
│   └── utils
│       ├── config_loader.py
│       ├── constants.py
│       ├── db_client.py
│       ├── __init__.py
│       ├── loaders.py
│       └── settings.py
├── structure.md
└── tests
    ├── etl_pipeline.py
    ├── flight_delay.ipynb
    ├── test_etl.py
    └── test_models.py

Iteration 1 Deliverable

1. Data Pipeline

The data pipeline consists of the following stages.

  1. Data Source
    1. Flight - Local Storage
    2. Airport locations - API
    3. Airport weather - API
  2. Generic Data Cleaning
    1. Schema validation
    2. Filter columns
    3. Drop Cancelled / Diverted
    4. Deduplicate
    5. Type mismatch
    6. Categorical Standardization
    7. Typos
    8. Value range consistency
    9. Value format consistency
    10. Cross-column consistency
    11. Deduplicate
    12. Outlier detection
    13. Null values
  3. Data Processing
    1. Recalculate SCH_ARI_MIN and ARI_MIN.
  4. Feature Engineering
    1. Decomposition of SCHEDULED_DEPARTURE, DEPARTURE_TIME, SCHEDULED_ARRIVAL, and ARRIVAL_TIME into SCH_DEP_HOUR, SCH_DEP_MIN, DEP_HOUR, DEP_MIN, SCH_ARI_HOUR, SCH_ARI_MIN, ARI_HOUR, and ARI_MIN.
    2. SCH_DEP_DATE and SCH_ARI_DATE.

1.1. Flight Data

We start with the flight data. The dataset consists of a total of samples and features.

1.1.1. Flight Data Extraction

The flight data is stored locally. To avoid unnecessary memory usage, we use the Select-on-Read method to load only those features that are used in the ETL pipeline and are required by the model. We do so by passing the list of columns to read to the read_csv() method.

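import pandas as pd

def loadFlightData(filepath: str, columns: list | None = None) -> pd.DataFrame:
	"""Load the flight CSV, reading only `columns` when a list is given."""
	# usecols=None tells read_csv to load every column, so one call covers both cases.
	return pd.read_csv(filepath, usecols=columns)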

The function is then called with the list of columns:

load_columns = """YEAR
MONTH
DAY
AIRLINE
FLIGHT_NUMBER
TAIL_NUMBER
ORIGIN_AIRPORT
DESTINATION_AIRPORT
SCHEDULED_DEPARTURE
DEPARTURE_TIME
DEPARTURE_DELAY
SCHEDULED_TIME
ELAPSED_TIME
AIR_TIME
DISTANCE
SCHEDULED_ARRIVAL
ARRIVAL_TIME
ARRIVAL_DELAY
CANCELLED"""

df = loadFlightData(filepath, columns=load_columns.split())

1.1.2. Flight Data Processing

  • Cancelled Flights: Since we are building a delay prediction model, we drop the rows for cancelled flights. The number of cancelled flights is very small compared to the total number of flights, so we can safely drop them. Cancelled flights are dropped by using CANCELLED==0 in the query() method to select only the flights that are not cancelled.

  • Null Values

  • Type Mismatching: Columns may contain entries whose type differs from the type prescribed in the schema. The schema contains only string and numeric types. We scan the two types of columns separately and drop the rows whose entries differ from the prescribed data type. The schema of the dataset is given below.

Column               Format
YEAR                 int8
MONTH                int8
DAY                  int8
AIRLINE              string - 2 uppercase characters
FLIGHT_NUMBER        int16 - 4 digit number
TAIL_NUMBER          string - 6 digit alphanumeric
ORIGIN_AIRPORT       string - 3 uppercase characters
DESTINATION_AIRPORT  string - 3 uppercase characters
SCHEDULED_DEPARTURE  float32 - first two digits for hour, last two digits for minutes
DEPARTURE_TIME       float32 - first two digits for hour, last two digits for minutes
DEPARTURE_DELAY      float32 - minutes
SCHEDULED_TIME       float32 - minutes
ELAPSED_TIME         float32 - minutes
AIR_TIME             float32 - minutes
DISTANCE             float32 - km
SCHEDULED_ARRIVAL    float32 - first two digits for hour, last two digits for minutes
ARRIVAL_TIME         float32 - first two digits for hour, last two digits for minutes
ARRIVAL_DELAY        float32 - minutes
CANCELLED            int8

The rows with entries that fail to match the schema are selected using pd.to_numeric(df[column], errors="coerce").isna() and dropped subsequently. For the string datatype columns, the selection code is negated with ~ to catch numeric entries.

The actual formats of the columns are as follows.

Column               Format
AIRLINE              object
FLIGHT_NUMBER        int64
TAIL_NUMBER          object
ORIGIN_AIRPORT       object
DESTINATION_AIRPORT  object
SCHEDULED_DEPARTURE  int64
DEPARTURE_TIME       float64
DEPARTURE_DELAY      float64
AIR_TIME             float64
DISTANCE             int64
SCHEDULED_ARRIVAL    int64
ARRIVAL_TIME         float64
ARRIVAL_DELAY        float64
WEATHER_DELAY        float64
DATE                 datetime64[ns]

Creating a New Feature

  • Time Decomposition: The scheduled and actual departure and arrival times are stored as floats. Each float is interpreted as a four-digit number: the first two digits are the hour and the last two digits are the minutes. We split SCHEDULED_DEPARTURE and DEPARTURE_TIME into SCH_DEP_HOUR, SCH_DEP_MIN, DEP_HOUR, and DEP_MIN, and similarly for SCHEDULED_ARRIVAL and ARRIVAL_TIME.
  • Date Reconstruction: We combine YEAR, MONTH, DAY, SCH_DEP_HOUR, and SCH_DEP_MIN to create the SCH_DEP_DATE feature using pandas datetime objects. We estimate the arrival date by adding the flight duration to SCH_DEP_DATE. For the present, we ignore the timezone difference.
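
The two steps above can be sketched as follows. This is an illustration, not the project's transform.py: `decompose_hhmm` and `add_schedule_dates` are hypothetical names, SCHEDULED_TIME (in minutes) is assumed to be the flight duration, and the time values are assumed to be non-null and in the range 0000 to 2359.

```python
import pandas as pd


def decompose_hhmm(times: pd.Series, prefix: str) -> pd.DataFrame:
    """Split HHMM-encoded values (e.g. 1430.0) into hour and minute columns."""
    # Assumes nulls were already removed during cleaning.
    values = times.astype("int64")
    return pd.DataFrame(
        {f"{prefix}_HOUR": values // 100, f"{prefix}_MIN": values % 100},
        index=times.index,
    )


def add_schedule_dates(df: pd.DataFrame) -> pd.DataFrame:
    """Add SCH_DEP_HOUR, SCH_DEP_MIN, SCH_DEP_DATE, and SCH_ARI_DATE."""
    out = df.join(decompose_hhmm(df["SCHEDULED_DEPARTURE"], "SCH_DEP"))
    # pd.to_datetime assembles timestamps from year/month/day/hour/minute columns.
    out["SCH_DEP_DATE"] = pd.to_datetime(
        pd.DataFrame(
            {
                "year": out["YEAR"],
                "month": out["MONTH"],
                "day": out["DAY"],
                "hour": out["SCH_DEP_HOUR"],
                "minute": out["SCH_DEP_MIN"],
            }
        )
    )
    # Assumption: SCHEDULED_TIME holds the flight duration in minutes.
    # Timezone differences are ignored, as in the text.
    out["SCH_ARI_DATE"] = out["SCH_DEP_DATE"] + pd.to_timedelta(
        out["SCHEDULED_TIME"], unit="min"
    )
    return out
```

Adding a timedelta to the departure timestamp rolls the arrival date over midnight automatically, which a separate hour and minute calculation would not do.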

1.1.3. Airport Location Data

1.2. Weather Data

1.2.1. Weather Data Extraction

We use the Open-Meteo API to fetch historical hourly weather. Because of API rate limits, the pipeline implements a Rate Limiter and Splitter.

  • Weight Calculation: API weight is determined by nLocations * (nDays / 14) * (nVariables / 10).
  • Request Strategy: The system calculates numDataSplits to stay under the 600/min limit and uses time.sleep to respect the API’s window.
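
The weight formula and the splitting strategy can be sketched as below. The function names, the `fetch` callable, and the choice to split by location are assumptions for illustration; only the weight formula, the 600/min limit, and the use of time.sleep come from the text. The number of batches returned by `plan_batches` plays the role of numDataSplits.

```python
import math
import time
from typing import Callable, List, Sequence

CALLS_PER_MINUTE = 600  # limit quoted in the text


def api_weight(n_locations: int, n_days: int, n_variables: int) -> float:
    """Weight of one request, using the formula from the text."""
    return n_locations * (n_days / 14) * (n_variables / 10)


def plan_batches(
    locations: Sequence[str],
    n_days: int,
    n_variables: int,
    limit: float = CALLS_PER_MINUTE,
) -> List[List[str]]:
    """Split locations so that every batch stays within the per-minute limit."""
    per_location = api_weight(1, n_days, n_variables)
    # At least one location per batch, even if a single location exceeds the limit.
    batch_size = max(1, math.floor(limit / per_location))
    return [
        list(locations[i : i + batch_size])
        for i in range(0, len(locations), batch_size)
    ]


def fetch_all(
    locations: Sequence[str],
    n_days: int,
    n_variables: int,
    fetch: Callable[[List[str]], object],
    window_seconds: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> list:
    """Fetch one batch per rate-limit window; `fetch` is a hypothetical API call."""
    results = []
    for i, batch in enumerate(plan_batches(locations, n_days, n_variables)):
        if i > 0:
            sleep(window_seconds)  # wait for the next window before the next batch
        results.append(fetch(batch))
    return results
```

Passing `sleep` as a parameter makes it possible to test the batching logic without actually waiting.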

1.2.2. Weather Data Processing

The raw API responses are expanded into a MultiIndex DataFrame (IATA, DATE, HOUR).

  • Hourly Alignment: Features like TEMPERATURE_2M and WIND_SPEED_10M are mapped to the specific hour of departure/arrival.
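
A possible shape of the expansion step is sketched below. It assumes that each response carries an "hourly" object with a "time" list and one list per weather variable, and that responses are keyed by IATA code; `expand_hourly` is a hypothetical name.

```python
import pandas as pd


def expand_hourly(responses: dict) -> pd.DataFrame:
    """Expand per-airport hourly payloads into an (IATA, DATE, HOUR) indexed frame.

    `responses` maps an IATA code to a payload shaped like
    {"hourly": {"time": [...], "temperature_2m": [...], ...}} (assumed layout).
    """
    frames = []
    for iata, payload in responses.items():
        hourly = pd.DataFrame(payload["hourly"])
        timestamps = pd.to_datetime(hourly.pop("time"))
        # Upper-case variable names, e.g. temperature_2m -> TEMPERATURE_2M.
        hourly.columns = [name.upper() for name in hourly.columns]
        hourly["IATA"] = iata
        hourly["DATE"] = timestamps.dt.normalize()
        hourly["HOUR"] = timestamps.dt.hour
        frames.append(hourly)
    combined = pd.concat(frames, ignore_index=True)
    return combined.set_index(["IATA", "DATE", "HOUR"]).sort_index()
```

With this index, the weather for a given airport and hour is a single lookup, for example `weather.loc[("JFK", pd.Timestamp("2015-01-01"), 1)]`.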

1.3. Complete Data Pipeline

  • The Weather Join: We perform a “Dual Join.” The flight record is enriched with weather from the Origin Airport at departure time and the Destination Airport at arrival time.
  • Categorical Encoding: Final string columns (AIRLINE, AIRPORT) are converted to CategoricalDtype and mapped to integer codes to prepare for the Machine Learning model.
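
The dual join and the encoding step might look like the sketch below. It assumes a weather frame indexed by (IATA, DATE, HOUR) and flight columns SCH_DEP_DATE and SCH_ARI_DATE; the function names, the ORIGIN/DEST prefixes, and the temporary key columns are hypothetical.

```python
import pandas as pd

WEATHER_KEYS = ["IATA", "DATE", "HOUR"]


def attach_weather(flights, weather, airport_col, time_col, prefix):
    """Left-join weather for one airport column at the hour of one timestamp column."""
    w = weather.reset_index()
    renames = {c: f"{prefix}_{c}" for c in w.columns if c not in WEATHER_KEYS}
    # Private key names avoid clashes with existing flight columns such as DATE.
    renames.update({k: f"_W_{k}" for k in WEATHER_KEYS})
    w = w.rename(columns=renames)
    keyed = flights.assign(
        _K_DATE=flights[time_col].dt.normalize(),
        _K_HOUR=flights[time_col].dt.hour,
    )
    merged = keyed.merge(
        w,
        how="left",
        left_on=[airport_col, "_K_DATE", "_K_HOUR"],
        right_on=["_W_IATA", "_W_DATE", "_W_HOUR"],
    )
    return merged.drop(columns=["_K_DATE", "_K_HOUR", "_W_IATA", "_W_DATE", "_W_HOUR"])


def dual_join(flights, weather):
    """Origin weather at departure time, destination weather at arrival time."""
    joined = attach_weather(flights, weather, "ORIGIN_AIRPORT", "SCH_DEP_DATE", "ORIGIN")
    return attach_weather(joined, weather, "DESTINATION_AIRPORT", "SCH_ARI_DATE", "DEST")


def encode_categories(df, columns):
    """Replace string columns with integer codes; also return the category lists."""
    out = df.copy()
    categories = {}
    for col in columns:
        dtype = pd.CategoricalDtype(sorted(out[col].dropna().unique()))
        out[col] = out[col].astype(dtype).cat.codes
        categories[col] = list(dtype.categories)
    return out, categories
```

Returning the category lists alongside the encoded frame lets the same mapping be stored and reused at inference time.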

Data Pipeline - Cycle 2

Metadata-Driven Pipeline

We adhere to the standard strategy of separating the data from the logic. We do so by moving explicit column names and any parameter values used in the code to config files.

As a first step, we move the columns to load to the config file by defining the following in the config.yaml file.

pipeline:
  load_columns:
    - 'YEAR'
    - 'MONTH'
    - 'DAY'
    - 'AIRLINE'
    - 'FLIGHT_NUMBER'
    - 'TAIL_NUMBER'
    - 'ORIGIN_AIRPORT'
    - 'DESTINATION_AIRPORT'
    - 'SCHEDULED_DEPARTURE'
    - 'DEPARTURE_TIME'
    - 'DEPARTURE_DELAY'
    - 'SCHEDULED_TIME'
    - 'ELAPSED_TIME'
    - 'AIR_TIME'
    - 'DISTANCE'
    - 'SCHEDULED_ARRIVAL'
    - 'ARRIVAL_TIME'
    - 'ARRIVAL_DELAY'
    - 'CANCELLED'

We can load the file and read the load_columns as

config = load_yaml(config_path)
load_columns = config["pipeline"]["load_columns"]

Improving efficiency of dropTypeMismatchRows()

Change 1: Instead of hardcoding the string and numeric columns in the function, we pass the schema of the data and let the function create the lists of numeric and string columns using the following code.

string_columns = []
numeric_columns = []

for key in schema:
	if pd.api.types.is_numeric_dtype(pd.api.types.pandas_dtype(schema[key])):
		numeric_columns.append(key)
	else:
		string_columns.append(key)

The previous version dropped the mismatched rows by index for every column. This creates a memory bottleneck, as each drop creates a copy of the dataset with the bad rows removed. Reassigning the data from the copy also slows down performance. The astype() call becomes unnecessary once the data is loaded with the specified schema.

To overcome these problems, we can use boolean masking: a boolean series keeps track of the rows with a type mismatch during the column iteration.

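# Start with no row flagged; rows are flagged as mismatches are found.
is_bad = pd.Series(False, index=df.index)
for col in df.columns:
	# True where the entry parses as a number (missing values give False).
	is_numeric = pd.to_numeric(df[col], errors='coerce').notna()

	if col in string_columns:
		# A numeric entry in a string column is a mismatch.
		is_bad |= is_numeric
	elif col in numeric_columns:
		# A non-numeric (or missing) entry in a numeric column is a mismatch.
		is_bad |= ~is_numeric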

Once the columns are scanned, the boolean mask can be used to split the data into clean and corrupted data using df[~is_bad] and df[is_bad]. The discarded data is appended to discarded_list, which is passed to the function as an argument, using

if discarded_list is not None:
	discarded_list.append(df[is_bad].copy())
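
Putting the fragments above together, the masking approach can be sketched as one self-contained function. The name `split_type_mismatch_rows` and its signature are hypothetical and not necessarily those of dropTypeMismatchRows(); as in the fragments above, a missing value in a numeric column is flagged as a mismatch.

```python
import pandas as pd


def split_type_mismatch_rows(df, schema, discarded_list=None):
    """Return the rows that match the schema; optionally collect the rest.

    `schema` maps column names to dtype strings such as "float32" or "object".
    """
    is_bad = pd.Series(False, index=df.index)
    for col, dtype in schema.items():
        if col not in df.columns:
            continue
        # True where the entry parses as a number (missing values give False).
        is_numeric = pd.to_numeric(df[col], errors="coerce").notna()
        if pd.api.types.is_numeric_dtype(pd.api.types.pandas_dtype(dtype)):
            is_bad |= ~is_numeric  # non-numeric entry in a numeric column
        else:
            is_bad |= is_numeric  # numeric entry in a string column
    if discarded_list is not None:
        discarded_list.append(df[is_bad].copy())
    # A single boolean selection replaces the per-column drops.
    return df[~is_bad]
```

The mask is accumulated across all columns and applied once, so the DataFrame is filtered a single time regardless of the number of columns.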

Fixing Path Problems

Instead of writing the paths manually, we let Python build the paths of the files that are needed.

  • The paths in the config files should be relative to the project folder. For instance, the data path is raw_path: "data/flights.csv".
  • A separate Python file, constants.py, in the utils folder finds the absolute path of the project folder as
    PROJECT_ROOT = Path(__file__).parent.parent.parent.resolve()
    The data and config directory paths are computed as
    DATA_DIR = PROJECT_ROOT / "data"
    CONFIG_DIR = PROJECT_ROOT / "config"

File Loading Utilities

Functions to load the files in the config folder are defined in utils/loaders.py.

def getConfigResourcePath(file_name):
	config_path = CONFIG_DIR / f"{file_name}.yaml"
	if config_path.exists():
		return config_path
	else:
		raise FileNotFoundError(f"Missing {file_name} at {config_path}")

Pipeline

graph LR
	subgraph Inference Pipeline
		direction LR
		data[Flight]
		airport[Airport]
		weather[Weather]
		dp[Data Pipeline]
		gold[(Gold)]
		model[Model]
		routing[Routing]
		interface[Interface]

		data --"fl_m_cols"--> dp
		airport --> dp
		weather --"w_n_cols"--> dp
		dp --"fl_w_p_cols"--> gold
		gold --"fl_w_p_cols"--> model
		model --"p_{risk} < p_0"--> interface
		model --"p_{risk} >= p_0"--> routing
		routing --> interface
		gold --"fl_q_cols"--> routing
	end