1. Architecting the Pipeline
The core requirement of any pipeline is scaling in two dimensions. A production system scales with data size along the vertical dimension and with the number of service types along the horizontal dimension. Scaling an application therefore means scaling it horizontally, vertically, or both.
An application's core lifecycle encompasses its development and deployment stages, and an application goes through this lifecycle routinely. As the application scales, it becomes critically important to make the lifecycle compatible with the scaling requirement so that the lifecycle progresses independently of the scaling.
The Twelve-Factor App principles guide the design of applications that meet these two scaling demands: applications are replicated along the horizontal or vertical dimension instead of a single application growing in size. The applications are loosely coupled to each other.
The pipeline requires certain files for its configuration and observability. The following resources need to be in place for the pipeline to work.
- Config file: The configuration for the pipeline is defined in a separate `schema.yaml` file placed in the `/etc/etl/config/` directory.
- Log files: The pipeline writes its logs to `/etc/etl/logs/`.
- Data: The data available to the pipeline can be mounted either from cloud storage or locally. For cloud storage, the bucket address needs to be passed. For local storage, the path of the root data directory needs to be passed.
2. The ETL Pipeline
The ETL pipeline is shown below. The complete pipeline consists of three different datasets: flight details, airport details, and weather details. These datasets are ingested and processed at different stages of the pipeline. The core components of the pipeline are Flight ETL, Airport ETL, Weather ETL, and Merger ETL.
The weather data comes through an external API. Since the format of the data may differ between vendors, the weather data processing stage is best handled as an external service and is included in the pipeline as an adapter instead of being part of the core pipeline.
There are different ways to handle this. At enterprise scale, this requires moving the weather data processing into a separate microservice. In this project, we include the weather processing as an adapter that interfaces with the core pipeline through a data contract. The data that the weather adapter hands to the core pipeline is defined by the silver weather schema. It is the responsibility of the weather adapter to ensure that the data it provides meets the silver weather schema defined by the core pipeline.
The flowchart below depicts the complete pipeline in terms of the four blocks.
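The data contract described above can be pictured as a small check that the adapter runs before handing data over. The following is a minimal sketch, not the project's implementation: the schema contents and the function name `contract_violations` are assumptions made for illustration, and only numeric columns are included.

```python
import pandas as pd

# Hypothetical excerpt of a silver weather schema: column name -> pandas dtype.
SILVER_WEATHER_SCHEMA = {
    "HOUR": "int8",
    "TEMPERATURE_2M": "float32",
    "WIND_SPEED_10M": "float32",
}


def contract_violations(df: pd.DataFrame, schema: dict) -> list:
    """Return human-readable contract violations; an empty list means the data conforms."""
    problems = []
    for column, expected in schema.items():
        if column not in df.columns:
            problems.append(f"missing column: {column}")
        elif str(df[column].dtype) != expected:
            problems.append(f"{column}: expected {expected}, got {df[column].dtype}")
    return problems
```

The adapter would refuse to publish to the silver weather storage whenever the returned list is non-empty, which keeps the core pipeline independent of any vendor-specific format.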
graph TD
subgraph "<b>ETL Pipeline</b>"
direction TB
subgraph flight_etl[" "]
flight_dependency["<b>Dependency/Config</b><br><br><b>Clean</b><br>- flight silver schema<br>- flight bronze filepath<br>- flight silver filepath<br>- discarded filepath<br><br><b>Transform</b><br>- flight gold schema<br>- flight silver filepath<br>- flight gold filepath"]
flight_bronze[(Flight Bronze)]
end
style flight_etl stroke:none
flight@{ shape: div-rect, label: "Flight ETL"}
subgraph flight_store [" "]
flight_discarded[(Flight Discarded)]
flight_silver[(Flight Silver)]
flight_gold[(Flight Gold)]
end
style flight_store stroke:none
airport_dependency["<b>Dependency/Config</b><br><br><b>Extract</b><br>- flight gold filepath<br>- airport location for<br>weather filepath<br>- features"]
airport@{ shape: div-rect, label: "Airport ETL"}
subgraph airport_store [" "]
airport_locations[("Airports<br>[Airport codes,<br>Latitude,<br>Longitude]")]
airport_locations_for_weather[("[Airport Codes,<br>Date range]")]
end
style airport_store stroke:none
weather_dependency["<b>Dependency/Config</b><br><br><b>Extract</b><br>- airports filepath<br>- airport location for<br>weather filepath<br>- weather bronze filepath<br>- weather features<br>- API url<br><br><b>Transform</b><br>- airport location for<br>weather filepath<br>- weather bronze filepath<br>- weather silver filepath<br>- weather features"]
weather@{ shape: div-rect, label: "Weather Adapter"}
subgraph weather_store[" "]
weather_bronze[(Weather Bronze)]
weather_silver[(Weather Silver)]
end
style weather_store stroke:none
merger_dependency["<b>Dependency/Config</b><br><br><b>Join</b><br>- flight gold filepath<br>- weather silver filepath<br>- flight-weather gold<br>filepath<br>- flight-weather join-on<br>feature map<br><br><b>Transform</b><br>- airport location for<br>weather filepath<br>- weather bronze filepath<br>- weather silver filepath<br>- weather features"]
merger@{ shape: div-rect, label: "Merger"}
subgraph target [" "]
merger_gold[(Flight-weather Gold)]
target_featureset[(Feature Store)]
end
style target stroke:none
flight_dependency -.-> flight
flight_bronze ==> flight
weather_dependency -.-> weather
flight --"Discarded"--> flight_discarded
flight =="Cleaned data"==> flight_silver
flight =="Transformed data"==> flight_gold
airport_dependency -.-> airport
flight_gold ==> airport
airport ==> airport_locations_for_weather
airport_locations ==> weather
airport_locations_for_weather ==> weather
weather =="Raw data"==> weather_bronze
merger_dependency -.-> merger
weather ==> weather_silver
weather_silver ==> merger
merger =="Join"==> merger_gold
flight ==> merger
merger =="Transformed"==> target_featureset
end
graph TD
subgraph etl[<b>Production ETL Pipeline</b>]
direction TB
flight_storage_bronze[("Bronze (Flight)<br/>S3/GCS<br>(csv)")]
subgraph flight[Flight ETL]
subgraph cleaning[Cleaning]
val[Schema Validation]
clean[Clean & Deduplicate]
end
flight_storage_silver[("Silver (Flight)<br>(parquet)")]
flight_transform[Flight Transform & <br>Feature Engineering]
end
airport_api[airports API]
weather_api[openmeteo API]
flight_storage_garbage[("Discarded (S3/GCS)<br>(csv)")]
flight_storage_gold[("Gold (Flight)<br>(parquet)")]
subgraph airport_etl[Airport ETL]
airport_extract[Extract<br>unique IATA]
airports_for_weather[("Distinct IATA<br>date range<br>(json)")]
airports[("Airports Location<br>(parquet)")]
end
subgraph weather[Weather Adapter]
weather_extract[Weather Extract]
weather_storage_bronze[("Bronze (Weather)<br>(bin)")]
weather_transform[Weather Clean and<br> Transform]
end
weather_storage_silver[("Silver (Weather)<br>(parquet)")]
subgraph Merge
flight_weather[Join]
end
storage2[("Feature Store<br>(parquet)")]
weather_api --> weather_extract
airport_api --"If location missing"--> weather_extract
flight_storage_bronze --"S3/BigQuery Select"--> val
val -- "Failed Data<br>Alert" --> flight_storage_garbage
clean -- "Failed Data<br>Alert" --> flight_storage_garbage
val --> clean
clean --> flight_storage_silver
flight_storage_silver --> flight_transform
flight_transform --> flight_storage_gold
flight_storage_gold --"ORIGIN_AIRPORT<br>DESTINATION_AIRPORT"--> airport_extract
airport_extract --> airports_for_weather
airports --> weather_extract
airports_for_weather --> weather_extract
weather_extract --> weather_storage_bronze
weather_storage_bronze --> weather_transform
airports_for_weather --> weather_transform
weather_transform --> weather_storage_silver
weather_storage_silver --> flight_weather
flight_storage_gold --> flight_weather
flight_weather --> storage2
end
Before discussing each component in detail, we describe the data requirements of the complete pipeline. We follow the Medallion data storage architecture to distinguish the stages of the data in the pipeline.
As the data progresses downstream in the pipeline, it goes through several stages of transformation. The changes broadly consist of cleaning and transformation. The raw data is classified as the Bronze layer. The data is then cleaned for syntactic and semantic correctness, and data that has passed this stage is classified as the Silver layer. In the next stage the data is transformed into a form suitable for analytics or ML ingestion; data that has passed this stage is classified as the Gold layer. In this stage the data is often enriched with new features, created either from existing features or from other data, to make it more useful for further analysis or ML prediction.
In this project the data comes from three sources: flight, airport, and weather. The pipeline appends to each flight record the weather details at the time of departure and arrival at the respective airports. The flight details contain the airport IATA codes. The pipeline uses a separate data source to fetch the locations of the airports, which it then uses to fetch the weather details for the appropriate duration.
The data from the three sources is stored separately in the respective directories. After the data from the individual sources has been processed, it is merged together.
The final merged data is handled in the merged directory.
Below, we systematize the data requirement into the directory structure we adopt in this project.
aviation-etl-storage/
├── flight/
│ ├── bronze/
│ │ └── bronze_flight_TIMESTAMP.csv
│ ├── gold/
│ │ └── gold_flight_TIMESTAMP.parquet
│ ├── silver/
│ │ └── silver_flight_TIMESTAMP.parquet
│ └── discarded/
│ └── discarded_flight_TIMESTAMP.parquet
├── airport/
│ ├── airport-for-weather/
│ │ └── airport_for_weather_TIMESTAMP.json
│ └── airport_location.parquet
├── weather/
│ ├── bronze/
│ │ └── bronze_weather_TIMESTAMP.bin
│ └── silver/
│ └── silver_weather_TIMESTAMP.parquet
├── feature-store/
│ └── feature_TIMESTAMP.parquet
└── categorical_feature.json
2.1. Flight ETL
This stage processes the flight data from its raw format in the bronze flight storage to the processed and transformed state stored in the gold flight storage. The raw data is first cleaned, and the silver flight schema is enforced before the data is stored in the silver flight storage. The data in the silver flight storage is then transformed and feature engineered to produce the data most relevant to the ML model. The flight data at the end of this stage consists of features that have predictive power for flight delay risk.
2.1.1. Bronze Layer
The raw data schema is given below.
Bronze Schema
| # | Column | Dtype |
|---|---|---|
| 0 | YEAR | int64 |
| 1 | MONTH | int64 |
| 2 | DAY | int64 |
| 3 | DAY_OF_WEEK | int64 |
| 4 | AIRLINE | str |
| 5 | FLIGHT_NUMBER | int64 |
| 6 | TAIL_NUMBER | str |
| 7 | ORIGIN_AIRPORT | object |
| 8 | DESTINATION_AIRPORT | object |
| 9 | SCHEDULED_DEPARTURE | int64 |
| 10 | DEPARTURE_TIME | float64 |
| 11 | DEPARTURE_DELAY | float64 |
| 12 | TAXI_OUT | float64 |
| 13 | WHEELS_OFF | float64 |
| 14 | SCHEDULED_TIME | float64 |
| 15 | ELAPSED_TIME | float64 |
| 16 | AIR_TIME | float64 |
| 17 | DISTANCE | int64 |
| 18 | WHEELS_ON | float64 |
| 19 | TAXI_IN | float64 |
| 20 | SCHEDULED_ARRIVAL | int64 |
| 21 | ARRIVAL_TIME | float64 |
| 22 | ARRIVAL_DELAY | float64 |
| 23 | DIVERTED | int64 |
| 24 | CANCELLED | int64 |
| 25 | CANCELLATION_REASON | str |
| 26 | AIR_SYSTEM_DELAY | float64 |
| 27 | SECURITY_DELAY | float64 |
| 28 | AIRLINE_DELAY | float64 |
| 29 | LATE_AIRCRAFT_DELAY | float64 |
| 30 | WEATHER_DELAY | float64 |
2.1.2 Silver Layer
We define a silver schema that the cleaning stage enforces on the features.
Silver Schema
| # | column | datatype |
|---|---|---|
| 0 | YEAR | int16 |
| 1 | MONTH | int16 |
| 2 | DAY | int16 |
| 3 | DAY_OF_WEEK | int16 |
| 4 | AIRLINE | str |
| 5 | FLIGHT_NUMBER | int64 |
| 6 | TAIL_NUMBER | str |
| 7 | ORIGIN_AIRPORT | str |
| 8 | DESTINATION_AIRPORT | str |
| 9 | SCHEDULED_DEPARTURE | float32 |
| 10 | DEPARTURE_TIME | float32 |
| 11 | DEPARTURE_DELAY | float32 |
| 12 | TAXI_OUT | float32 |
| 13 | WHEELS_OFF | float32 |
| 14 | SCHEDULED_TIME | float32 |
| 15 | ELAPSED_TIME | float32 |
| 16 | AIR_TIME | float32 |
| 17 | DISTANCE | float32 |
| 18 | WHEELS_ON | float32 |
| 19 | TAXI_IN | float32 |
| 20 | SCHEDULED_ARRIVAL | float32 |
| 21 | ARRIVAL_TIME | float32 |
| 22 | ARRIVAL_DELAY | float32 |
| 23 | DIVERTED | int8 |
| 24 | CANCELLED | int8 |
| 25 | CANCELLATION_REASON | str |
| 26 | AIR_SYSTEM_DELAY | float32 |
| 27 | SECURITY_DELAY | float32 |
| 28 | AIRLINE_DELAY | float32 |
| 29 | LATE_AIRCRAFT_DELAY | float32 |
| 30 | WEATHER_DELAY | float32 |
We downcast several features to reduce the memory footprint.
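To illustrate the effect of downcasting, the sketch below applies a three-column excerpt of the silver schema to a toy frame and compares the memory used by the column data. The rows are made up for the example and are not taken from the dataset.

```python
import pandas as pd

# A three-column excerpt of the silver schema above.
silver_dtypes = {"YEAR": "int16", "DIVERTED": "int8", "DEPARTURE_DELAY": "float32"}

# Toy rows; pandas infers int64 / float64 for them, as in the bronze schema.
bronze = pd.DataFrame(
    {"YEAR": [2015, 2015], "DIVERTED": [0, 1], "DEPARTURE_DELAY": [-3.0, 12.0]}
)
silver = bronze.astype(silver_dtypes)

# Bytes used by the column data only (index excluded).
bytes_before = int(bronze.memory_usage(index=False).sum())
bytes_after = int(silver.memory_usage(index=False).sum())
```

Note that `astype` raises if a column contains values that cannot be represented in the target integer type, such as nulls, so the cast belongs after the null checks.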
Data quality checks are performed in the silver layer. Each feature is checked on its own and against related features to verify that the features are consistent with each other. The cleaning stage consists of the following checks.
- Schema validation: This is the entry gate of the silver stage. It enforces that the entries of each feature have the type declared in the schema.
- Dropping duplicate entries: Rows that duplicate another row in every feature are dropped so that each row is unique.
- Dropping null values: The features relevant to the downstream tasks are checked for missing values. If a row has a missing entry for a relevant feature, the row is dropped. It is also possible to impute missing values with a statistical value, but we choose to simply drop those rows.
- Enforcing feature format: Some features follow a standard format. For instance, the airline and airport values are standard IATA codes. The airline is a two-character code and the airport codes are three-letter strings. The pipeline must ensure that the data is syntactically correct.
- Enforcing feature range: Most values that appear in a given system are structurally or practically bounded, i.e., they take values within particular ranges. For example, the month is restricted to integers from 1 to 12.
Once these checks are done, the data is stored in the silver storage.
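The checks listed above can be combined into a single boolean mask. The sketch below is one possible arrangement under stated assumptions: the function name `clean_flights`, the regular expressions, and the choice of `ARRIVAL_DELAY` as the non-nullable feature are illustrative, and the rows are toy data.

```python
import pandas as pd


def clean_flights(df: pd.DataFrame):
    """Split df into (clean, discarded) using format, range, and null checks."""
    # Format: two-character airline code, three-letter airport code.
    ok = df["AIRLINE"].astype(str).str.fullmatch(r"[A-Z0-9]{2}", na=False)
    ok &= df["ORIGIN_AIRPORT"].astype(str).str.fullmatch(r"[A-Z]{3}", na=False)
    # Range: month must lie in 1..12 (between() is False for missing values).
    ok &= df["MONTH"].between(1, 12)
    # Nulls: drop rows with a missing value in a relevant feature.
    ok &= df["ARRIVAL_DELAY"].notna()
    # Duplicates are removed from the rows that passed every check.
    return df[ok].drop_duplicates(), df[~ok]


raw = pd.DataFrame({
    "AIRLINE": ["AA", "AA", "aa", "DL", "WN"],
    "ORIGIN_AIRPORT": ["JFK", "JFK", "JFK", "ATL", "LAX"],
    "MONTH": [1, 1, 2, 13, 3],
    "ARRIVAL_DELAY": [5.0, 5.0, 1.0, 0.0, None],
})
clean, discarded = clean_flights(raw)
```

Here the second row is an exact duplicate of the first, and the last three rows fail the format, range, and null checks respectively.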
Categorical Features: The following are the categorical features in the dataset.
- AIRLINE
- FLIGHT_NUMBER
- TAIL_NUMBER
- ORIGIN_AIRPORT
- DESTINATION_AIRPORT
- DIVERTED
- CANCELLED
- CANCELLATION_REASON
We use the pandas CategoricalDtype to convert these features into a categorical datatype.
The categories for each feature are stored in a file categories.json. The categorical columns are converted to the categorical datatype based on this ground truth.
Some of these categorical features are volatile: their categories change over time.
The categories of FLIGHT_NUMBER and TAIL_NUMBER do not stay the same; entries are added or removed with time.
For less volatile features such as AIRLINE, ORIGIN_AIRPORT, and DESTINATION_AIRPORT, the ideal situation is to gather the list of their possible values separately, as they have a well-defined, fixed set of values.
However, the possibility of change remains. What if a new airline is registered in the future?
If the model is trained on data in which every category value is known, it never learns what to do when an unseen category value is encountered.
The issue becomes more serious once we go back and look at our data. For example, below we list a few categorical features along with the frequency of each of their values in the data.
| AIRLINE | COUNT |
|---|---|
| WN | 1261855 |
| DL | 875881 |
| AA | 725984 |
| OO | 588353 |
| EV | 571977 |
| UA | 515723 |
| MQ | 294632 |
| B6 | 267048 |
| US | 198715 |
| AS | 172521 |
| NK | 117379 |
| F9 | 90836 |
| HA | 76272 |
| VX | 61903 |
| CANCELLATION_REASON | COUNT |
|---|---|
| B | 48851 |
| A | 25262 |
| C | 15749 |
| D | 22 |
| CANCELLED | COUNT |
|---|---|
| 0 | 5729195 |
| 1 | 89884 |
| DIVERTED | COUNT |
|---|---|
| 0 | 5803892 |
| 1 | 15187 |
There are more than 50 US passenger airlines. If we create the AIRLINE category based on the ground truth, the model would never learn how to handle data with airlines that were not present in the training data. The model will fail to generalize even to currently active airlines if those airlines were not present in the training data. The issue is similar to the case of volatile categorical features.
To mitigate this, we choose to work only with a subset of the categories present in the training dataset.
The rest of the categories are mapped to a single UNKNOWN label. The size of the subset is chosen such that it covers a significant fraction of the dataset while also leaving some room for the model to learn the pattern of the UNKNOWN category.
This improves generalization and forces the model to learn the pattern from the other features.
Note that the other categories, DIVERTED, CANCELLED, and CANCELLATION_REASON, do not enter the model stage, so we do not have to worry about the generalization problem for these features.
For these features, we keep all the categorical values present in the data and append an UNKNOWN entry to each of them to handle unseen values in future datasets.
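The UNKNOWN handling described above can be sketched with `CategoricalDtype` as follows. The helper name `to_categorical` and the example values are assumptions for illustration; in the pipeline the known categories would come from the stored ground truth.

```python
import pandas as pd
from pandas.api.types import CategoricalDtype


def to_categorical(col: pd.Series, known: list) -> pd.Series:
    """Cast col to a categorical whose categories are the known values plus UNKNOWN."""
    dtype = CategoricalDtype(categories=list(known) + ["UNKNOWN"])
    as_text = col.astype(str)
    # Any value outside the known set is replaced by the UNKNOWN label before the cast.
    return as_text.where(as_text.isin(known), "UNKNOWN").astype(dtype)


# "ZZ" is a made-up code standing in for an airline not seen during training.
airline = to_categorical(pd.Series(["WN", "DL", "ZZ"]), known=["WN", "DL", "AA"])
```

Replacing unseen values explicitly matters because casting directly to a categorical dtype would turn them into missing values instead of UNKNOWN.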
To create the ground truth for category features, we create a dictionary of the categories below.
categorical_features = {"FLIGHT_NUMBER": [], "TAIL_NUMBER": [], "ORIGIN_AIRPORT": [], "DESTINATION_AIRPORT": [], "DIVERTED": [], "CANCELLED": [], "CANCELLATION_REASON": []}
We calculate the relative frequency with which each category appears in the dataset.
A subset of the categories is chosen such that the subset covers a significant fraction of the dataset.
The subset is selected with a threshold frequency weight.
For the airport codes, for example, we find that choosing a threshold of 0.0002 covers 98.93% of the dataset.
categorical_features = {"AIRLINE": None, "FLIGHT_NUMBER": None, "TAIL_NUMBER": None,
                        "ORIGIN_AIRPORT": None, "DESTINATION_AIRPORT": None}
"""
The thresholds cover the following percentages of the dataset:
Column               threshold  % of dataset
------               ---------  ------------
AIRLINE              0.015      97.60
FLIGHT_NUMBER        1.e-5      99.79
TAIL_NUMBER          5.e-5      98.93
ORIGIN_AIRPORT       0.0002     98.93
DESTINATION_AIRPORT  0.0002     98.93
"""
threshold = [0.015, 1.e-5, 5.e-5, 0.0002, 0.0002]
for i, feature in enumerate(categorical_features.keys()):
    feature_col = flight[feature].astype(str)
    # Relative weight of each value, based on its frequency in the dataset.
    feature_weights = feature_col.value_counts(normalize=True)
    # Map each value in the column to its weight.
    feature_with_weights = feature_col.map(feature_weights)
    categorical_features[feature] = feature_col[feature_with_weights >= threshold[i]].drop_duplicates().to_list()
The cleaned dataset doesn't have any non-null value for the CANCELLATION_REASON feature.
The rows with a non-null CANCELLATION_REASON had null values in non-nullable features, so they were dropped from the dataset.
For the remaining features, CANCELLATION_REASON, CANCELLED, and DIVERTED, we select the possible categories from the raw dataset.
We then append the UNKNOWN category to each of the features in the complete categorical_features.
The categories in categorical_features provide a ground truth that we use later in the pipeline to cast the categorical features to the categorical datatype.
2.1.3 Gold Layer
The silver layer enhances the quality of the data by performing various quality checks, ensuring that the objective information in the dataset is in pristine form. The next stage is the gold layer. In this layer, the data is transformed into a format suitable for model ingestion, and new features are engineered, either from features already present in the data or by combining extra features, to improve the predictive power of the model.
The following new features are engineered to fetch the weather details.
- SCH_DEP_DATE
- SCH_DEP_HOUR
- SCH_DEP_MIN
- DEP_HOUR
- DEP_MIN
- SCH_ARI_DATE
- SCH_ARI_DAY
- SCH_ARI_HOUR
- SCH_ARI_MIN
- ARI_HOUR
- ARI_MIN
The features dropped:
- SCHEDULED_DEPARTURE
- DEPARTURE_TIME
- SCHEDULED_ARRIVAL
- ARRIVAL_TIME
The columns renamed:
- DAY → SCH_DEP_DAY
The features in the gold flight storage tell, however, an incomplete story. Weather conditions are another factor that influences flight delay. The weather data is fetched from the weather API and transformed into the form required by the ML model.
Gold Schema
| # | Column | Dtype |
|---|---|---|
| 0 | YEAR | int16 |
| 1 | MONTH | int8 |
| 2 | DAY_OF_WEEK | int8 |
| 3 | SCH_DEP_DATE | str |
| 4 | SCH_DEP_DAY | int8 |
| 5 | SCH_DEP_HOUR | int8 |
| 6 | SCH_DEP_MIN | int8 |
| 7 | SCH_ARI_DATE | str |
| 8 | SCH_ARI_DAY | int8 |
| 9 | SCH_ARI_HOUR | int8 |
| 10 | SCH_ARI_MIN | int8 |
| 11 | AIRLINE | str |
| 12 | FLIGHT_NUMBER | str |
| 13 | TAIL_NUMBER | str |
| 14 | ORIGIN_AIRPORT | str |
| 15 | DESTINATION_AIRPORT | str |
| 16 | ARRIVAL_DELAY | float32 |
2.2 Airport ETL
2.3 Weather ETL
2.4 Merger ETL
3. Setting up File Directory
The file structure for the project is created as follows.
ETL/
├── flight-etl
│ ├── data/
│ │ ├── flight/
│ │ │ ├── bronze/
│ │ │ │ └── bronze_flight_TIMESTAMP.csv
│ │ │ ├── gold/
│ │ │ │ └── gold_flight_TIMESTAMP.parquet
│ │ │ ├── silver/
│ │ │ │ └── silver_flight_TIMESTAMP.parquet
│ │ │ └── discarded/
│ │ │ └── discarded_flight_TIMESTAMP.parquet
│ │ ├── airport/
│ │ │ ├── airport-for-weather/
│ │ │ │ └── airport_for_weather_TIMESTAMP.json
│ │ │ └── airport_location.parquet
│ │ ├── weather/
│ │ │ ├── bronze/
│ │ │ │ └── bronze_weather_TIMESTAMP.bin
│ │ │ └── silver/
│ │ │ └── silver_weather_TIMESTAMP.parquet
│ │ ├── feature-store/
│ │ │ └── feature_TIMESTAMP.parquet
│ │ └── categorical_feature.json
│ ├── config/
│ │ └── schema.yaml
│ ├── src/
│ │ └── etl/
│ │ ├── adapters/
│ │ │ └── openmeteo.py
│ │ ├── flight/
│ │ │ ├── extract.py
│ │ │ ├── clean.py
│ │ │ ├── schema_validation.py
│ │ │ ├── transform.py
│ │ │ └── __init__.py
│ │ ├── airport/
│ │ │ ├── extract.py
│ │ │ └── __init__.py
│ │ ├── merge/
│ │ │ ├── join.py
│ │ │ └── __init__.py
│ │ ├── utils/
│ │ │ ├── constants.py
│ │ │ ├── helper.py
│ │ │ ├── loaders.py
│ │ │ ├── logger.py
│ │ │ ├── settings.py
│ │ │ └── __init__.py
│ │ └── __init__.py
│ ├── tests/
│ │ └── etl.ipynb
│ ├── pyproject.toml
│ └── requirements.txt
├── airflow-components/
│ ├── dags/
│ │ ├── flight_etl_dag.py
│ │ ├── airport_etl_dag.py
│ │ ├── weather_etl_dag.py
│ │ └── flight_weather_merger_dag.py
│ ├── logs/
│ ├── plugins/
│ └── airflow.env
├── compose.yaml
├── Dockerfile
├── .env
└── README.Docker.md
Iteration 1 Deliverable
- Working data pipeline
- Airflow orchestration
- Docker containerization of the Pipeline
Iteration 2 Deliverable
- Feature engineer several features that improve accuracy
- Compare a few models to choose from
- Perform evaluation of model performance for regression and distributional regression to introduce uncertainty modelling.
- Statistical characterization of airplane delays
- Dynamically forecasting airline departure delay probability distributions for individual flights using supervised learning
- Analyzing and Predicting Airline Delays: A Comprehensive Data Science Approach
- Estimating Flight Departure Delay Distributions - a Statistical Approach with Long-Term Trend and Short-Term Pattern
- Universal patterns in passenger flight departure delays
1. Data Pipeline
The data pipeline consists of the following stages.
- Data Source
- Flight - Local Storage
- Airport locations - API
- Airport weather - API
- Generic Data Cleaning
- Schema validation
- Filter columns
- Drop Cancelled / Diverted
- Deduplicate
- Type mismatch
- Categorical Standardization
- Typos
- Value range consistency
- Value format consistency
- Cross-column consistency
- Deduplicate
- Outlier detection
- Null values
- Missing values for a continuous feature can be replaced by its mean, whereas for a categorical feature by 0 (see "A hybrid machine learning-based model for predicting flight delay through aviation big data").
- Data Processing
- Recalculate `SCH_ARI_MIN` and `ARI_MIN`.
- Feature Engineering
- Decomposition of `SCHEDULED_DEPARTURE`, `DEPARTURE_TIME`, `SCHEDULED_ARRIVAL`, and `ARRIVAL_TIME` into `SCH_DEP_HOUR`, `SCH_DEP_MIN`, `DEP_HOUR`, `DEP_MIN`, `SCH_ARI_HOUR`, `SCH_ARI_MIN`, `ARI_HOUR`, and `ARI_MIN`.
- `SCH_DEP_DATE` and `SCH_ARI_DATE`.
1.1. Flight Data
We start with the flight data. The dataset consists of a total of
1.1.1. Flight Data Extraction
The flight data is present in the local storage. To avoid unnecessary memory usage, we use Select on Read method to load only those features that are used in the ETL pipeline and are required by the model. We do so by passing a list of columns to read to the read_csv() method.
import pandas as pd

def loadFlightData(filepath: str, columns: list = None) -> pd.DataFrame:
    # Select on read: load only the requested columns when a list is given.
    if columns is None:
        df = pd.read_csv(filepath)
    else:
        df = pd.read_csv(filepath, usecols=columns)
    return df
and passing
load_columns = """YEAR
MONTH
DAY
AIRLINE
FLIGHT_NUMBER
TAIL_NUMBER
ORIGIN_AIRPORT
DESTINATION_AIRPORT
SCHEDULED_DEPARTURE
DEPARTURE_TIME
DEPARTURE_DELAY
SCHEDULED_TIME
ELAPSED_TIME
AIR_TIME
DISTANCE
SCHEDULED_ARRIVAL
ARRIVAL_TIME
ARRIVAL_DELAY
CANCELLED"""
loadFlightData(filepath, columns=load_columns.split())
1.1.2. Flight Data Processing
- Cancelled Flights: Since we are dealing with a delay prediction model, we drop the rows for which flights are cancelled. The total percentage of cancelled flights is
  The number of cancelled flights is very small compared to the total number of flights, so we can safely drop them. Cancelled flights are dropped by using `CANCELLED==0` in the `query()` method to select only the flights that are not cancelled.
- Null Values
- Type Mismatching: Columns may contain entries of a type different from the type prescribed in the schema. The schema contains either string or numeric types. We scan each of the two types of columns separately and drop the rows whose data type differs. The schema of the dataset is given below.
| Column | Format |
|---|---|
| YEAR | int16 |
| MONTH | int8 |
| DAY | int8 |
| AIRLINE | string - 2 uppercase characters |
| FLIGHT_NUMBER | int16 - 4 digit number |
| TAIL_NUMBER | string - 6 character alphanumeric |
| ORIGIN_AIRPORT | string - 3 uppercase characters |
| DESTINATION_AIRPORT | string - 3 uppercase characters |
| SCHEDULED_DEPARTURE | float32 - first two digits for hour, last two digits for minutes |
| DEPARTURE_TIME | float32 - first two digits for hour, last two digits for minutes |
| DEPARTURE_DELAY | float32 - minutes |
| SCHEDULED_TIME | float32 - minutes |
| ELAPSED_TIME | float32 - minutes |
| AIR_TIME | float32 - minutes |
| DISTANCE | float32 - km |
| SCHEDULED_ARRIVAL | float32 - first two digits for hour, last two digits for minutes |
| ARRIVAL_TIME | float32 - first two digits for hour, last two digits for minutes |
| ARRIVAL_DELAY | float32 - minutes |
| CANCELLED | int8 |

The rows with entries that fail to match the schema are selected using pd.to_numeric(df[column], errors="coerce").isna() and subsequently dropped. For the string columns, the selection is negated with ~ to catch numeric entries.
The actual formats of the columns are as follows.
| Column | Dtype |
|---|---|
| AIRLINE | object |
| FLIGHT_NUMBER | int64 |
| TAIL_NUMBER | object |
| ORIGIN_AIRPORT | object |
| DESTINATION_AIRPORT | object |
| SCHEDULED_DEPARTURE | int64 |
| DEPARTURE_TIME | float64 |
| DEPARTURE_DELAY | float64 |
| AIR_TIME | float64 |
| DISTANCE | int64 |
| SCHEDULED_ARRIVAL | int64 |
| ARRIVAL_TIME | float64 |
| ARRIVAL_DELAY | float64 |
| WEATHER_DELAY | float64 |
| DATE | datetime64[ns] |
Creating a New Feature
- Time Decomposition: The scheduled and actual departure and arrival times are stored as floats. Each value is read as a four-digit number: the first two digits are the hour and the last two digits are the minutes. We split `SCHEDULED_DEPARTURE` and `DEPARTURE_TIME` into `SCH_DEP_HOUR`, `SCH_DEP_MIN`, `DEP_HOUR`, and `DEP_MIN`, and similarly for `SCHEDULED_ARRIVAL` and `ARRIVAL_TIME`.
- Date Reconstruction: We combine `YEAR`, `MONTH`, `DAY`, `SCH_DEP_HOUR`, and `SCH_DEP_MIN` to create the `SCH_DEP_DATE` feature using pandas datetime objects. We estimate the arrival date by adding the flight duration to `SCH_DEP_DATE`. For the present, we ignore the timezone difference.
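The two steps above can be sketched on a single toy row. The use of `SCHEDULED_TIME` as the flight duration is an assumption made for this example, and, as in the text, time zones are ignored.

```python
import pandas as pd

flights = pd.DataFrame({
    "YEAR": [2015], "MONTH": [1], "DAY": [31],
    "SCHEDULED_DEPARTURE": [2330.0],  # HHMM encoding of 23:30
    "SCHEDULED_TIME": [95.0],         # planned duration in minutes
})

# Time decomposition: integer-divide and take the remainder of the HHMM value.
hhmm = flights["SCHEDULED_DEPARTURE"].astype("int32")
flights["SCH_DEP_HOUR"] = hhmm // 100
flights["SCH_DEP_MIN"] = hhmm % 100

# Date reconstruction: pd.to_datetime assembles a timestamp from
# year/month/day/hour/minute columns of a DataFrame.
parts = flights[["YEAR", "MONTH", "DAY"]].rename(columns=str.lower)
parts["hour"] = flights["SCH_DEP_HOUR"]
parts["minute"] = flights["SCH_DEP_MIN"]
flights["SCH_DEP_DATE"] = pd.to_datetime(parts)

# Arrival estimate: departure timestamp plus the duration in minutes.
flights["SCH_ARI_DATE"] = flights["SCH_DEP_DATE"] + pd.to_timedelta(
    flights["SCHEDULED_TIME"], unit="min"
)
```

Adding the duration to a full timestamp, instead of to the hour and minute fields, lets the arrival roll over midnight and month boundaries without special handling.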
1.1.3. Airport Location Data
1.2. Weather Data
1.2.1. Weather Data Extraction
We use the Open-Meteo API to fetch historical hourly weather. Because of API rate limits, the pipeline implements a Rate Limiter and Splitter.
- Weight Calculation: API weight is determined by `nLocations * (nDays / 14) * (nVariables / 10)`.
- Request Strategy: The system calculates `numDataSplits` to stay under the 600/min limit and uses `time.sleep` to respect the API's window.
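The weight formula above can be written out directly. The sketch below is an assumption about how `numDataSplits` might be derived from it, by rounding the total weight up to whole multiples of the 600/min limit; the function names are illustrative, and the `time.sleep` pause between batches is left out.

```python
import math


def api_weight(n_locations: int, n_days: int, n_variables: int) -> float:
    """Request weight as stated in the text: nLocations * (nDays / 14) * (nVariables / 10)."""
    return n_locations * (n_days / 14) * (n_variables / 10)


def num_data_splits(total_weight: float, limit_per_minute: float = 600.0) -> int:
    """Smallest number of equal batches whose individual weight stays within the limit."""
    return max(1, math.ceil(total_weight / limit_per_minute))


# Illustrative request: 300 airports, one year of data, 10 weather variables.
splits = num_data_splits(api_weight(300, 365, 10))
```

With one batch sent per rate-limit window, the total fetch time grows linearly with the number of splits, which is why requesting only the variables that are needed keeps the weight down.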
1.2.2. Weather Data Processing
The raw API responses are expanded into a MultiIndex DataFrame (IATA, DATE, HOUR).
- Hourly Alignment: Features like `TEMPERATURE_2M` and `WIND_SPEED_10M` are mapped to the specific hour of departure/arrival.
1.3. Complete Data Pipeline
- The Weather Join: We perform a "Dual Join." The flight record is enriched with weather from the Origin Airport at departure time and the Destination Airport at arrival time.
- Categorical Encoding: Final string columns (`AIRLINE`, `AIRPORT`) are converted to `CategoricalDtype` and mapped to integer codes to prepare for the Machine Learning model.
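The dual join can be sketched as two left merges against the same weather table, once keyed on the origin and once on the destination. The helper `join_weather`, the `ORIGIN`/`DEST` prefixes, and the toy rows are assumptions for illustration; the weather table is shown with `IATA`, `DATE`, and `HOUR` as ordinary columns.

```python
import pandas as pd

flights = pd.DataFrame({
    "ORIGIN_AIRPORT": ["JFK"], "DESTINATION_AIRPORT": ["LAX"],
    "SCH_DEP_DATE": ["2015-01-01"], "SCH_DEP_HOUR": [8],
    "SCH_ARI_DATE": ["2015-01-01"], "SCH_ARI_HOUR": [11],
})
weather = pd.DataFrame({
    "IATA": ["JFK", "LAX"], "DATE": ["2015-01-01", "2015-01-01"],
    "HOUR": [8, 11], "TEMPERATURE_2M": [-1.5, 17.0],
})


def join_weather(df, weather, airport_col, date_col, hour_col, prefix):
    """Left-join weather onto df, matching airport, date, and hour."""
    keys = {"IATA": airport_col, "DATE": date_col, "HOUR": hour_col}
    # Prefix the weather features so origin and destination values do not collide.
    features = {c: f"{prefix}_{c}" for c in weather.columns if c not in keys}
    renamed = weather.rename(columns={**keys, **features})
    return df.merge(renamed, on=list(keys.values()), how="left")


enriched = join_weather(flights, weather, "ORIGIN_AIRPORT", "SCH_DEP_DATE", "SCH_DEP_HOUR", "ORIGIN")
enriched = join_weather(enriched, weather, "DESTINATION_AIRPORT", "SCH_ARI_DATE", "SCH_ARI_HOUR", "DEST")
```

A left join keeps every flight even when no weather record matches, leaving the weather features null for that row so the gap stays visible downstream.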
Data Pipeline - Cycle 2
Metadata-Driven Pipeline
We adhere to the standard strategy of separating the data from the logic. We do so by moving the explicit column names and any parameter values used in the code to config files.
As a first step, we move the columns to load to the config file by defining the following in the config.yaml file.
pipeline:
  load_columns:
    - 'YEAR'
    - 'MONTH'
    - 'DAY'
    - 'AIRLINE'
    - 'FLIGHT_NUMBER'
    - 'TAIL_NUMBER'
    - 'ORIGIN_AIRPORT'
    - 'DESTINATION_AIRPORT'
    - 'SCHEDULED_DEPARTURE'
    - 'DEPARTURE_TIME'
    - 'DEPARTURE_DELAY'
    - 'SCHEDULED_TIME'
    - 'ELAPSED_TIME'
    - 'AIR_TIME'
    - 'DISTANCE'
    - 'SCHEDULED_ARRIVAL'
    - 'ARRIVAL_TIME'
    - 'ARRIVAL_DELAY'
    - 'CANCELLED'
We can load the file and read the load_columns as
config = load_yaml(config_path)
load_columns = config["pipeline"]["load_columns"]
Improving efficiency of dropTypeMismatchRows()
Change 1: Instead of hardcoding the string and numeric columns in the function, we pass the schema of the data and let the function create the list of numeric and string columns using the following code.
string_columns = []
numeric_columns = []
for key in schema:
    if pd.api.types.is_numeric_dtype(pd.api.types.pandas_dtype(schema[key])):
        numeric_columns.append(key)
    else:
        string_columns.append(key)
The previous version dropped the mismatched rows by index for every column. This creates a memory bottleneck, as each drop creates a copy of the dataset with the bad rows removed. The data is also reassigned from the copy each time, which slows the function down. The astype() call becomes unnecessary once the data is loaded with the specified schema.
To overcome these problems, we use boolean masking: a single boolean series keeps track of the rows with a type mismatch during the column iteration.
is_bad = pd.Series(False, index=df.index)
for col in df.columns:
    is_numeric = pd.to_numeric(df[col], errors='coerce').notna()
    if col in string_columns:
        is_bad |= is_numeric
    elif col in numeric_columns:
        is_bad |= ~is_numeric
Once the columns are scanned, the boolean mask is used to split the data into clean and corrupted data using df[~is_bad], df[is_bad]. The discarded data is appended to discarded_list, which is passed to the function as an argument, using
if discarded_list is not None:
    discarded_list.append(df[is_bad].copy())
Fixing Path Problems
Instead of writing the paths manually, we let Python build the paths of the files that are needed.
- The paths in the config files should be relative to the project folder. For instance, the data path is `raw_path: "data/flights.csv"`.
- A separate Python file `constants.py` in the `utils` folder finds the absolute path of the project folder as
PROJECT_ROOT = Path(__file__).parent.parent.parent.resolve()
The data and config directory paths are computed as
DATA_DIR = PROJECT_ROOT / "data"
CONFIG_DIR = PROJECT_ROOT / "config"
File Loading Utilities
Functions to load the files in the config folder are defined in utils/loaders.py.
def getConfigResourcePath(file_name):
    config_path = CONFIG_DIR / f"{file_name}.yaml"
    if config_path.exists():
        return config_path
    else:
        raise FileNotFoundError(f"Missing {file_name} at {config_path}")
Pipeline
graph LR
subgraph Inference Pipeline
direction LR
data[Flight]
airport[Airport]
weather[Weather]
dp[Data Pipeline]
gold[(Gold)]
model[Model]
routing[Routing]
interface[Interface]
data --"fl_m_cols"--> dp
airport --> dp
weather --"w_n_cols"--> dp
dp --"fl_w_p_cols"--> gold
gold --"fl_w_p_cols"--> model
model --"p_{risk} < p_0"--> interface
model --"p_{risk} >= p_0"--> routing
routing --> interface
gold --"fl_q_cols"--> routing
end
Logging the Pipeline
The utils/constants.py file reads the environment variables and sets them as constants.
The natural metric to log here is whether each variable exists in the env file.
The following function reads an environment variable and logs a warning if the variable is not set.
import logging
import os

from dotenv import load_dotenv

logger = logging.getLogger("airflow.task")
load_dotenv()

def get_env_var(var_name: str) -> str:
    env_var = os.getenv(var_name)
    if env_var is None:
        logger.warning(f"The environment variable {var_name} is not set.")
        logger.warning(f"Setting {var_name} to empty string.")
        # Return an empty string; str(None) would give the literal "None".
        return ""
    return env_var
The file_handler.py file logs the following metric.
- File existence status
import logging
from functools import wraps

logger = logging.getLogger("airflow.task")

def loader(func):
    # Wrap a file-reading function with logging around the read.
    @wraps(func)
    def error_wrapper(filepath, *args, **kwargs):
        try:
            logger.info(f"Reading the file {filepath}.")
            content = func(filepath, *args, **kwargs)
            logger.debug(f"File read successfully: {filepath}.")
            return content
        except Exception as e:
            logger.error(f"Error occurred while loading the file {filepath}: {e}")
            raise
    return error_wrapper

def dumper(func):
    # Wrap a file-writing function with logging around the write.
    @wraps(func)
    def error_wrapper(filepath, *args, **kwargs):
        try:
            logger.info(f"Writing to the file {filepath}")
            func(filepath, *args, **kwargs)
            logger.debug(f"File written successfully: {filepath}.")
        except Exception as e:
            logger.error(f"Error occurred during the file write {filepath}: {e}")
            raise
    return error_wrapper
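As a usage sketch, a reader function can be wrapped with the `loader` decorator so that every read is logged and errors are re-raised after logging. The decorator is restated in compact form so that the block runs on its own, and `load_json` is a hypothetical reader, not a function from the project.

```python
import json
import logging
from functools import wraps

logger = logging.getLogger("airflow.task")


def loader(func):
    """Compact restatement of the loader decorator so this block is self-contained."""
    @wraps(func)
    def error_wrapper(filepath, *args, **kwargs):
        try:
            logger.info(f"Reading the file {filepath}.")
            return func(filepath, *args, **kwargs)
        except Exception as e:
            logger.error(f"Error occurred while loading the file {filepath}: {e}")
            raise
    return error_wrapper


@loader
def load_json(filepath):
    # Hypothetical reader; any function that takes the path first can be wrapped.
    with open(filepath, encoding="utf-8") as handle:
        return json.load(handle)
```

Because the wrapper re-raises, a missing file still surfaces as `FileNotFoundError` to the caller, and the log entry records which path failed.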