Thu Apr 16 2026

1. Architecting the Pipeline

The core requirement of any pipeline is that it scales in two dimensions. A production system scales vertically with the size of the data and horizontally with the number of service types. Scaling an application therefore means scaling it along one or both of these dimensions.

An application’s core lifecycle encompasses its development and deployment stages, and an application passes through this lifecycle routinely. As the application scales, it becomes critically important that the lifecycle stays compatible with the scaling requirement, so that the lifecycle can progress independently of the scaling.

The Twelve-Factor App principles address these two scaling demands: applications are designed so that they can be replicated along the horizontal or vertical dimension without any single application having to grow in size. The applications are loosely coupled to one another.

The pipeline requires certain files for its configuration and observability. The following resources need to be in place for the pipeline to work.

  • Config file: The configuration for the pipeline is defined in a separate schema.yaml file placed in the /etc/etl/config/ directory.
  • Log files: The pipeline writes its logs to /etc/etl/logs/.
  • Data: The data available to the pipeline can be mounted either from cloud storage or locally. For cloud storage, the bucket address needs to be passed. For local storage, the path of the root data directory needs to be passed.

2. The ETL Pipeline

The ETL pipeline is shown below. The complete pipeline works with three different datasets: flight details, airport details, and weather details. These datasets are ingested and processed at different stages of the pipeline. The core components of the pipeline are Flight ETL, Airport ETL, Weather ETL, and Merger ETL.

The weather data comes through an external API. Since the format of the data may differ between vendors, the weather data processing stage is best handled as an external service and included in the pipeline as an adapter rather than as a part of the core pipeline.

There are different ways to handle this. At enterprise scale, the weather data processing would be moved into a separate microservice. In this project, we include the weather processing as an adapter that interfaces with the core pipeline through a data contract. The data that the weather adapter hands to the core pipeline is defined by the silver weather schema. It is the responsibility of the weather adapter to ensure that the data it provides meets the silver weather schema defined by the core pipeline.
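The data contract can be pictured as a small validation step at the adapter boundary. The sketch below is only an illustration: the column names in `SILVER_WEATHER_SCHEMA` are hypothetical placeholders (the real silver weather schema is defined by the core pipeline), and `validate_contract` is an assumed helper name, not part of the project.

```python
import pandas as pd
from pandas.api import types as ptypes

# Hypothetical contract: column name -> dtype check the column must pass.
SILVER_WEATHER_SCHEMA = {
    "IATA": ptypes.is_string_dtype,
    "HOUR": ptypes.is_integer_dtype,
    "TEMPERATURE_2M": ptypes.is_float_dtype,
}


def validate_contract(df: pd.DataFrame, schema: dict) -> list:
    """Return the list of contract violations; an empty list means the frame conforms."""
    violations = []
    for column, dtype_check in schema.items():
        if column not in df.columns:
            violations.append(f"missing column: {column}")
        elif not dtype_check(df[column]):
            violations.append(f"unexpected dtype for {column}: {df[column].dtype}")
    return violations


# The adapter would run this check before handing data to the core pipeline.
weather = pd.DataFrame({"IATA": ["JFK"], "HOUR": [5], "TEMPERATURE_2M": [1.5]})
problems = validate_contract(weather, SILVER_WEATHER_SCHEMA)
```

Returning a list of violations rather than raising on the first one lets the adapter log every mismatch in a single pass.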

The flowchart below depicts the complete pipeline in terms of the four blocks.

graph TD
	subgraph "<b>ETL Pipeline</b>"
		direction TB
        subgraph flight_etl[" "]
		    flight_dependency["<b>Dependency/Config</b><br><br><b>Clean</b><br>- flight silver schema<br>- flight bronze filepath<br>- flight silver filepath<br>- discarded filepath<br><br><b>Transform</b><br>- flight gold schema<br>- flight silver filepath<br>- flight gold filepath"]
            flight_bronze[(Flight Bronze)]
		end
        style flight_etl stroke:none
        flight@{ shape: div-rect, label: "Flight ETL"}
		subgraph flight_store [" "]
			flight_discarded[(Flight Discarded)]
			flight_silver[(Flight Silver)]
			flight_gold[(Flight Gold)]
		end
        style flight_store stroke:none

		airport_dependency["<b>Dependency/Config</b><br><br><b>Extract</b><br>- flight gold filepath<br>- airport location for<br>weather filepath<br>- features"]
		
		airport@{ shape: div-rect, label: "Airport ETL"}
        subgraph airport_store [" "]
		airport_locations[("Airports<br>[Airport codes,<br>Latitude,<br>Longitude]")]
		airport_locations_for_weather[("[Airport Codes,<br>Date range]")]
        end
        style airport_store stroke:none

        weather_dependency["<b>Dependency/Config</b><br><br><b>Extract</b><br>- airports filepath<br>- airport location for<br>weather filepath<br>- weather bronze filepath<br>- weather features<br>- API url<br><br><b>Transform</b><br>- airport location for<br>weather filepath<br>- weather bronze filepath<br>- weather silver filepath<br>- weather features"]
        weather@{ shape: div-rect, label: "Weather Adapter"}

        subgraph weather_store[" "]
            weather_bronze[(Weather Bronze)]
            weather_silver[(Weather Silver)]
        end
        style weather_store stroke:none

        merger_dependency["<b>Dependency/Config</b><br><br><b>Join</b><br>- flight gold filepath<br>- weather silver filepath<br>- flight-weather gold<br>filepath<br>- flight-weather join-on<br>feature map<br><br><b>Transform</b><br>- airport location for<br>weather filepath<br>- weather bronze filepath<br>- weather silver filepath<br>- weather features"]
		merger@{ shape: div-rect, label: "Merger"}
        subgraph target [" "]
            merger_gold[(Flight-weather Gold)]
            target_featureset[(Feature Store)]
        end
            style target stroke:none

		flight_dependency -.-> flight
		flight_bronze ==> flight
        weather_dependency -.-> weather
		flight --"Discarded"--> flight_discarded
		flight =="Cleaned data"==> flight_silver
		flight =="Transformed data"==> flight_gold
		airport_dependency -.-> airport
		flight_gold ==> airport 
		airport ==> airport_locations_for_weather

        airport_locations ==> weather
		airport_locations_for_weather ==> weather
		weather =="Raw data"==> weather_bronze
        merger_dependency -.-> merger
		weather ==> weather_silver
        
		
		weather_silver ==> merger
		merger =="Join"==> merger_gold
        flight ==> merger
		merger =="Transformed"==> target_featureset
	end
graph TD
    subgraph etl[<b>Production ETL Pipeline</b>]
		direction TB
		flight_storage_bronze[("Bronze (Flight)<br/>S3/GCS<br>(csv)")]
	    subgraph flight[Flight ETL]
			subgraph cleaning[Cleaning]
				val[Schema Validation]
				clean[Clean & Deduplicate]
			end
	        flight_storage_silver[("Silver (Flight)<br>(parquet)")]
	        flight_transform[Flight Transform & <br>Feature Engineering]
	    end
	    
        airport_api[airports API]
		weather_api[openmeteo API]
		
        flight_storage_garbage[("Discarded (S3/GCS)<br>(csv)")]
        flight_storage_gold[("Gold (Flight)<br>(parquet)")]
        subgraph airport_etl[Airport ETL]
			airport_extract[Extract<br>unique IATA]
			airports_for_weather[("Distinct IATA<br>date range<br>(json)")]
	        airports[("Airports Location<br>(parquet)")]
        end
        
        
        subgraph weather[Weather Adapter]
	        weather_extract[Weather Extract]
	        weather_storage_bronze[("Bronze (Weather)<br>(bin)")]
	        weather_transform[Weather Clean and<br> Transform]
	    end
        weather_storage_silver[("Silver (Weather)<br>(parquet)")]
        
		subgraph Merge
        	flight_weather[Join]
		end
        storage2[("Feature Store<br>(parquet)")]
        
		
        weather_api --> weather_extract
        airport_api --"If location missing"--> weather_extract
        flight_storage_bronze --"S3/BigQuery Select"--> val
		val -- "Failed Data<br>Alert" --> flight_storage_garbage
        clean -- "Failed Data<br>Alert" --> flight_storage_garbage
		val --> clean
        clean --> flight_storage_silver
        flight_storage_silver --> flight_transform
        flight_transform --> flight_storage_gold

		flight_storage_gold --"ORIGIN_AIRPORT<br>DESTINATION_AIRPORT"--> airport_extract
		airport_extract --> airports_for_weather

		airports --> weather_extract
		airports_for_weather --> weather_extract
        weather_extract --> weather_storage_bronze
        weather_storage_bronze --> weather_transform
		airports_for_weather --> weather_transform
        weather_transform --> weather_storage_silver

		weather_storage_silver --> flight_weather
        flight_storage_gold --> flight_weather
        flight_weather --> storage2
    end

Before we discuss each component in detail, we elaborate on the data requirements of the complete pipeline. We follow the Medallion data storage architecture to distinguish the different stages of the data in the pipeline.

As the data progresses downstream in the pipeline, it goes through several stages of transformation. The changes to the data broadly consist of cleaning and transformation. The raw data is classified as the Bronze layer. The data is then cleaned for its syntactic and semantic correctness; data that passes this stage is classified into the Silver layer. In the next stage, the Gold layer, the data is transformed into a form suitable for its use in either analytics or ML ingestion. In this stage the data is often enriched with new features, created either from the existing features or from some other data, to enhance the usability of the data for further analysis or ML prediction.

In this project the data comes from three sources: flight, airport, and weather. The pipeline appends to each flight record the weather details at the times of departure and arrival at the respective airport locations. The flight details contain the airport IATA codes. The pipeline uses a separate data source to fetch the locations of the airports, which it then uses to fetch the weather details for an appropriate duration.

The data from the three sources are stored separately in their respective directories. After the data from the individual sources have been processed, they are merged together. The final merged data is kept in the feature-store directory. Below, we organize the data requirements into the directory structure we adopt in this project.

aviation-etl-storage/
├── flight/
│   ├── bronze/
│   │   └── bronze_flight_TIMESTAMP.csv
│   ├── gold/
│   │   └── gold_flight_TIMESTAMP.parquet
│   ├── silver/
│   │   └── silver_flight_TIMESTAMP.parquet
│   └── discarded/
│       └── discarded_flight_TIMESTAMP.parquet
├── airport/
│   ├── airport-for-weather/
│   │   └── airport_for_weather_TIMESTAMP.json
│   └── airport_location.parquet
├── weather/
│   ├── bronze/
│   │   └── bronze_weather_TIMESTAMP.bin
│   └── silver/
│       └── silver_weather_TIMESTAMP.parquet
├── feature-store/
│   └── feature_TIMESTAMP.parquet
└── categorical_feature.json
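The naming convention in the tree above can be captured in a small path helper. This is a minimal sketch under stated assumptions: `layer_path` is a hypothetical helper, and the `%Y%m%dT%H%M%S` rendering of TIMESTAMP is an assumption, since the document does not specify the timestamp format.

```python
from datetime import datetime
from pathlib import PurePosixPath

STORAGE_ROOT = PurePosixPath("aviation-etl-storage")


def layer_path(source: str, layer: str, extension: str, timestamp: datetime) -> PurePosixPath:
    """Build a path such as <root>/flight/silver/silver_flight_<TIMESTAMP>.parquet."""
    stamp = timestamp.strftime("%Y%m%dT%H%M%S")  # assumed TIMESTAMP format
    return STORAGE_ROOT / source / layer / f"{layer}_{source}_{stamp}.{extension}"


silver_flight = layer_path("flight", "silver", "parquet", datetime(2026, 4, 16, 12, 0, 0))
```

Keeping the convention in one function means each ETL stage only has to name its source, layer, and file type.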

2.1. Flight ETL

This stage processes the flight data from its raw format in the bronze flight storage to the processed and transformed state stored in the gold flight storage. The raw data is first cleaned and the silver flight schema is enforced before the data is stored in the silver flight storage. The data in the silver flight storage is then transformed and feature engineered to produce the data most relevant to the ML model. The flight data at this stage consists of features that have predictive power for the flight delay risk.

2.1.1. Bronze Layer

The raw data schema is given below.

Bronze Schema
#    Column                 Dtype
--   ------                 -----
0    YEAR                   int64
1    MONTH                  int64
2    DAY                    int64
3    DAY_OF_WEEK            int64
4    AIRLINE                str
5    FLIGHT_NUMBER          int64
6    TAIL_NUMBER            str
7    ORIGIN_AIRPORT         object
8    DESTINATION_AIRPORT    object
9    SCHEDULED_DEPARTURE    int64
10   DEPARTURE_TIME         float64
11   DEPARTURE_DELAY        float64
12   TAXI_OUT               float64
13   WHEELS_OFF             float64
14   SCHEDULED_TIME         float64
15   ELAPSED_TIME           float64
16   AIR_TIME               float64
17   DISTANCE               int64
18   WHEELS_ON              float64
19   TAXI_IN                float64
20   SCHEDULED_ARRIVAL      int64
21   ARRIVAL_TIME           float64
22   ARRIVAL_DELAY          float64
23   DIVERTED               int64
24   CANCELLED              int64
25   CANCELLATION_REASON    str
26   AIR_SYSTEM_DELAY       float64
27   SECURITY_DELAY         float64
28   AIRLINE_DELAY          float64
29   LATE_AIRCRAFT_DELAY    float64
30   WEATHER_DELAY          float64

2.1.2. Silver Layer

We define a silver schema that the cleaning stage enforces on the features.

Silver Schema
#    Column                 Dtype
--   ------                 -----
0    YEAR                   int16
1    MONTH                  int16
2    DAY                    int16
3    DAY_OF_WEEK            int16
4    AIRLINE                str
5    FLIGHT_NUMBER          int64
6    TAIL_NUMBER            str
7    ORIGIN_AIRPORT         str
8    DESTINATION_AIRPORT    str
9    SCHEDULED_DEPARTURE    float32
10   DEPARTURE_TIME         float32
11   DEPARTURE_DELAY        float32
12   TAXI_OUT               float32
13   WHEELS_OFF             float32
14   SCHEDULED_TIME         float32
15   ELAPSED_TIME           float32
16   AIR_TIME               float32
17   DISTANCE               float32
18   WHEELS_ON              float32
19   TAXI_IN                float32
20   SCHEDULED_ARRIVAL      float32
21   ARRIVAL_TIME           float32
22   ARRIVAL_DELAY          float32
23   DIVERTED               int8
24   CANCELLED              int8
25   CANCELLATION_REASON    str
26   AIR_SYSTEM_DELAY       float32
27   SECURITY_DELAY         float32
28   AIRLINE_DELAY          float32
29   LATE_AIRCRAFT_DELAY    float32
30   WEATHER_DELAY          float32

We have downcast various features to reduce the memory footprint.

The data quality checks are done in the silver layer. The features are checked both on their own and in the context of other features to verify that they are consistent with each other. The cleaning stage consists of the following checks. (Add reindexing?)

Schema validation: This is the entry gate of the silver stage. It enforces that the entries of each feature are of the type declared in the schema.

Dropping duplicate entries: Rows that duplicate another row in every feature are dropped to ensure that each row is unique.

Dropping null values: The features that are relevant to the downstream tasks are checked for missing values. If a row has a missing entry for a relevant feature, the row is dropped. It is also possible to impute the missing values using some statistical value; we choose to simply drop those rows, however.

Enforcing feature format: Some features follow a standard format. For instance, the airline and airport values are standard IATA codes: the airline is a two-character code and the airport is a three-character code. The pipeline must ensure the data are syntactically correct.

Enforcing feature range: Most of the values that appear in a given system are structurally or practically bounded, i.e., they take values within particular ranges. For example, the month is restricted to be a positive integer less than or equal to 12.

Once the above checks are done, the data is stored in the silver storage.
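The checks above can be sketched on a three-column slice of the flight data. This is an illustration rather than the project's cleaning code: `clean_flights`, the `REQUIRED` list, and the regular expressions are assumptions, and the type cast is done last here (after null rows are removed) purely to keep the sketch short.

```python
import pandas as pd

REQUIRED = ["MONTH", "AIRLINE", "ORIGIN_AIRPORT"]  # illustrative subset of columns


def clean_flights(df: pd.DataFrame) -> pd.DataFrame:
    # Drop exact duplicate rows, then rows missing a required value.
    df = df.drop_duplicates().dropna(subset=REQUIRED)

    # Format check: two-character airline code, three-letter airport code.
    airline_ok = df["AIRLINE"].astype(str).str.fullmatch(r"[A-Z0-9]{2}")
    airport_ok = df["ORIGIN_AIRPORT"].astype(str).str.fullmatch(r"[A-Z]{3}")

    # Range check: MONTH must lie between 1 and 12 inclusive.
    month_ok = df["MONTH"].between(1, 12)

    kept = df[airline_ok & airport_ok & month_ok]
    # Enforce the downcast type once the values are known to be valid.
    return kept.astype({"MONTH": "int16"}).reset_index(drop=True)


raw = pd.DataFrame(
    {
        "MONTH": [1, 1, 13, 2, None, 3, 12],
        "AIRLINE": ["AA", "AA", "DL", "delta", "UA", "B6", "WN"],
        "ORIGIN_AIRPORT": ["JFK", "JFK", "LAX", "LAX", "ORD", "10397", "DAL"],
    }
)
cleaned = clean_flights(raw)
```

In this sample, only the first `AA`/`JFK` row and the `WN`/`DAL` row survive: the others are a duplicate, an out-of-range month, a malformed airline code, a missing month, and a malformed airport code.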


Categorical Features: The following are the categorical features in the dataset.

  • AIRLINE
  • FLIGHT_NUMBER
  • TAIL_NUMBER
  • ORIGIN_AIRPORT
  • DESTINATION_AIRPORT
  • DIVERTED
  • CANCELLED
  • CANCELLATION_REASON

We use the pandas CategoricalDtype to convert these features into the categorical datatype. The categories for each feature are stored in a file, categories.json. The categorical columns are converted to the categorical datatype based on this ground truth.

Some of these categorical features are volatile and change over time. The values of FLIGHT_NUMBER and TAIL_NUMBER do not remain the same: entries are added or removed with time.

For features that are relatively less volatile, such as AIRLINE, ORIGIN_AIRPORT, and DESTINATION_AIRPORT, the ideal approach is to gather the list of their possible values separately, since they have a well-defined, fixed set of values. However, a possibility remains: what if a new airline is registered in the future? If the model was trained on data in which every category value was present and well defined, it never learnt what to do when it encounters an unseen category value. The issue becomes more serious once we go back and look at our data. For example, below we list a few categorical features along with the frequency of each of their values in the data.

AIRLINE   COUNT
-------   -----
WN        1261855
DL        875881
AA        725984
OO        588353
EV        571977
UA        515723
MQ        294632
B6        267048
US        198715
AS        172521
NK        117379
F9        90836
HA        76272
VX        61903

CANCELLATION_REASON   COUNT
-------------------   -----
B                     48851
A                     25262
C                     15749
D                     22

CANCELLED   COUNT
---------   -----
0           5729195
1           89884

DIVERTED   COUNT
--------   -----
0          5803892
1          15187

There are more than 50 US passenger airlines. If we create the AIRLINE category based on the ground truth, the model would never learn how to handle airlines that were not present in the training data. The model will fail to generalize even to currently active airlines that are absent from the training data. The issue is similar to the case of the volatile categorical features.

To mitigate this, we choose to work only with a subset of the categories present in the training dataset. The rest of the categories are assigned a single UNKNOWN label. The size of the subset is chosen such that it covers a significant fraction of the dataset while also leaving some room for the model to learn the pattern for the UNKNOWN category. This improves generalization and forces the model to rely more on the other features.

Note that the other categories, DIVERTED, CANCELLED, and CANCELLATION_REASON, do not enter the model stage, so we do not have to worry about the generalization problem for them. For these features, we keep all the categorical values present in the data and append an UNKNOWN entry to each of them to handle unseen values in future datasets.

To create the ground truth for category features, we create a dictionary of the categories below.

categorical_features = {"FLIGHT_NUMBER": [], "TAIL_NUMBER": [], "ORIGIN_AIRPORT": [], "DESTINATION_AIRPORT": [], "DIVERTED": [], "CANCELLED": [], "CANCELLATION_REASON": []}

We calculate the relative frequency with which each category value appears in the dataset and select the subset of values whose relative frequency is at or above a threshold, chosen so that the subset covers a significant fraction of the dataset. For the airport codes, for example, a threshold of 0.0002 keeps a subset of the 322 airport codes that accounts for around 98.93% of the data.

categorical_features = {"AIRLINE": None, "FLIGHT_NUMBER": None, "TAIL_NUMBER": None,
                        "ORIGIN_AIRPORT": None, "DESTINATION_AIRPORT": None}
"""
The thresholds cover the following percentages of the dataset:
Column               threshold     % of dataset
------               ---------     ------------
AIRLINE               0.015        97.60
FLIGHT_NUMBER         1.e-5        99.79
TAIL_NUMBER           5.e-5        98.93
ORIGIN_AIRPORT        0.0002       98.93
DESTINATION_AIRPORT   0.0002       98.93
"""
# Thresholds are listed in the same order as the keys of categorical_features.
threshold = [0.015, 1.e-5, 5.e-5, 0.0002, 0.0002]

# `flight` is the flight DataFrame loaded earlier in the pipeline.
for feature, min_weight in zip(categorical_features, threshold):
    feature_col = flight[feature].astype(str)

    # Relative frequency of each value in the dataset.
    feature_weights = feature_col.value_counts(normalize=True)

    # Map each row's value to its relative frequency.
    feature_with_weights = feature_col.map(feature_weights)

    # Keep the distinct values whose frequency meets the threshold.
    categorical_features[feature] = feature_col[feature_with_weights >= min_weight].drop_duplicates().to_list()

The cleaned dataset doesn’t have any non-null value for the CANCELLATION_REASON feature. The rows with a non-null CANCELLATION_REASON had null values in non-nullable features, so they were dropped from the dataset.

For the remaining categories, CANCELLATION_REASON, CANCELLED, and DIVERTED, we select the possible categories from the raw dataset. We then append the UNKNOWN category to each of the categories in the complete categorical_features. The categories in categorical_features provide us with a ground truth that we use later in the pipeline to cast the categorical features to the categorical datatype.
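The casting step can be sketched as follows. It is a minimal illustration, assuming a hypothetical helper `to_categorical`; values outside the known list (including nulls, which become the string "nan" after the cast to text) are mapped to UNKNOWN before the pandas `CategoricalDtype` is applied.

```python
import pandas as pd


def to_categorical(column: pd.Series, known: list) -> pd.Series:
    """Cast a column to a categorical dtype, mapping unseen values to UNKNOWN."""
    # Append UNKNOWN once, even if the ground-truth list already contains it.
    categories = list(dict.fromkeys([*known, "UNKNOWN"]))
    dtype = pd.CategoricalDtype(categories=categories)

    as_text = column.astype(str)
    # Keep values that are in the ground truth; replace everything else.
    mapped = as_text.where(as_text.isin(known), "UNKNOWN")
    return mapped.astype(dtype)


airlines = pd.Series(["WN", "DL", "ZZ", "WN"])
encoded = to_categorical(airlines, known=["WN", "DL"])
codes = encoded.cat.codes  # integer codes follow the order of the category list
```

Because the category list is fixed by the ground truth rather than inferred from each batch, the same value always receives the same integer code across pipeline runs.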

2.1.3. Gold Layer

The silver layer enhances the quality of the data by performing various quality checks, ensuring that the objective information in the dataset is in pristine form. The next stage is the gold layer. In this layer, the data is transformed into a format suitable for model ingestion, and, to improve the predictive power of the model, new features are engineered from the features already present in the data or by combining extra features.

The following new features are engineered to fetch the weather details.

  • SCH_DEP_DATE
  • SCH_DEP_HOUR
  • SCH_DEP_MIN
  • DEP_HOUR
  • DEP_MIN
  • SCH_ARI_DATE
  • SCH_ARI_DAY
  • SCH_ARI_HOUR
  • SCH_ARI_MIN
  • ARI_HOUR
  • ARI_MIN

The features dropped:

  • SCHEDULED_DEPARTURE
  • DEPARTURE_TIME
  • SCHEDULED_ARRIVAL
  • ARRIVAL_TIME

The columns renamed:

  • DAY → SCH_DEP_DAY

The features present in the gold flight storage are, however, an incomplete story. Another factor that influences the flight delay is the weather condition. The weather data is fetched from the weather API and transformed into the form required by the ML model.

Gold Schema
#    Column                 Dtype
--   ------                 -----
0    YEAR                   int16
1    MONTH                  int8
2    DAY_OF_WEEK            int8
3    SCH_DEP_DATE           str
4    SCH_DEP_DAY            int8
5    SCH_DEP_HOUR           int8
6    SCH_DEP_MIN            int8
7    SCH_ARI_DATE           str
8    SCH_ARI_DAY            int8
9    SCH_ARI_HOUR           int8
10   SCH_ARI_MIN            int8
11   AIRLINE                str
12   FLIGHT_NUMBER          str
13   TAIL_NUMBER            str
14   ORIGIN_AIRPORT         str
15   DESTINATION_AIRPORT    str
16   ARRIVAL_DELAY          float32

2.2. Airport ETL

2.3. Weather ETL

2.4. Merger ETL

3. Setting up File Directory

The file structure for the project is created as follows.

ETL/
├── flight-etl/
│   ├── data/
│   │   ├── flight/
│   │   │   ├── bronze/
│   │   │   │   └── bronze_flight_TIMESTAMP.csv
│   │   │   ├── gold/
│   │   │   │   └── gold_flight_TIMESTAMP.parquet
│   │   │   ├── silver/
│   │   │   │   └── silver_flight_TIMESTAMP.parquet
│   │   │   └── discarded/
│   │   │       └── discarded_flight_TIMESTAMP.parquet
│   │   ├── airport/
│   │   │   ├── airport-for-weather/
│   │   │   │   └── airport_for_weather_TIMESTAMP.json
│   │   │   └── airport_location.parquet
│   │   ├── weather/
│   │   │   ├── bronze/
│   │   │   │   └── bronze_weather_TIMESTAMP.bin
│   │   │   └── silver/
│   │   │       └── silver_weather_TIMESTAMP.parquet
│   │   ├── feature-store/
│   │   │   └── feature_TIMESTAMP.parquet
│   │   └── categorical_feature.json
│   ├── config/
│   │   └── schema.yaml
│   ├── src/
│   │   └── etl/
│   │       ├── adapters/
│   │       │   └── openmeteo.py
│   │       ├── flight/
│   │       │   ├── extract.py
│   │       │   ├── clean.py
│   │       │   ├── schema_validation.py
│   │       │   ├── transform.py
│   │       │   └── __init__.py
│   │       ├── airport/
│   │       │   ├── extract.py
│   │       │   └── __init__.py
│   │       ├── merge/
│   │       │   ├── join.py
│   │       │   └── __init__.py
│   │       ├── utils/
│   │       │   ├── constants.py
│   │       │   ├── helper.py
│   │       │   ├── loaders.py
│   │       │   ├── logger.py
│   │       │   ├── settings.py
│   │       │   └── __init__.py
│   │       └── __init__.py
│   ├── tests/
│   │   └── etl.ipynb
│   ├── pyproject.toml
│   └── requirements.txt
├── airflow-components/
│   ├── dags/
│   │   ├── flight_etl_dag.py
│   │   ├── airport_etl_dag.py
│   │   ├── weather_etl_dag.py
│   │   └── flight_weather_merger_dag.py
│   ├── logs/
│   ├── plugins/
│   └── airflow.env
├── compose.yaml
├── Dockerfile
├── .env
└── README.Docker.md

Iteration 1 Deliverable

  • Working data pipeline
  • Airflow orchestration
  • Docker containerization of the Pipeline

Iteration 2 Deliverable

1. Data Pipeline

The data pipeline consists of the following stages.

  1. Data Source
    1. Flight - Local Storage
    2. Airport locations - API
    3. Airport weather - API
  2. Generic Data Cleaning
    1. Schema validation
    2. Filter columns
    3. Drop Cancelled / Diverted
    4. Deduplicate
    5. Type mismatch
    6. Categorical Standardization
    7. Typos
    8. Value range consistency
    9. Value format consistency
    10. Cross-column consistency
    11. Deduplicate
    12. Outlier detection
    13. Null values
  3. Data Processing
    1. Recalculate SCH_ARI_MIN and ARI_MIN.
  4. Feature Engineering
    1. Decomposition of SCHEDULED_DEPARTURE, DEPARTURE_TIME, SCHEDULED_ARRIVAL, and ARRIVAL_TIME into SCH_DEP_HOUR, SCH_DEP_MIN, DEP_HOUR, DEP_MIN, SCH_ARI_HOUR, SCH_ARI_MIN, ARI_HOUR, and ARI_MIN.
    2. SCH_DEP_DATE and SCH_ARI_DATE.

1.1. Flight Data

We start with the flight data. The raw dataset consists of a total of 5,819,079 samples and 31 features.

1.1.1. Flight Data Extraction

The flight data is present in the local storage. To avoid unnecessary memory usage, we select on read: only the features that are used in the ETL pipeline and required by the model are loaded. We do so by passing the list of columns to read to the read_csv() method.

import pandas as pd
from typing import Optional

def loadFlightData(filepath: str, columns: Optional[list] = None) -> pd.DataFrame:
	# With usecols=None, read_csv loads every column.
	return pd.read_csv(filepath, usecols=columns)

and passing the columns as follows

load_columns = """YEAR
MONTH
DAY
AIRLINE
FLIGHT_NUMBER
TAIL_NUMBER
ORIGIN_AIRPORT
DESTINATION_AIRPORT
SCHEDULED_DEPARTURE
DEPARTURE_TIME
DEPARTURE_DELAY
SCHEDULED_TIME
ELAPSED_TIME
AIR_TIME
DISTANCE
SCHEDULED_ARRIVAL
ARRIVAL_TIME
ARRIVAL_DELAY
CANCELLED"""

loadFlightData(filepath, columns=load_columns.split())

1.1.2. Flight Data Processing

  • Cancelled Flights: Since we are dealing with a delay prediction model, we drop the rows for cancelled flights. Cancelled flights make up about 1.5% of all flights (89,884 of 5,819,079), which is very small compared to the total, so we can safely drop them. They are dropped by passing CANCELLED==0 to the query() method, which selects only the flights that were not cancelled.

  • Null Values

  • Type Mismatching: Columns may contain entries of a type different from the type prescribed in the schema. The schema contains only string and numeric types. We scan the two kinds of columns separately and drop the rows whose entries differ from the declared data type. The schema of the dataset is given below.

Column                 Format
------                 ------
YEAR                   int16
MONTH                  int8
DAY                    int8
AIRLINE                string - 2 uppercase characters
FLIGHT_NUMBER          int16 - 4 digit number
TAIL_NUMBER            string - 6 character alphanumeric
ORIGIN_AIRPORT         string - 3 uppercase characters
DESTINATION_AIRPORT    string - 3 uppercase characters
SCHEDULED_DEPARTURE    float32 - first two digits for hour, last two digits for minutes
DEPARTURE_TIME         float32 - first two digits for hour, last two digits for minutes
DEPARTURE_DELAY        float32 - minutes
SCHEDULED_TIME         float32 - minutes
ELAPSED_TIME           float32 - minutes
AIR_TIME               float32 - minutes
DISTANCE               float32 - km
SCHEDULED_ARRIVAL      float32 - first two digits for hour, last two digits for minutes
ARRIVAL_TIME           float32 - first two digits for hour, last two digits for minutes
ARRIVAL_DELAY          float32 - minutes
CANCELLED              int8

The rows with entries that fail to match the schema are selected using pd.to_numeric(df[column], errors="coerce").isna() and subsequently dropped. For the string datatype columns, the selection is negated with ~ to catch numeric entries.

The actual formats of the columns are as follows.

Column                 Dtype
------                 -----
AIRLINE                object
FLIGHT_NUMBER          int64
TAIL_NUMBER            object
ORIGIN_AIRPORT         object
DESTINATION_AIRPORT    object
SCHEDULED_DEPARTURE    int64
DEPARTURE_TIME         float64
DEPARTURE_DELAY        float64
AIR_TIME               float64
DISTANCE               int64
SCHEDULED_ARRIVAL      int64
ARRIVAL_TIME           float64
ARRIVAL_DELAY          float64
WEATHER_DELAY          float64
DATE                   datetime64[ns]

Creating a New Feature

  • Time Decomposition: The scheduled and actual departure and arrival times are stored as floats. The float value is interpreted in four-digit format: the first two digits are the hour and the last two digits are the minutes. We split SCHEDULED_DEPARTURE and DEPARTURE_TIME into SCH_DEP_HOUR, SCH_DEP_MIN, DEP_HOUR, and DEP_MIN, and similarly for SCHEDULED_ARRIVAL and ARRIVAL_TIME.
  • Date Reconstruction: We combine YEAR, MONTH, DAY, SCH_DEP_HOUR, and SCH_DEP_MIN to create the SCH_DEP_DATE feature using pandas datetime objects. We estimate the arrival date by adding the flight duration to SCH_DEP_DATE. For the present, we have ignored the timezone difference.
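The two steps above can be sketched as follows. The helper name `decompose_hhmm` is hypothetical, and using SCHEDULED_TIME (in minutes) as the flight duration is an assumption, since the text does not name the duration column. The sketch assumes null times have already been removed; a value such as 2400 would need separate handling.

```python
import pandas as pd


def decompose_hhmm(values: pd.Series):
    """Split HHMM-encoded times (e.g. 1435.0) into hour and minute series."""
    hhmm = values.astype("int32")
    return (hhmm // 100).astype("int8"), (hhmm % 100).astype("int8")


flights = pd.DataFrame(
    {
        "YEAR": [2015, 2015],
        "MONTH": [1, 1],
        "DAY": [1, 31],
        "SCHEDULED_DEPARTURE": [5.0, 2354.0],
        "SCHEDULED_TIME": [205.0, 80.0],  # minutes
    }
)

flights["SCH_DEP_HOUR"], flights["SCH_DEP_MIN"] = decompose_hhmm(flights["SCHEDULED_DEPARTURE"])

# pandas assembles datetimes from columns named year, month, day, hour, minute.
parts = pd.DataFrame(
    {
        "year": flights["YEAR"],
        "month": flights["MONTH"],
        "day": flights["DAY"],
        "hour": flights["SCH_DEP_HOUR"],
        "minute": flights["SCH_DEP_MIN"],
    }
)
flights["SCH_DEP_DATE"] = pd.to_datetime(parts)

# Estimated arrival: departure plus the flight duration, ignoring time zones.
flights["SCH_ARI_DATE"] = flights["SCH_DEP_DATE"] + pd.to_timedelta(flights["SCHEDULED_TIME"], unit="min")
```

The second sample row shows why the arrival date is computed from a full datetime rather than from the day alone: a late departure on January 31 arrives on February 1.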

1.1.3. Airport Location Data

1.2. Weather Data

1.2.1. Weather Data Extraction

We use the Open-Meteo API to fetch historical hourly weather. Because of API rate limits, the pipeline implements a Rate Limiter and Splitter.

  • Weight Calculation: API weight is determined by nLocations * (nDays / 14) * (nVariables / 10).
  • Request Strategy: The system calculates numDataSplits to stay under the 600/min limit and uses time.sleep to respect the API’s window.
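A minimal sketch of the rate limiter and splitter, using the weight formula and the 600/min limit stated above. The function names are hypothetical, and splitting by location only (rather than also by date range) is an assumption of this sketch.

```python
import math
import time


def api_weight(n_locations: int, n_days: int, n_variables: int) -> float:
    """Weight of one request, following the formula given in the text."""
    return n_locations * (n_days / 14) * (n_variables / 10)


def num_data_splits(n_locations: int, n_days: int, n_variables: int,
                    limit_per_minute: float = 600.0) -> int:
    """Number of location batches needed so that each batch stays within the limit."""
    weight_per_location = api_weight(1, n_days, n_variables)
    max_locations = math.floor(limit_per_minute / weight_per_location)
    if max_locations < 1:
        raise ValueError("A single location exceeds the limit; split the date range too.")
    return math.ceil(n_locations / max_locations)


def run_paced(batches, fetch, window_seconds: float = 60.0, sleep=time.sleep):
    """Call fetch on each batch, pausing one rate-limit window between batches."""
    results = []
    for i, batch in enumerate(batches):
        if i > 0:
            sleep(window_seconds)
        results.append(fetch(batch))
    return results
```

Passing `sleep` as a parameter keeps the pacing logic testable without actually waiting a minute between batches.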

1.2.2. Weather Data Processing

The raw API responses are expanded into a MultiIndex DataFrame (IATA, DATE, HOUR).

  • Hourly Alignment: Features like TEMPERATURE_2M and WIND_SPEED_10M are mapped to the specific hour of departure/arrival.
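The expansion into a MultiIndex DataFrame can be sketched as below. The shape of `responses` (one `hourly` mapping per IATA code, with a `time` list alongside each variable) is an assumption made for illustration, as the stored bronze format is not shown here, and `expand_hourly` is a hypothetical helper.

```python
import pandas as pd


def expand_hourly(responses: dict) -> pd.DataFrame:
    """Expand per-airport hourly payloads into a frame indexed by (IATA, DATE, HOUR)."""
    frames = []
    for iata, payload in responses.items():
        hourly = pd.DataFrame(payload["hourly"])
        timestamps = pd.to_datetime(hourly.pop("time"))
        hourly["IATA"] = iata
        hourly["DATE"] = timestamps.dt.strftime("%Y-%m-%d")
        hourly["HOUR"] = timestamps.dt.hour
        frames.append(hourly)

    combined = pd.concat(frames, ignore_index=True)
    combined.columns = [name.upper() for name in combined.columns]
    return combined.set_index(["IATA", "DATE", "HOUR"]).sort_index()


# Assumed payload shape, for illustration only.
responses = {
    "JFK": {"hourly": {"time": ["2015-01-01T00:00", "2015-01-01T01:00"],
                       "temperature_2m": [1.2, 0.8]}},
    "LAX": {"hourly": {"time": ["2015-01-01T00:00"], "temperature_2m": [14.0]}},
}
weather = expand_hourly(responses)
jfk_at_one = weather.loc[("JFK", "2015-01-01", 1), "TEMPERATURE_2M"]
```

With this index, looking up the weather for a flight reduces to a single key made of the airport code, the date, and the hour.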

1.3. Complete Data Pipeline

  • The Weather Join: We perform a “Dual Join.” The flight record is enriched with weather from the Origin Airport at departure time and the Destination Airport at arrival time.
  • Categorical Encoding: Final string columns (AIRLINE, AIRPORT) are converted to CategoricalDtype and mapped to integer codes to prepare for the Machine Learning model.
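The dual join can be sketched as two left merges against the same weather table, one keyed on the origin airport and departure time and one on the destination airport and arrival time. The helper `join_weather`, the ORIGIN/DEST column prefixes, and the flat weather table with IATA, DATE, and HOUR columns are assumptions of this sketch.

```python
import pandas as pd

WEATHER_KEYS = ["IATA", "DATE", "HOUR"]


def join_weather(flights: pd.DataFrame, weather: pd.DataFrame,
                 airport_col: str, date_col: str, hour_col: str, prefix: str) -> pd.DataFrame:
    """Left-join weather onto flights for one airport/time pair, prefixing weather columns."""
    renamed = weather.rename(
        columns={c: f"{prefix}_{c}" for c in weather.columns if c not in WEATHER_KEYS}
    )
    merged = flights.merge(
        renamed, how="left",
        left_on=[airport_col, date_col, hour_col], right_on=WEATHER_KEYS,
    )
    # The weather key columns duplicate the flight keys, so drop them.
    return merged.drop(columns=WEATHER_KEYS)


flights = pd.DataFrame({
    "ORIGIN_AIRPORT": ["JFK"], "DESTINATION_AIRPORT": ["LAX"],
    "SCH_DEP_DATE": ["2015-01-01"], "SCH_DEP_HOUR": [8],
    "SCH_ARI_DATE": ["2015-01-01"], "SCH_ARI_HOUR": [11],
})
weather = pd.DataFrame({
    "IATA": ["JFK", "LAX"], "DATE": ["2015-01-01", "2015-01-01"],
    "HOUR": [8, 11], "TEMPERATURE_2M": [1.0, 15.0],
})

enriched = join_weather(flights, weather, "ORIGIN_AIRPORT", "SCH_DEP_DATE", "SCH_DEP_HOUR", "ORIGIN")
enriched = join_weather(enriched, weather, "DESTINATION_AIRPORT", "SCH_ARI_DATE", "SCH_ARI_HOUR", "DEST")
```

A left join keeps every flight even when no weather record matches; such rows simply carry nulls in the weather columns.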

Data Pipeline - Cycle 2

Metadata-Driven Pipeline

We adhere to the standard strategy of separating the data from the logic. We do so by moving the explicit column names and any parameter values used in the code to config files.

As a first step, we move the columns to load to the config file by defining the following in the config.yaml file.

pipeline:
  load_columns:
    - 'YEAR'
    - 'MONTH'
    - 'DAY'
    - 'AIRLINE'
    - 'FLIGHT_NUMBER'
    - 'TAIL_NUMBER'
    - 'ORIGIN_AIRPORT'
    - 'DESTINATION_AIRPORT'
    - 'SCHEDULED_DEPARTURE'
    - 'DEPARTURE_TIME'
    - 'DEPARTURE_DELAY'
    - 'SCHEDULED_TIME'
    - 'ELAPSED_TIME'
    - 'AIR_TIME'
    - 'DISTANCE'
    - 'SCHEDULED_ARRIVAL'
    - 'ARRIVAL_TIME'
    - 'ARRIVAL_DELAY'
    - 'CANCELLED'

We can load the file and read the load_columns as

config = load_yaml(config_path)
load_columns = config["pipeline"]["load_columns"]

Improving efficiency of dropTypeMismatchRows()

Change 1: Instead of hardcoding the string and numeric columns in the function, we pass the schema of the data and let the function build the lists of numeric and string columns using the following code.

string_columns = []
numeric_columns = []

for key in schema:
	if pd.api.types.is_numeric_dtype(pd.api.types.pandas_dtype(schema[key])):
		numeric_columns.append(key)
	else:
		string_columns.append(key)

The previous version dropped the mismatched rows by index for every column. This creates a memory bottleneck, as each drop creates a copy of the dataset with the bad rows removed, and reassigning the data from the copy slows the function down. The astype() call also becomes unnecessary once the data is loaded with the specified schema.

To overcome these problems, we can use boolean masking, so that a single boolean series keeps track of the rows with a type mismatch during the column iteration.

is_bad = pd.Series(False, index=df.index)
for col in df.columns:
	is_numeric = pd.to_numeric(df[col], errors='coerce').notna()

	if col in string_columns:
		is_bad |= is_numeric
	elif col in numeric_columns:
		is_bad |= ~is_numeric

Once the columns are scanned, the boolean mask can be used to split the data into clean and corrupted data using df[~is_bad] and df[is_bad]. The discarded data is appended to discarded_list, which is passed to the function as an argument, using

if discarded_list is not None:
	discarded_list.append(df[is_bad].copy())

Fixing Path Problems

Instead of writing the paths manually, we let Python build the paths of the files that are needed.

  • The paths in the config files should be relative to the project folder. For instance the data path is raw_path: "data/flights.csv".
  • A separate python file constants.py in the utils folder finds the absolute path of project folder as
    PROJECT_ROOT = Path(__file__).parent.parent.parent.resolve()
    Data and Config directories path are computed as
    DATA_DIR = PROJECT_ROOT / "data"
    CONFIG_DIR = PROJECT_ROOT / "config"
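A relative path read from a config file can then be resolved against the project root. The helper `resolve_project_path` below is a hypothetical sketch, and the root is passed in as an argument only so that the example is self-contained.

```python
from pathlib import Path


def resolve_project_path(relative_path: str, project_root: Path) -> Path:
    """Join a config-relative path onto the project root."""
    path = Path(relative_path)
    if path.is_absolute():
        # Config paths are expected to be relative to the project folder.
        raise ValueError(f"Expected a path relative to the project root, got {relative_path}")
    return project_root / path


raw_path = resolve_project_path("data/flights.csv", Path("project"))
```

Rejecting absolute paths early keeps machine-specific locations out of the config files.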

File Loading Utilities

Functions to load the files in the config folder are defined in utils/loaders.py.

def getConfigResourcePath(file_name):
	config_path = CONFIG_DIR / f"{file_name}.yaml"
	if config_path.exists():
		return config_path
	else:
		raise FileNotFoundError(f"Missing {file_name} at {config_path}")

Pipeline

graph LR
	subgraph Inference Pipeline
		direction LR
		data[Flight]
		airport[Airport]
		weather[Weather]
		dp[Data Pipeline]
		gold[(Gold)]
		model[Model]
		routing[Routing]
		interface[Interface]

		data --"fl_m_cols"--> dp
		airport --> dp
		weather --"w_n_cols"--> dp
		dp --"fl_w_p_cols"--> gold
		gold --"fl_w_p_cols"--> model
		model --"p_{risk} < p_0"--> interface
		model --"p_{risk} >= p_0"--> routing
		routing --> interface
		gold --"fl_q_cols"--> routing
	end

Logging the Pipeline

The utils/constants.py file reads the environment variables and sets them as constants. The natural thing to log is whether each variable exists in the env file. The following function reads an environment variable and logs a warning if the variable is not set.

import logging
import os

from dotenv import load_dotenv

logger = logging.getLogger("airflow.task")
load_dotenv()

def get_env_var(var_name: str) -> str:
    env_var = os.getenv(var_name)
    if env_var is None:
        logger.warning(f"The environment variable {var_name} is not set.")
        logger.warning(f"Setting {var_name} to empty string.")
        # Return an empty string as the warning states; str(None) would give "None".
        return ""
    return env_var

The file_handler.py file logs the following metrics.

  • File existence status

import logging
from functools import wraps

logger = logging.getLogger("airflow.task")


def loader(func):
    """Wrap a file-reading function with logging around the read."""
    @wraps(func)
    def error_wrapper(filepath, *args, **kwargs):
        try:
            logger.info(f"Reading the file {filepath}.")
            content = func(filepath, *args, **kwargs)
            logger.debug(f"File read successfully: {filepath}.")
            return content
        except Exception as e:
            logger.error(f"Error occurred while loading the file {filepath}: {e}")
            raise

    return error_wrapper


def dumper(func):
    """Wrap a file-writing function with logging around the write."""
    @wraps(func)
    def error_wrapper(filepath, *args, **kwargs):
        try:
            logger.info(f"Writing to the file {filepath}")
            func(filepath, *args, **kwargs)
            logger.debug(f"File written successfully: {filepath}.")
        except Exception as e:
            logger.error(f"Error occurred during the file write {filepath}: {e}")
            raise

    return error_wrapper