In today's data-driven landscape, analyzing extensive datasets is essential for deriving business insights. Our Taxi Data Analytics application leverages Airflow, Spark, Delta Lake, Debezium, Kafka, DBT, and Great Expectations to convert raw taxi trip data into actionable intelligence.
Initially, we faced considerable difficulty integrating our various data sources: trip data arrived from different systems in different formats, which made it hard to consolidate and analyze thoroughly. This fragmentation led to incomplete insights and hampered our ability to make data-driven decisions. We therefore needed a robust solution to consolidate our data sources and streamline the analysis process.
```
.
├── airflow/                        /* airflow folder including dags, ... */
├── batch_processing/
│   └── datalake_to_dw.py           /* ETL data from datalake to staging area */
├── configs/                        /* config files */
│   ├── spark.yaml
│   └── datalake.yaml
├── data/                           /* dataset */
│   ├── 2020/
│   ├── 2021/
│   ├── 2022/
│   │   ├── green_tripdata_2022-01.parquet
│   │   ├── green_tripdata_2022-02.parquet
│   │   ├── green_tripdata_2022-03.parquet
│   │   ├── ...
│   │   ├── yellow_tripdata_2022-01.parquet
│   │   ├── yellow_tripdata_2022-02.parquet
│   │   ├── yellow_tripdata_2022-03.parquet
│   │   └── ...
│   ├── 2023/
│   └── 2024/
├── data_validation/                /* validate data before loading it into the data warehouse */
│   ├── gx/
│   │   ├── checkpoints/
│   │   ├── expectations/
│   │   ├── ...
│   │   └── great_expectations.yml
│   ├── full_flow.ipynb
│   └── reload_and_validate.ipynb
├── dbt_nyc/                        /* data transformation folder */
├── debezium/                       /* CDC folder */
│   ├── configs/
│   │   └── taxi-nyc-cdc.json       /* connector config linking the database to Kafka through Debezium */
│   └── run.sh                      /* create the connector */
├── imgs/
├── jars/                           /* JAR files for Spark version 3.5.1 */
├── scripts/
│   ├── data/
│   │   └── taxi_lookup.csv         /* CSV file to look up latitude and longitude */
│   ├── extract_load.py             /* upload data from local to 'raw' bucket (MinIO) */
│   ├── transform_data.py           /* transform data into 'processed' bucket (MinIO) */
│   └── convert_to_delta.py         /* convert Parquet files from 'processed' to 'delta' bucket (MinIO) */
├── streaming_processing/
│   ├── read_parquet_streaming.py
│   ├── schema_config.json
│   └── streaming_to_datalake.py    /* read the data stream from the Kafka topic and write to 'raw' bucket (MinIO) */
├── trino/
│   ├── catalog/
│   │   └── datalake.properties
│   └── etc/
│       ├── config.properties
│       ├── jvm.config
│       └── node.properties
├── utils/                          /* helper functions */
│   ├── create_schema.py
│   ├── create_table.py
│   ├── postgresql_client.py        /* PostgreSQL client: create connection, execute queries, get table columns */
│   ├── helper.py
│   ├── minio_utils.py              /* MinIO client: create connection, create bucket, list parquet files in a bucket */
│   ├── streaming_data_json.py      /* stream JSON-formatted data into Kafka */
│   ├── streaming_data_db.py        /* stream data into the database */
│   └── trino_db_scripts_generate.py
├── .env
├── .gitignore
├── airflow-docker-compose.yaml
├── docker-compose.yaml
├── Makefile
├── README.md
├── requirements.txt
└── stream-docker-compose.yaml
```
Clone the repository:

```bash
git clone https://github.com/trannhatnguyen2/NYC_Taxi_Data_Pipeline
```

Start all infrastructure:

```bash
make run_all
```

This command downloads the necessary Docker images, creates the containers, and starts the services in detached mode.
Set up the environment: create and activate the conda environment, then install the required packages:

```bash
conda create -n bigdata python==3.9   # answer 'y' when prompted
conda activate bigdata
pip install -r requirements.txt
```
Access the services:

- PostgreSQL: localhost:5432
- Confluent Control Center (Kafka): http://localhost:9021
- http://localhost:8085
- MinIO Console: http://localhost:9001
- http://localhost:8080
Download the dataset: you can download the NYC TLC trip record data here: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
Download the JAR files for Spark:

```bash
mkdir jars
cd jars
curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar
curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
curl -O https://repo1.maven.org/maven2/org/postgresql/postgresql/42.4.3/postgresql-42.4.3.jar
curl -O https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.1/spark-sql-kafka-0-10_2.12-3.2.1.jar
```
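These JARs are what let Spark talk to MinIO (via S3A), PostgreSQL (via JDBC), and Kafka. The snippet below is a minimal sketch of how a Spark session might wire them in; the endpoint, credentials, and jar paths are illustrative assumptions, not values taken from this project's configs.

```python
from pyspark.sql import SparkSession

# Minimal sketch: point Spark at the downloaded JARs and a local MinIO endpoint.
# Endpoint, credentials, and jar paths are illustrative assumptions.
spark = (
    SparkSession.builder.appName("nyc-taxi-batch")
    .config("spark.jars", ",".join([
        "jars/aws-java-sdk-bundle-1.12.262.jar",
        "jars/hadoop-aws-3.3.4.jar",
        "jars/postgresql-42.4.3.jar",
    ]))
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")   # assumed MinIO API port
    .config("spark.hadoop.fs.s3a.access.key", "minio_access_key")      # placeholder credentials
    .config("spark.hadoop.fs.s3a.secret.key", "minio_secret_key")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

# Example read from the datalake once the session is configured.
df = spark.read.parquet("s3a://processed/")
df.printSchema()
```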
Push the dataset from local storage to the `raw` bucket of the Datalake (MinIO):

```bash
python src/local_to_raw.py
```

Process data from the `raw` bucket into the `processed` bucket (MinIO):

```bash
python src/raw_to_processed.py
```

Convert the Parquet files in the `processed` bucket to Delta format in the `delta` bucket (MinIO):

```bash
python src/processed_to_delta.py
```

Create the `staging` and `production` schemas and the `staging.nyc_taxi` table in PostgreSQL:

```bash
python utils/create_schema.py
python utils/create_table.py
```

Load the data from the Datalake into the data warehouse staging area:

```bash
python batch_processing/datalake_to_dw.py
```

This last command may take some time to complete.
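For reference, pushing local Parquet files into a MinIO bucket usually boils down to a small loop over the official `minio` client. The sketch below shows the general idea behind the local-to-raw step; the endpoint, credentials, and glob pattern are assumptions rather than the project's actual values.

```python
import os
from glob import glob

from minio import Minio

# Hedged sketch of a local-to-raw upload; endpoint and credentials are placeholders.
client = Minio(
    "localhost:9000",               # assumed MinIO API endpoint
    access_key="minio_access_key",
    secret_key="minio_secret_key",
    secure=False,
)

BUCKET = "raw"
if not client.bucket_exists(BUCKET):
    client.make_bucket(BUCKET)

# Upload every Parquet file for one year of data (glob pattern is illustrative).
for path in glob("data/2022/*.parquet"):
    object_name = os.path.basename(path)
    client.fput_object(BUCKET, object_name, path)
    print(f"Uploaded {path} -> s3://{BUCKET}/{object_name}")
```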
Validate the data before loading it into the data warehouse:

```bash
cd data_validation
great_expectations init   # answer 'Y' when prompted
```

Then run the `full_flow.ipynb` notebook.
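As an illustration of the kind of checks the notebook runs, here is a hedged sketch using the legacy pandas-based Great Expectations API. The column names come from the taxi schema used later in the Trino table definition, but the specific expectations are assumptions, not the project's actual suite.

```python
import pandas as pd
import great_expectations as ge

# Hedged sketch: validate one month of yellow taxi data with the legacy
# PandasDataset API. The expectations chosen here are illustrative only.
df = ge.from_pandas(pd.read_parquet("data/2022/yellow_tripdata_2022-01.parquet"))

df.expect_column_values_to_not_be_null("tpep_pickup_datetime")
df.expect_column_values_to_be_between("passenger_count", min_value=0, max_value=10)
df.expect_column_values_to_be_between("trip_distance", min_value=0, max_value=500)

results = df.validate()
print(results)
```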
Transform the data with dbt: go to the `dbt_nyc/` directory (`cd dbt_nyc`) and follow its `README.md` for the next steps.
Register the Debezium connector and stream data into the source database:

```bash
cd debezium/
bash run.sh register_connector configs/taxi-nyc-cdc.json
cd ..
python utils/create_schema.py
python utils/create_table.py
python utils/streaming_data_db.py
```
Open http://localhost:9021 to check the data stream in the `device.iot.taxi_nyc_time_series` topic.
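If you prefer to peek at the topic from Python instead of the Control Center UI, a hedged sketch with the `kafka-python` client is below; the bootstrap server address is an assumed local default, not a value confirmed by this project.

```python
import json

from kafka import KafkaConsumer

# Hedged sketch: inspect the CDC topic directly from Python.
# The bootstrap server address is an assumed local default.
consumer = KafkaConsumer(
    "device.iot.taxi_nyc_time_series",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.value)   # Debezium change event payload
    break                  # stop after the first record for a quick sanity check
```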
Read the data stream from the Kafka topic and write it to the `raw` bucket (MinIO):

```bash
python stream_processing/streaming_to_datalake.py
```
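Conceptually, this step is a Spark Structured Streaming job that reads the Kafka topic and lands Parquet files in MinIO. The sketch below shows the general shape under assumed broker, schema, and path settings; it is not the project's actual `streaming_to_datalake.py`.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

# Hedged sketch: consume the CDC topic and write it to the 'raw' bucket as Parquet.
# Broker address, schema fields, and paths are illustrative assumptions; the session
# also needs the S3A/MinIO settings shown earlier.
spark = SparkSession.builder.appName("streaming-to-datalake").getOrCreate()

schema = StructType([
    StructField("VendorID", IntegerType()),
    StructField("trip_distance", DoubleType()),
    StructField("payment_type", IntegerType()),
    StructField("total_amount", DoubleType()),
    StructField("store_and_fwd_flag", StringType()),
])

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "device.iot.taxi_nyc_time_series")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

query = (
    events.writeStream.format("parquet")
    .option("path", "s3a://raw/stream")
    .option("checkpointLocation", "s3a://raw/checkpoints/stream")
    .outputMode("append")
    .start()
)
query.awaitTermination()
```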
Once your files have landed in MinIO, exec into the `trino` container and start the Trino CLI:

```bash
docker exec -ti datalake-trino bash
trino
```
After that, run the following commands to register a new schema and external table for the data:

```sql
CREATE SCHEMA IF NOT EXISTS datalake.stream
WITH (location = 's3://raw/');

CREATE TABLE IF NOT EXISTS datalake.stream.nyc_taxi(
VendorID INT,
tpep_pickup_datetime TIMESTAMP,
tpep_dropoff_datetime TIMESTAMP,
passenger_count DOUBLE,
trip_distance DOUBLE,
RatecodeID DOUBLE,
store_and_fwd_flag VARCHAR,
PULocationID INT,
DOLocationID INT,
payment_type INT,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
congestion_surcharge DOUBLE,
airport_fee DOUBLE
) WITH (
external_location = 's3://raw/stream',
format = 'PARQUET'
);
```
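To sanity-check the new table from Python, you can query Trino with its official client. This is a hedged sketch: the host, port, and user are assumptions about a typical local setup (Trino commonly serves HTTP on 8080), not values confirmed by this project.

```python
import trino

# Hedged sketch: query the freshly registered table through the Trino Python client.
# host/port/user are assumed local defaults, not this project's confirmed settings.
conn = trino.dbapi.connect(
    host="localhost",
    port=8080,
    user="trino",
    catalog="datalake",
    schema="stream",
)

cur = conn.cursor()
cur.execute("SELECT payment_type, count(*) AS trips FROM nyc_taxi GROUP BY payment_type")
for row in cur.fetchall():
    print(row)
```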
Finally, go to the `airflow/` directory (`cd airflow/`) and follow its `README.md` for the next steps to orchestrate the pipeline with Airflow.