AI Features

Solution: Build a Sensor Monitoring System

Learn how to implement the coded solution of a sensor data monitoring system using concurrency.

Implement sensor simulation

Define sensor classes (TemperatureSensor, PressureSensor, and VibrationSensor) that simulate sensor data and detect anomalies, producing alerts based on specific thresholds and tracking last alerts and counts.

Python 3.10.4
import random
import time
class Sensor:
def __init__(self, sensor_id):
self.sensor_id = sensor_id
self.last_alert = None
self.alert_count = 0
def generate_data(self):
raise NotImplementedError("Subclasses must implement generate_data method")
def process_data(self, data):
raise NotImplementedError("Subclasses must implement process_data method")
class TemperatureSensor(Sensor):
def generate_data(self):
temperature = random.uniform(20, 30) # Simulate temperature readings
return {"sensor_id": self.sensor_id, "temperature": temperature}
def process_data(self, data):
temperature = data["temperature"]
if temperature > 28 and self.last_alert != "high_temperature":
self.last_alert = "high_temperature"
self.alert_count += 1
return f"High temperature detected! Sensor {data['sensor_id']} reading: {temperature:.2f}°C"
elif temperature <= 28 and self.last_alert == "high_temperature":
self.last_alert = None
return "Temperature back to normal."
class PressureSensor(Sensor):
def generate_data(self):
pressure = random.uniform(100, 150) # Simulate pressure readings
return {"sensor_id": self.sensor_id, "pressure": pressure}
def process_data(self, data):
pressure = data["pressure"]
if pressure > 130 and self.last_alert != "high_pressure":
self.last_alert = "high_pressure"
self.alert_count += 1
return f"High pressure detected! Sensor {data['sensor_id']} reading: {pressure:.2f} psi"
elif pressure <= 130 and self.last_alert == "high_pressure":
self.last_alert = None
return "Pressure back to normal."
class VibrationSensor(Sensor):
def generate_data(self):
vibration = random.uniform(0.5, 2.5) # Simulate vibration readings
return {"sensor_id": self.sensor_id, "vibration": vibration}
def process_data(self, data):
vibration = data["vibration"]
if vibration > 2.0 and self.last_alert != "excessive_vibration":
self.last_alert = "excessive_vibration"
self.alert_count += 1
return f"Excessive vibration detected! Sensor {data['sensor_id']} reading: {vibration:.2f}"
elif vibration <= 2.0 and self.last_alert == "excessive_vibration":
self.last_alert = None
return "Vibration back to normal."

Code explanation

  • ...

Ask