The project consists of two parts: a flight delay model that predicts the chance of a missed connection, and a route retrieval component that, when a connection is missed, finds an alternate route for booking. The prediction model accounts only for delays due to weather conditions. The route retrieval uses a graph algorithm to find the shortest route.
Dataset
1. Domain: Travel Disruption Management
The complete inference engine encompasses the following three micro-services.
- Flight State Monitor: This service is responsible for handling the data and serving it to the inference model periodically. It performs the following tasks.
  - Registers the traveler’s flight details.
  - Polls the weather details for the second leg of the journey at the required time.
  - Performs the ETL phase of the data pipeline.
  - Sends the combined flight details and weather report to the Risk Inference model.
- Risk Inference
  - Takes the flight details and weather report.
  - Predicts the delay probability scores.
- Re-accommodation
  - Takes the flight details as input.
  - Returns an alternative route for the second leg of the journey.
The following is a flowchart of the inference pipeline.
%%{init: {'graph': {'rankSpacing': 10, 'nodeSpacing': 10}}}%%
graph TB
subgraph Container
direction LR
subgraph Mpipeline[Model Pipeline]
ETL[ETL]
Model1[Risk Inference]
Model2[Route Retrieval]
end
subgraph DataLake["Data Lake (S3/GCS)"]
Storage1[Raw CSVs]
ModelWeights[Model Weights]
end
subgraph Serving[Serving Layer]
Storage2[(PostgreSQL)]
API[FastAPI Interface]
end
ModelWeights -.-> Model1
ModelWeights -.-> Model2
Storage1 --> ETL
ETL --> Model1
Model1 -- "p >= 0.8" --> Model2
Model2 --> Storage2
Model1 -- "p < 0.8" --> Storage2
Storage2 --> API
end
subgraph Baseplate[" Airflow "]
end
Container ~~~ Baseplate
style Container fill:#fff,stroke:#fff;
%%style Baseplate fill:#f9f9f9;
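The two labelled edges leaving Risk Inference in the flowchart amount to a threshold test on the predicted probability. The sketch below shows that decision in isolation. The 0.8 cut-off comes from the flowchart; the function name handle_prediction, the find_route callback, and the shape of the returned record are assumptions made for illustration and are not taken from the project code.

```python
from typing import Callable

RISK_THRESHOLD = 0.8  # cut-off shown on the flowchart edges


def handle_prediction(
    p_delay: float,
    find_route: Callable[[], list],
    threshold: float = RISK_THRESHOLD,
) -> dict:
    """Decide whether a prediction needs re-accommodation.

    find_route is a hypothetical stand-in for the Route Retrieval model.
    """
    if not 0.0 <= p_delay <= 1.0:
        raise ValueError(f"probability out of range: {p_delay}")
    record = {"p_delay": p_delay, "alternative_route": None}
    if p_delay >= threshold:
        # High risk: ask the routing model for an alternative second leg.
        record["alternative_route"] = find_route()
    # Low risk: the record is stored without a route.
    return record


print(handle_prediction(0.92, lambda: ["ORD", "DEN", "LAX"]))
print(handle_prediction(0.35, lambda: ["ORD", "DEN", "LAX"]))
```

Passing the routing step as a callback keeps the threshold logic testable without loading either model.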
1.2. ETL Pipeline
The ETL pipeline is as follows
graph TD
subgraph etl[Production ETL Pipeline]
direction TB
storage1["S3/GCS"]
subgraph flight[Flight ETL]
flight_storage_bronze[("Bronze (Flight)")]
flight_extract[Flight Extract]
val[Schema Validation]
clean[Clean & Deduplicate]
discarded["Discarded Data &<br> Audit Logs"]
flight_storage_silver[("Silver (Flight)")]
flight_transform[Flight Transform & <br>Feature Engineering]
end
flight_storage_garbage[("Discarded (S3/GCS)")]
flight_storage_gold[("Gold (Flight)")]
airport_api[airports API]
subgraph airport_etl[Airport ETL]
airport_codes[Airport Codes]
airports[Airports Location]
end
weather_api[openmeteo API]
subgraph weather[Weather ETL]
weather_extract[Weather Extract]
weather_storage_bronze[("Bronze (Weather)")]
weather_transform[Weather Clean and<br> Transform]
end
weather_storage_silver[("Silver (Weather)")]
flight_weather[Combine & Join]
flight_weather_storage_gold[("Gold (Joint Flight <br>and Weather)")]
indexing[Index Categories]
scaling[Scaling &<br> Cyclical Encoding]
storage2[(PostgreSQL / <br>Feature Store)]
storage1 --"S3/BigQuery Select"--> flight_extract
flight_extract --> flight_storage_bronze
flight_storage_bronze --"File path" --> val
weather_api --> weather_extract
airport_api --> airports
val -- "Pass - File path" --> clean
clean --> flight_storage_silver
val -- "Fail/Alert" --> discarded
clean --> discarded
flight_storage_silver --"File path"--> flight_transform
flight_transform --> flight_storage_gold
flight_storage_gold --> airport_codes
discarded --> flight_storage_garbage
airport_codes --> airports
airports --> weather_extract
weather_extract --> weather_storage_bronze
weather_storage_bronze --"File path"--> weather_transform
weather_transform --> weather_storage_silver
weather_storage_silver --"File path"--> flight_weather
flight_storage_gold --"File path"--> flight_weather
flight_weather --> flight_weather_storage_gold
flight_weather_storage_gold --"File path"--> indexing
indexing --> scaling
indexing --"Categories"--> storage2
scaling --> storage2
end
sequenceDiagram
participant er as Error Log
participant wr as Warning Log
participant os as S3/GCS
participant extract as Extract
participant flight_sch_val as Schema Validation
participant flight_clean as Clean and De-duplication
os->>extract: S3/BigQuery Select
alt File not found / Connection failed
extract ->> er: Error occurred
else Warning
extract ->> wr: Columns missing
else File found
extract ->> os: Bronze (Flight)
os ->> flight_sch_val: Bronze (Flight)
end
kanban
column1[Extract]
task1[Input: File path]
task2[Output: File path]
task3[Error: <br>1. Unsupported file type<br> 2. Failed to load file <br> 3. Connection failed]
1.3. The Risk Inference Model
The predictor pipeline has the following stages: data extraction, data cleaning, data processing, model selection, model training, and deployment.
Data sources
- A static raw dataset from Kaggle that consists of past flight details for a subset of airports in the US. The dataset contains aircraft details and journey details such as the route and the flight time. The dataset is unprocessed.
- Airport API to find the location (latitude and longitude) of the airports that are present in the flight detail dataset.
- Open-Meteo API to find the weather details at the airport locations around the flight time.
1.4. Routing Model
Graph
- Vertices: IATA codes
- Edges: DataFrame indices
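As a minimal sketch of such a graph, the example below uses IATA codes as vertices and labels every edge with the row index of the flight it came from, then runs Dijkstra's algorithm to find the shortest route. The source does not say which edge weight or which shortest-path algorithm the project uses, so weighting by flight duration and the choice of Dijkstra are assumptions, the sample rows are made up, and connection times between flights are ignored.

```python
import heapq
from collections import defaultdict

# Each tuple is (row_index, origin, destination, duration_minutes).
# The rows are invented for illustration.
flights = [
    (0, "JFK", "ORD", 150),
    (1, "ORD", "LAX", 260),
    (2, "JFK", "LAX", 390),
    (3, "JFK", "DEN", 270),
    (4, "DEN", "LAX", 140),
]


def build_graph(rows):
    """Adjacency list: IATA code -> list of (neighbour, weight, row_index)."""
    graph = defaultdict(list)
    for idx, origin, dest, minutes in rows:
        graph[origin].append((dest, minutes, idx))
    return graph


def shortest_route(graph, source, target):
    """Dijkstra's algorithm; returns (total_minutes, [row indices]) or None."""
    queue = [(0, source, [])]
    settled = set()
    while queue:
        cost, node, path = heapq.heappop(queue)
        if node == target:
            return cost, path
        if node in settled:
            continue
        settled.add(node)
        for neighbour, weight, idx in graph.get(node, []):
            if neighbour not in settled:
                heapq.heappush(queue, (cost + weight, neighbour, path + [idx]))
    return None  # target is unreachable from source


graph = build_graph(flights)
print(shortest_route(graph, "JFK", "LAX"))
```

Returning the row indices rather than the airport sequence means the chosen flights can be looked up directly in the DataFrame the graph was built from.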
Setting up File Directory
The file structure for the project is created as follows.
File Structure
Delay
├── config
│   ├── config.yaml
│   ├── data_dependency.yaml
│   └── schema.yaml
├── dags
├── data
│   ├── airlines.csv
│   ├── airports.csv
│   ├── bronze
│   ├── flights.csv
│   ├── gold
│   └── silver
├── docker
│   ├── Dockerfile.etl
│   └── Dockerfile.ml
├── logs
├── pyproject.toml
├── README.md
├── requirements.txt
├── src
│   ├── etl
│   │   ├── airports.py
│   │   ├── date_encoding.py
│   │   ├── flight
│   │   │   ├── clean.py
│   │   │   ├── extract.py
│   │   │   ├── __init__.py
│   │   │   ├── schema_validation.py
│   │   │   └── transform.py
│   │   ├── index_categories.py
│   │   ├── __init__.py
│   │   ├── join.py
│   │   ├── scale.py
│   │   └── weather
│   │       ├── clean.py
│   │       ├── extract.py
│   │       ├── __init__.py
│   │       └── transform.py
│   ├── models
│   │   └── __init__.py
│   ├── retriever
│   │   └── __init__.py
│   └── utils
│       ├── config_loader.py
│       ├── constants.py
│       ├── db_client.py
│       ├── __init__.py
│       ├── loaders.py
│       └── settings.py
├── structure.md
└── tests
    ├── etl_pipeline.py
    ├── flight_delay.ipynb
    ├── test_etl.py
    └── test_models.py
Iteration 1 Deliverable
- Working data, model, and routing pipeline
- Airflow orchestration
- Docker containerization of the inference model
- FastAPI interface for streaming
Iteration 2 Deliverable
- Engineer several features that improve accuracy
- Compare a few models to choose from
- Evaluate model performance for regression and distributional regression to introduce uncertainty modelling.
- Statistical characterization of airplane delays
- Dynamically forecasting airline departure delay probability distributions for individual flights using supervised learning
- Analyzing and Predicting Airline Delays: A Comprehensive Data Science Approach
- Estimating Flight Departure Delay Distributions - a Statistical Approach with Long-Term Trend and Short-Term Pattern
- Universal patterns in passenger flight departure delays
1. Data Pipeline
The data pipeline consists of the following stages.
- Data Source
- Flight - Local Storage
- Airport locations - API
- Airport weather - API
- Generic Data Cleaning
- Schema validation
- Filter columns
- Drop Cancelled / Diverted
- Deduplicate
- Type mismatch
- Categorical Standardization
- Typos
- Value range consistency
- Value format consistency
- Cross-column consistency
- Deduplicate
- Outlier detection
- Null values
- Missing values for a continuous feature can be replaced by its mean, and for a categorical feature by 0 (A hybrid machine learning-based model for predicting flight delay through aviation big data).
- Data Processing
- Recalculate SCH_ARI_MIN and ARI_MIN.
- Feature Engineering
- Decomposition of SCHEDULED_DEPARTURE, DEPARTURE_TIME, SCHEDULED_ARRIVAL, and ARRIVAL_TIME into SCH_DEP_HOUR, SCH_DEP_MIN, DEP_HOUR, DEP_MIN, SCH_ARI_HOUR, SCH_ARI_MIN, ARI_HOUR, and ARI_MIN.
- SCH_DEP_DATE and SCH_ARI_DATE.
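The null-value rule listed among the stages above (mean for continuous features, 0 for categorical features) can be sketched with pandas as follows. The column names and values are invented for illustration, and AIRLINE_CODE is a hypothetical integer-coded categorical column rather than one from the dataset.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "DEPARTURE_DELAY": [5.0, None, 25.0],  # continuous feature
        "AIRLINE_CODE": [3.0, None, 1.0],      # categorical feature, integer-coded
    }
)
continuous_cols = ["DEPARTURE_DELAY"]
categorical_cols = ["AIRLINE_CODE"]

# Continuous features: replace missing values with the column mean.
df[continuous_cols] = df[continuous_cols].fillna(df[continuous_cols].mean())

# Categorical features: replace missing values with 0.
df[categorical_cols] = df[categorical_cols].fillna(0)

print(df)
```

If 0 is also a valid category code, the filled rows become indistinguishable from that category, so reserving 0 for "missing" is worth checking against the category index.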
1.1. Flight Data
We start with the flight data. The dataset consists of a total of
1.1.1. Flight Data Extraction
The flight data resides in local storage. To avoid unnecessary memory usage, we select columns at read time, loading only the features that the ETL pipeline uses and the model requires. We do so by passing the list of columns to read to the read_csv() method.
from typing import Optional

import pandas as pd


def loadFlightData(filepath: str, columns: Optional[list] = None) -> pd.DataFrame:
    # usecols=None tells read_csv to load every column
    return pd.read_csv(filepath, usecols=columns)
and passing the list of columns to load as
load_columns = """YEAR
MONTH
DAY
AIRLINE
FLIGHT_NUMBER
TAIL_NUMBER
ORIGIN_AIRPORT
DESTINATION_AIRPORT
SCHEDULED_DEPARTURE
DEPARTURE_TIME
DEPARTURE_DELAY
SCHEDULED_TIME
ELAPSED_TIME
AIR_TIME
DISTANCE
SCHEDULED_ARRIVAL
ARRIVAL_TIME
ARRIVAL_DELAY
CANCELLED"""
df = loadFlightData(filepath, columns=load_columns.split())
1.1.2. Flight Data Processing
- Cancelled Flights: Since we are building a delay prediction model, we drop the rows for cancelled flights. The total percentage of cancelled flights is
The number of cancelled flights is very small compared to the total number of flights, so we can safely drop them. Cancelled flights are dropped by passing CANCELLED == 0 to the query() method, which selects only the flights that were not cancelled.
- Null Values
- Type Mismatching: Columns may contain entries whose type differs from the type prescribed in the schema. Every schema type is either string or numeric. We scan the two groups of columns separately and drop the rows whose entries have the wrong type. The schema of the dataset is given below.
| Column | Format |
|---|---|
| YEAR | int16 |
| MONTH | int8 |
| DAY | int8 |
| AIRLINE | string - 2 uppercase characters |
| FLIGHT_NUMBER | int16 - 4 digit number |
| TAIL_NUMBER | string - 6 character alphanumeric |
| ORIGIN_AIRPORT | string - 3 uppercase characters |
| DESTINATION_AIRPORT | string - 3 uppercase characters |
| SCHEDULED_DEPARTURE | float32 - first two digits for hour, last two digits for minutes |
| DEPARTURE_TIME | float32 - first two digits for hour, last two digits for minutes |
| DEPARTURE_DELAY | float32 - minutes |
| SCHEDULED_TIME | float32 - minutes |
| ELAPSED_TIME | float32 - minutes |
| AIR_TIME | float32 - minutes |
| DISTANCE | float32 - km |
| SCHEDULED_ARRIVAL | float32 - first two digits for hour, last two digits for minutes |
| ARRIVAL_TIME | float32 - first two digits for hour, last two digits for minutes |
| ARRIVAL_DELAY | float32 - minutes |
| CANCELLED | int8 |

YEAR is listed as int16 because a four-digit year does not fit in int8, whose maximum value is 127.
The rows with entries that fail to match the schema are selected using pd.to_numeric(df[column], errors="coerce").isna() and subsequently dropped. For the string columns, the selection is negated with ~ to catch numeric entries.
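The two row filters described in this section, dropping cancelled flights with query() and selecting type-mismatched entries with pd.to_numeric, can be tried on a toy frame. The values below are invented; only the column names and the two selection expressions come from the text.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "AIRLINE": ["AA", "DL", 17, "UA"],      # 17 is a numeric entry in a string column
        "DISTANCE": [300, "far", 450, 800],     # "far" is a string entry in a numeric column
        "CANCELLED": [0, 0, 0, 1],
    }
)

# Keep only the flights that were not cancelled.
df = df.query("CANCELLED == 0")

# Numeric column: entries that cannot be parsed as numbers become NaN.
bad_numeric = pd.to_numeric(df["DISTANCE"], errors="coerce").isna()

# String column: negate the test, so entries that do parse as numbers are flagged.
bad_string = ~pd.to_numeric(df["AIRLINE"], errors="coerce").isna()

clean = df[~(bad_numeric | bad_string)]
print(clean)
```

Note that the numeric test also flags genuinely missing values, because they are NaN after coercion as well.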
The actual formats of the columns are as follows.
| Column | Format |
|---|---|
| AIRLINE | object |
| FLIGHT_NUMBER | int64 |
| TAIL_NUMBER | object |
| ORIGIN_AIRPORT | object |
| DESTINATION_AIRPORT | object |
| SCHEDULED_DEPARTURE | int64 |
| DEPARTURE_TIME | float64 |
| DEPARTURE_DELAY | float64 |
| AIR_TIME | float64 |
| DISTANCE | int64 |
| SCHEDULED_ARRIVAL | int64 |
| ARRIVAL_TIME | float64 |
| ARRIVAL_DELAY | float64 |
| WEATHER_DELAY | float64 |
| DATE | datetime64[ns] |
Creating a New Feature
- Time Decomposition: The scheduled and actual times of departure and arrival are stored as floats. Each value is read as a four-digit number whose first two digits are the hour and whose last two digits are the minutes. We split SCHEDULED_DEPARTURE and DEPARTURE_TIME into SCH_DEP_HOUR, SCH_DEP_MIN, DEP_HOUR, and DEP_MIN, and similarly for SCHEDULED_ARRIVAL and ARRIVAL_TIME.
- Date Reconstruction: We combine YEAR, MONTH, DAY, SCH_DEP_HOUR, and SCH_DEP_MIN to create the SCH_DEP_DATE feature using pandas datetime objects. We estimate the arrival date by adding the flight duration to SCH_DEP_DATE. For the present, we ignore the timezone difference.
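A minimal sketch of the two steps above, on invented rows. Using SCHEDULED_TIME as the flight duration is an assumption; the text only says that the flight duration is added.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "YEAR": [2015, 2015],
        "MONTH": [1, 12],
        "DAY": [1, 31],
        "SCHEDULED_DEPARTURE": [5.0, 2354.0],  # 00:05 and 23:54 in HHMM form
        "SCHEDULED_TIME": [205.0, 80.0],       # planned duration in minutes
    }
)

# Time decomposition: integer division and remainder split HHMM into hour and minute.
hhmm = df["SCHEDULED_DEPARTURE"].astype(int)
df["SCH_DEP_HOUR"] = hhmm // 100
df["SCH_DEP_MIN"] = hhmm % 100

# Date reconstruction: to_datetime assembles a timestamp from named component columns.
parts = df[["YEAR", "MONTH", "DAY", "SCH_DEP_HOUR", "SCH_DEP_MIN"]].rename(
    columns={
        "YEAR": "year",
        "MONTH": "month",
        "DAY": "day",
        "SCH_DEP_HOUR": "hour",
        "SCH_DEP_MIN": "minute",
    }
)
df["SCH_DEP_DATE"] = pd.to_datetime(parts)

# Estimated arrival = scheduled departure + duration (time zones ignored).
df["SCH_ARI_DATE"] = df["SCH_DEP_DATE"] + pd.to_timedelta(df["SCHEDULED_TIME"], unit="min")

print(df[["SCH_DEP_DATE", "SCH_ARI_DATE"]])
```

The second row shows why the arrival date has to be computed rather than copied: a late-evening departure arrives on the following day, and here in the following year.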
1.1.3. Airport Location Data
1.2. Weather Data
1.2.1. Weather Data Extraction
We use the Open-Meteo API to fetch historical hourly weather. Because of API rate limits, the pipeline implements a Rate Limiter and Splitter.
- Weight Calculation: API weight is determined by nLocations * (nDays / 14) * (nVariables / 10).
- Request Strategy: The system calculates numDataSplits to stay under the 600/min limit and uses time.sleep to respect the API’s window.
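The weight formula and the splitting idea can be sketched as follows. The formula and the 600/min limit come from the text above. How the project actually derives numDataSplits is not shown, so splitting over locations, the helper names, and the fixed pause between batches are all assumptions.

```python
import math
import time

WEIGHT_LIMIT_PER_MIN = 600  # limit quoted in the text


def api_weight(n_locations: int, n_days: int, n_variables: int) -> float:
    """Weight of one request, using the formula given above."""
    return n_locations * (n_days / 14) * (n_variables / 10)


def num_data_splits(n_locations, n_days, n_variables, limit=WEIGHT_LIMIT_PER_MIN):
    """Number of location batches needed so that each batch stays within the limit."""
    per_location = api_weight(1, n_days, n_variables)
    if per_location <= 0:
        return 1
    # Largest batch whose weight fits in one window (at least one location).
    max_per_batch = max(1, math.floor(limit / per_location))
    return math.ceil(n_locations / max_per_batch)


def fetch_in_batches(locations, n_splits, fetch, pause_seconds=60.0, sleep=time.sleep):
    """Call fetch once per batch, sleeping between batches."""
    if not locations:
        return []
    size = math.ceil(len(locations) / n_splits)
    results = []
    for start in range(0, len(locations), size):
        if start:  # wait before every batch except the first
            sleep(pause_seconds)
        results.extend(fetch(locations[start:start + size]))
    return results


print(api_weight(300, 365, 10))
print(num_data_splits(300, 365, 10))
```

The sleep function is a parameter so the pacing can be tested without waiting for real time to pass.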
1.2.2. Weather Data Processing
The raw API responses are expanded into a MultiIndex DataFrame (IATA, DATE, HOUR).
- Hourly Alignment: Features like TEMPERATURE_2M and WIND_SPEED_10M are mapped to the specific hour of departure/arrival.
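The expansion into an (IATA, DATE, HOUR) index might look like the sketch below. The payloads are hand-written sample data shaped like an hourly response (a time list plus one list per variable); that shape, the values, and the upper-casing of the variable names are assumptions for illustration.

```python
import datetime

import pandas as pd

# Hand-written sample payloads, keyed by airport code.
responses = {
    "JFK": {"hourly": {"time": ["2015-01-01T00:00", "2015-01-01T01:00"],
                       "temperature_2m": [1.5, 1.1],
                       "wind_speed_10m": [12.0, 14.5]}},
    "LAX": {"hourly": {"time": ["2015-01-01T00:00", "2015-01-01T01:00"],
                       "temperature_2m": [14.2, 13.8],
                       "wind_speed_10m": [5.0, 4.2]}},
}

frames = []
for iata, payload in responses.items():
    hourly = pd.DataFrame(payload["hourly"])
    stamp = pd.to_datetime(hourly.pop("time"))
    hourly.columns = hourly.columns.str.upper()  # temperature_2m -> TEMPERATURE_2M
    hourly["IATA"] = iata
    hourly["DATE"] = stamp.dt.date
    hourly["HOUR"] = stamp.dt.hour
    frames.append(hourly)

weather = pd.concat(frames).set_index(["IATA", "DATE", "HOUR"]).sort_index()

# Hourly alignment: look up the weather at JFK for a flight in the 01:00 hour.
row = weather.loc[("JFK", datetime.date(2015, 1, 1), 1)]
print(row)
```

With the three-level index in place, aligning a flight to its weather is a single keyed lookup on airport, date, and hour.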
1.3. Complete Data Pipeline
- The Weather Join: We perform a “Dual Join.” The flight record is enriched with weather from the origin airport at departure time and the destination airport at arrival time.
- Categorical Encoding: Final string columns (AIRLINE, AIRPORT) are converted to CategoricalDtype and mapped to integer codes to prepare for the machine learning model.
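The dual join and the categorical encoding can be sketched with two left merges and a CategoricalDtype. The data is invented, and the helper join_weather, the key columns DEP_KEY and ARI_KEY (timestamps already rounded to the hour), and the ORIGIN_/DEST_ prefixes are hypothetical names chosen for the example.

```python
import pandas as pd

flights = pd.DataFrame({
    "AIRLINE": ["AA", "DL"],
    "ORIGIN_AIRPORT": ["JFK", "LAX"],
    "DESTINATION_AIRPORT": ["LAX", "JFK"],
    "DEP_KEY": pd.to_datetime(["2015-01-01 08:00", "2015-01-01 09:00"]),
    "ARI_KEY": pd.to_datetime(["2015-01-01 14:00", "2015-01-01 15:00"]),
})
weather = pd.DataFrame({
    "IATA": ["JFK", "LAX", "LAX", "JFK"],
    "TIME": pd.to_datetime(["2015-01-01 08:00", "2015-01-01 14:00",
                            "2015-01-01 09:00", "2015-01-01 15:00"]),
    "TEMPERATURE_2M": [1.0, 18.0, 15.0, 3.0],
})


def join_weather(df, weather, airport_col, time_col, prefix):
    """Left-join weather onto df, matching on airport and hour."""
    mapping = {c: f"{prefix}_{c}" for c in weather.columns if c not in ("IATA", "TIME")}
    mapping.update({"IATA": airport_col, "TIME": time_col})
    return df.merge(weather.rename(columns=mapping), on=[airport_col, time_col], how="left")


# Dual join: once for the origin at departure, once for the destination at arrival.
joined = join_weather(flights, weather, "ORIGIN_AIRPORT", "DEP_KEY", "ORIGIN")
joined = join_weather(joined, weather, "DESTINATION_AIRPORT", "ARI_KEY", "DEST")

# Categorical encoding: fix the category order, then take the integer codes.
airline_dtype = pd.CategoricalDtype(categories=sorted(joined["AIRLINE"].unique()))
joined["AIRLINE_CODE"] = joined["AIRLINE"].astype(airline_dtype).cat.codes

print(joined[["ORIGIN_TEMPERATURE_2M", "DEST_TEMPERATURE_2M", "AIRLINE_CODE"]])
```

A left merge keeps flights with no matching weather row, leaving NaN in the weather columns instead of silently dropping the flight.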
Data Pipeline - Cycle 2
Metadata-Driven Pipeline
We adhere to the standard strategy of separating the data from the logic. We do so by moving the explicit column names and any parameter values used in the code to config files.
As a first step, we move the columns to load to the config file by defining the following in the config.yaml file.
pipeline:
  load_columns:
    - 'YEAR'
    - 'MONTH'
    - 'DAY'
    - 'AIRLINE'
    - 'FLIGHT_NUMBER'
    - 'TAIL_NUMBER'
    - 'ORIGIN_AIRPORT'
    - 'DESTINATION_AIRPORT'
    - 'SCHEDULED_DEPARTURE'
    - 'DEPARTURE_TIME'
    - 'DEPARTURE_DELAY'
    - 'SCHEDULED_TIME'
    - 'ELAPSED_TIME'
    - 'AIR_TIME'
    - 'DISTANCE'
    - 'SCHEDULED_ARRIVAL'
    - 'ARRIVAL_TIME'
    - 'ARRIVAL_DELAY'
    - 'CANCELLED'
We can load the file and read the load_columns as
config = load_yaml(config_path)
load_columns = config["pipeline"]["load_columns"]
Improving efficiency of dropTypeMismatchRows()
Change 1: Instead of hardcoding the string and numeric columns in the function, we pass the schema of the data and let the function build the lists of numeric and string columns using the following code.
string_columns = []
numeric_columns = []
for key in schema:
    # pandas_dtype turns the schema entry into a dtype object
    if pd.api.types.is_numeric_dtype(pd.api.types.pandas_dtype(schema[key])):
        numeric_columns.append(key)
    else:
        string_columns.append(key)
The previous version dropped the mismatched rows by index once per column. This creates a memory bottleneck, because each drop creates a copy of the dataset with the bad rows removed, and reassigning the data from that copy slows the function down further. The astype() call also becomes unnecessary once the data is loaded with the specified schema.
To overcome these problems, we use boolean masking: a single boolean series keeps track of the rows with a type mismatch during the column iteration.
is_bad = pd.Series(False, index=df.index)
for col in df.columns:
    # True where the entry can be parsed as a number
    is_numeric = pd.to_numeric(df[col], errors='coerce').notna()
    if col in string_columns:
        is_bad |= is_numeric
    elif col in numeric_columns:
        is_bad |= ~is_numeric
Once the columns are scanned, the boolean mask splits the data into clean and corrupted rows using df[~is_bad] and df[is_bad]. The discarded data is appended to discarded_list, which is passed to the function as an argument, using
if discarded_list is not None:
    discarded_list.append(df[is_bad].copy())
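Put together, the pieces above could form a function along these lines. The name dropTypeMismatchRows comes from the heading of this section, but the signature, the return value, and the sample schema are assumptions; the real function may differ.

```python
import pandas as pd


def dropTypeMismatchRows(df, schema, discarded_list=None):
    """Return the rows of df whose entries agree with the schema's string/numeric split."""
    numeric_columns = {
        col for col, dtype in schema.items()
        if pd.api.types.is_numeric_dtype(pd.api.types.pandas_dtype(dtype))
    }
    is_bad = pd.Series(False, index=df.index)
    for col in df.columns:
        if col not in schema:
            continue  # columns without a schema entry are left unchecked
        parses = pd.to_numeric(df[col], errors="coerce").notna()
        if col in numeric_columns:
            is_bad |= ~parses  # numeric column: flag entries that are not numbers
        else:
            is_bad |= parses   # string column: flag entries that are numbers
    if discarded_list is not None:
        discarded_list.append(df[is_bad].copy())
    return df[~is_bad]


# Toy data: row 1 has a number in a string column, row 2 a string in a numeric column.
df = pd.DataFrame({"AIRLINE": ["AA", 12, "DL"], "DISTANCE": [100, 200, "x"]})
schema = {"AIRLINE": "object", "DISTANCE": "float32"}
discarded = []
clean = dropTypeMismatchRows(df, schema, discarded)
print(clean)
print(discarded[0])
```

The mask is built in one pass and the frame is filtered once at the end, so no intermediate copies are made per column. As written, a missing value in a numeric column also counts as a mismatch, since it is NaN after coercion.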
Fixing Path Problems
Instead of writing the paths manually, we let Python build the paths of the files that are needed.
- The paths in the config files should be relative to the project folder. For instance, the data path is raw_path: "data/flights.csv".
- A separate Python file constants.py in the utils folder finds the absolute path of the project folder as
PROJECT_ROOT = Path(__file__).parent.parent.parent.resolve()
The data and config directory paths are computed as
DATA_DIR = PROJECT_ROOT / "data"
CONFIG_DIR = PROJECT_ROOT / "config"
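A relative path read from the config can then be joined to the project root before use. The helper below is a hypothetical illustration, not part of the project; the literal root stands in for the PROJECT_ROOT constant so that the example runs on its own.

```python
from pathlib import Path


def resolve_project_path(relative_path: str, project_root: Path) -> Path:
    """Turn a config path such as "data/flights.csv" into a path under the project root."""
    candidate = Path(relative_path)
    if candidate.is_absolute():
        # Config paths are expected to be relative to the project folder.
        raise ValueError(f"config paths must be relative: {relative_path}")
    return project_root / candidate


# In the project this would be PROJECT_ROOT; a literal stands in here.
root = Path("/opt/Delay")
print(resolve_project_path("data/flights.csv", root))
```

Rejecting absolute paths early keeps a machine-specific location from slipping into the config unnoticed.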
File Loading Utilities
Functions to load the files in the config folder are defined in utils/loaders.py.
def getConfigResourcePath(file_name):
    # Config files live in CONFIG_DIR and use the .yaml extension
    config_path = CONFIG_DIR / f"{file_name}.yaml"
    if config_path.exists():
        return config_path
    else:
        raise FileNotFoundError(f"Missing {file_name} at {config_path}")
Pipeline
graph LR
subgraph Inference Pipeline
direction LR
data[Flight]
airport[Airport]
weather[Weather]
dp[Data Pipeline]
gold[(Gold)]
model[Model]
routing[Routing]
interface[Interface]
data --"fl_m_cols"--> dp
airport --> dp
weather --"w_n_cols"--> dp
dp --"fl_w_p_cols"--> gold
gold --"fl_w_p_cols"--> model
model --"p_{risk} < p_0"--> interface
model --"p_{risk} >= p_0"--> routing
routing --> interface
gold --"fl_q_cols"--> routing
end