Building & Automating a Structured Data Pipeline with MySQL, FastAPI, and Python

Objective

The goal of this project is to automate the generation, ingestion, and storage of structured data about Indian cars. Using Python, FastAPI, and MySQL, we will create a real-time data pipeline that inserts a new car record into a database every 15 seconds.

This project demonstrates how to build a small automated data pipeline and serves as a foundation for understanding automated data workflows, which are common in areas such as automotive management, e-commerce, and IoT applications.

Project Structure

The project has the following structure:

car_data_pipeline/
├── backend.py # FastAPI backend
├── generate_car_data.py # Python script to generate and send car data
├── requirements.txt # Dependencies list
└── README.md # Project documentation

Project Architecture Overview

+-----------------+       +-------------------+       +------------------+
|                 |       |                   |       |                  |
|  Python Script  | ----> |  FastAPI Backend  | ----> |  MySQL Database  |
|                 |       |                   |       |                  |
+-----------------+       +-------------------+       +------------------+

Step 1: Setting Up the MySQL Database

  1. Install MySQL Server, if it is not already installed.
  2. Create the Database and Table:
    • Open the MySQL command line or a MySQL client and execute the following commands:
CREATE DATABASE car_db;
USE car_db;

CREATE TABLE cars (
    id INT AUTO_INCREMENT PRIMARY KEY,
    brand VARCHAR(50),
    model VARCHAR(50),
    fuel_type VARCHAR(20),
    year INT,
    price VARCHAR(20)
);
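
The price column is text because the generator script later in this post sends values such as "12.5 Lakhs". For numeric analysis you would probably want a number instead. Below is a minimal sketch, assuming every price follows the "<number> Lakhs" pattern; lakhs_to_rupees is a hypothetical helper, not part of the project files.

```python
from decimal import Decimal

RUPEES_PER_LAKH = 100_000  # 1 lakh = 100,000


def lakhs_to_rupees(price_text):
    """Convert a string such as '12.5 Lakhs' to whole rupees."""
    amount, unit = price_text.split()
    if unit.lower() != "lakhs":
        raise ValueError(f"unexpected price unit: {unit!r}")
    # Decimal avoids the rounding noise that float multiplication can add.
    return int(Decimal(amount) * RUPEES_PER_LAKH)
```

A conversion like this could be applied when reading rows back for analysis, leaving the table definition above unchanged.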

Step 2: Setting Up the FastAPI Backend

a. Install FastAPI and Dependencies:

Create a requirements.txt file and add the following dependencies:

fastapi
uvicorn
pymysql
requests

Install the required packages:

pip install -r requirements.txt

b. Create the FastAPI Application:

Create a file named backend.py and add the following code:

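from fastapi import FastAPI
from pydantic import BaseModel
import pymysql

app = FastAPI()


class Car(BaseModel):
    # Fields expected in the JSON body of POST /cars
    brand: str
    model: str
    fuel_type: str
    year: int
    price: str  # e.g. "12.5 Lakhs", matching the VARCHAR column


# Database connection (replace the credentials with your own)
db = pymysql.connect(
    host="localhost",
    port=3306,
    user="your_username",
    password="your_password",
    database="car_db",
)
cursor = db.cursor()


@app.post("/cars")
def add_car(car: Car):
    # Parameterized query: the driver escapes the values
    sql = "INSERT INTO cars (brand, model, fuel_type, year, price) VALUES (%s, %s, %s, %s, %s)"
    val = (car.brand, car.model, car.fuel_type, car.year, car.price)
    cursor.execute(sql, val)
    db.commit()
    return {"status": "success", "car": car}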
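
The backend above opens one connection and cursor when the module loads and reuses them for every request. FastAPI runs plain def endpoints in a thread pool, and PyMySQL connections are not meant to be shared between threads, so one option is to open a short-lived connection per request. The sketch below assumes a connect callable that returns a PyMySQL connection (for example, a small function wrapping pymysql.connect with the settings above); insert_car is a hypothetical helper name.

```python
INSERT_SQL = (
    "INSERT INTO cars (brand, model, fuel_type, year, price) "
    "VALUES (%s, %s, %s, %s, %s)"
)


def insert_car(connect, values):
    """Insert one row using a fresh connection and return the new row id.

    `connect` is any zero-argument callable returning a DB-API style
    connection; `values` is the (brand, model, fuel_type, year, price) tuple.
    """
    conn = connect()
    try:
        with conn.cursor() as cursor:
            cursor.execute(INSERT_SQL, values)
            new_id = cursor.lastrowid
        conn.commit()
        return new_id
    except Exception:
        conn.rollback()  # leave no half-finished transaction behind
        raise
    finally:
        conn.close()  # always release the connection
```

Inside add_car, this could be called with the same val tuple the endpoint already builds, in place of the shared cursor.execute and db.commit calls.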

c. Run FastAPI Server:

Start the FastAPI server:

uvicorn backend:app --reload
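
Once the server is running, a single test request can confirm that the endpoint accepts data before the generator is started. The sketch below uses only the standard library and assumes the server listens on http://127.0.0.1:8000, the address the generator script uses; build_car_request, post_car, and any sample car you pass in are illustrative.

```python
import json
import urllib.request

API_URL = "http://127.0.0.1:8000/cars"  # same address the generator script uses


def build_car_request(car, url=API_URL):
    """Build (but do not send) a JSON POST request for one car."""
    body = json.dumps(car).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_car(car, url=API_URL):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(build_car_request(car, url), timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling post_car with a dictionary containing brand, model, fuel_type, year, and price against a running server should return the success payload from add_car, and a new row should appear in the cars table.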

Step 3: Writing the Python Script to Generate and Send Data

a. Create the Python Script:

Create a file named generate_car_data.py with the following code:

import random
import time
import requests

car_brands = {
    "Tata": ["Nexon", "Harrier", "Safari", "Altroz", "Tiago"],
    "Maruti Suzuki": ["Swift", "Baleno", "Alto", "Ciaz", "Dzire"],
    "Hyundai": ["i20", "Creta", "Venue", "Verna", "Santro"],
    "Mahindra": ["Thar", "XUV500", "Scorpio", "Bolero", "Marazzo"],
    "Honda": ["City", "Amaze", "Jazz", "WR-V", "Civic"],
    "Toyota": ["Fortuner", "Innova", "Yaris", "Glanza", "Urban Cruiser"]
}

fuel_types = ["Petrol", "Diesel", "CNG", "Electric"]

api_url = "http://127.0.0.1:8000/cars"

def generate_random_car_data():
    brand = random.choice(list(car_brands.keys()))
    model = random.choice(car_brands[brand])
    fuel_type = random.choice(fuel_types)
    year = random.randint(2015, 2024)
    price = round(random.uniform(3.5, 30.0), 2)

    return {
        "brand": brand,
        "model": model,
        "fuel_type": fuel_type,
        "year": year,
        "price": f"{price} Lakhs"
    }

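def send_data_to_api(car_data):
    try:
        # A timeout keeps the loop from hanging if the API stops responding
        response = requests.post(api_url, json=car_data, timeout=10)
    except requests.RequestException as exc:
        # Skip this record instead of crashing when the server is unreachable
        print("Failed to reach the API:", exc)
        return
    if response.status_code == 200:
        print("Data sent successfully:", car_data)
    else:
        print("Failed to send data:", response.status_code, response.text)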

def main():
    while True:
        car_data = generate_random_car_data()
        send_data_to_api(car_data)
        time.sleep(15)

if __name__ == "__main__":
    main()
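
If the API is briefly unavailable, it may be preferable to retry a record a few times rather than lose it. Below is a minimal sketch of that idea; send_with_retry is a hypothetical helper, and its post argument is assumed to behave like requests.post. The retry count, backoff, and timeout values are arbitrary choices for illustration.

```python
import time


def send_with_retry(post, url, payload, retries=3, backoff=2.0, sleep=time.sleep):
    """POST `payload` up to `retries` times; return True once the API answers 200.

    `post` is expected to behave like requests.post. `sleep` is injectable
    so the waiting can be skipped in tests.
    """
    for attempt in range(1, retries + 1):
        try:
            response = post(url, json=payload, timeout=10)
            if response.status_code == 200:
                return True
            print(f"Attempt {attempt}: API returned {response.status_code}")
        except OSError as exc:  # requests' network errors subclass OSError
            print(f"Attempt {attempt}: request failed: {exc}")
        if attempt < retries:
            sleep(backoff * attempt)  # wait a little longer after each failure
    return False
```

In generate_car_data.py, the call would look like send_with_retry(requests.post, api_url, car_data) inside the main loop.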

b. Run the Script:

Execute the script:

python generate_car_data.py

Step 4: Monitoring Data Insertion

You can monitor the MySQL database to see the data being inserted every 15 seconds. Use a MySQL client or the command line to check the cars table in the car_db database.
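
The same check can be scripted. Below is a minimal sketch, assuming an already open PyMySQL connection to car_db; fetch_recent_cars is a hypothetical helper name.

```python
RECENT_SQL = (
    "SELECT id, brand, model, fuel_type, year, price "
    "FROM cars ORDER BY id DESC LIMIT %s"
)


def fetch_recent_cars(conn, limit=5):
    """Return the newest `limit` rows from the cars table, newest first."""
    with conn.cursor() as cursor:
        cursor.execute(RECENT_SQL, (limit,))
        return list(cursor.fetchall())
```

With the generator running, calling this twice about a minute apart should show roughly four new rows, since one record is sent every 15 seconds.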

Conclusion

By following these steps, we’ve built and automated a structured data pipeline for Indian cars using MySQL, FastAPI, and Python. The pipeline continuously generates and stores car data, making it easier to manage and analyze. The delay between records is currently set to 15 seconds, but you can change it to suit your requirements.

This setup is highly applicable in industries where real-time data processing is essential, such as in automotive management, IoT applications, and e-commerce platforms.
