# Environmental Sensor Node Monitor v0.3

"""
Star Aristocracy - Environmental Sensor Node Monitor v0.3
Prometheus Energy - Distributed energy systems
Simulates IoT sensor network with anomaly detection
Hardware-ready: Compatible with DHT22, MQ-135, Raspberry Pi
"""

import random
import time
import csv
from datetime import datetime
from collections import deque

class SensorNode:
    """Simulated environmental sensor node with rolling-window anomaly detection.

    Attributes:
        node_id: Identifier copied into every reading.
        location: Location label copied into every reading.
        temp_baseline: Centre of the simulated temperature, in Celsius.
        humidity_baseline: Centre of the simulated relative humidity, in %.
        air_quality_baseline: Centre of the simulated air quality, in PPM
            (CO2 equivalent).
        history: Bounded deque holding the most recent readings; it is the
            statistical baseline used by ``detect_anomaly``.
    """

    def __init__(self, node_id, location, history_size=20):
        """Create a node.

        Args:
            node_id: Identifier stored in each reading.
            location: Location label stored in each reading.
            history_size: Maximum number of readings kept in the rolling
                window (older readings are discarded automatically).
        """
        self.node_id = node_id
        self.location = location
        self.temp_baseline = 22.0  # Celsius
        self.humidity_baseline = 45.0  # %
        self.air_quality_baseline = 50  # PPM CO2 equivalent
        self.history = deque(maxlen=history_size)  # Rolling window for anomaly detection

    def read_sensors(self):
        """Simulate one sensor reading (replace with actual GPIO/I2C reads).

        The reading is appended to ``self.history`` before it is returned.

        Returns:
            A dict with the keys ``timestamp``, ``node_id``, ``location``,
            ``temperature``, ``humidity`` and ``air_quality_ppm``.
        """
        # Gaussian noise around each baseline
        temp = self.temp_baseline + random.gauss(0, 2)
        humidity = self.humidity_baseline + random.gauss(0, 5)
        air_quality = self.air_quality_baseline + random.gauss(0, 10)

        # Occasionally inject a large offset so the detector has something to find
        if random.random() < 0.05:  # 5% anomaly rate
            temp += random.choice([10, -8])
            air_quality += random.choice([100, -30])

        reading = {
            'timestamp': datetime.now().isoformat(),
            'node_id': self.node_id,
            'location': self.location,
            'temperature': round(temp, 2),
            # Humidity is clamped to the 0-100 range
            'humidity': round(max(0, min(100, humidity)), 2),
            # Air quality is never reported below zero
            'air_quality_ppm': round(max(0, air_quality), 0),
        }

        self.history.append(reading)
        return reading

    @staticmethod
    def _mean_std(values):
        """Return ``(mean, population standard deviation)`` of a non-empty sequence."""
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return mean, variance ** 0.5

    def detect_anomaly(self, reading, threshold=2.0, min_samples=10):
        """Flag a reading that deviates strongly from the rolling history.

        Args:
            reading: Reading dict providing ``temperature`` and
                ``air_quality_ppm``.
            threshold: Number of standard deviations from the mean beyond
                which a value is reported.
            min_samples: Minimum number of entries ``self.history`` must hold
                before any detection is attempted.

        Returns:
            A list of alert message strings, or ``None`` when nothing was
            flagged or there is not enough history yet.
        """
        if len(self.history) < min_samples:
            return None

        # Exclude the reading under test by identity rather than by position,
        # so a reading that was never appended to the history does not cause
        # the newest genuine sample to be dropped from the baseline.
        baseline = [r for r in self.history if r is not reading]
        if not baseline:
            return None

        temp_mean, temp_std = self._mean_std([r['temperature'] for r in baseline])
        aq_mean, aq_std = self._mean_std([r['air_quality_ppm'] for r in baseline])

        # Flag if reading is more than `threshold` standard deviations from mean
        anomalies = []
        if abs(reading['temperature'] - temp_mean) > threshold * temp_std:
            anomalies.append(f"Temperature spike: {reading['temperature']}°C")
        if abs(reading['air_quality_ppm'] - aq_mean) > threshold * aq_std:
            anomalies.append(f"Air quality alert: {reading['air_quality_ppm']} PPM")

        return anomalies if anomalies else None

class SensorNetwork:
    """Group of sensor nodes that are polled together and logged.

    Attributes:
        nodes: List of node objects polled by ``poll_network``.
        all_readings: Every reading dict produced so far, in polling order.
    """

    def __init__(self, nodes=None):
        """Create the network.

        Args:
            nodes: Optional iterable of node objects to monitor. Each must
                provide ``read_sensors()`` and ``detect_anomaly(reading)``.
                When omitted (``None``), the three default demo nodes are
                created.
        """
        if nodes is None:
            nodes = [
                SensorNode("NODE_ALPHA", "Facility_A - Server Room"),
                SensorNode("NODE_BRAVO", "Facility_A - Laboratory"),
                SensorNode("NODE_CHARLIE", "Facility_B - Warehouse"),
            ]
        self.nodes = list(nodes)
        self.all_readings = []

    def poll_network(self):
        """Poll every node once.

        Each reading gets an ``alert`` key: the joined anomaly messages, or
        ``None`` when the node reported no anomaly. Readings are also
        appended to ``self.all_readings``.

        Returns:
            The list of readings gathered in this polling round.
        """
        readings = []
        for node in self.nodes:
            reading = node.read_sensors()
            anomaly = node.detect_anomaly(reading)

            if anomaly:
                reading['alert'] = "; ".join(anomaly)
                self._trigger_alert(reading)
            else:
                reading['alert'] = None

            readings.append(reading)
            self.all_readings.append(reading)

        return readings

    def _trigger_alert(self, reading):
        """Print an alert banner for an anomalous reading to stdout."""
        print(f"\n⚠️  [ALERT] {reading['node_id']} - {reading['location']}")
        print(f"    {reading['alert']}")
        print(f"    Time: {reading['timestamp']}\n")

    def save_data(self, filename='sensor_network_log.csv'):
        """Export all readings to CSV.

        Does nothing when no readings have been collected. The column order
        is taken from the keys of the first reading.

        Args:
            filename: Path of the CSV file to (over)write.
        """
        if not self.all_readings:
            return

        keys = list(self.all_readings[0])
        # Alert text contains non-ASCII characters (e.g. the degree sign), so
        # the encoding is pinned instead of relying on the platform default.
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=keys)
            writer.writeheader()
            writer.writerows(self.all_readings)
        print(f"[NETWORK] Data saved to {filename}")

def main(cycles=50, interval=0.1):
    """Run the demo monitoring loop, save the log and print a summary.

    Args:
        cycles: Number of polling rounds to run.
        interval: Seconds to sleep after each polling round.
    """
    print("=" * 60)
    print("STAR ARISTOCRACY - ENVIRONMENTAL SENSOR NETWORK v0.3")
    print("=" * 60)

    network = SensorNetwork()
    print(f"Active nodes: {len(network.nodes)}")
    for node in network.nodes:
        print(f"  • {node.node_id}: {node.location}")
    print("\nMonitoring started...\n")

    # Run monitoring loop
    for cycle in range(cycles):
        readings = network.poll_network()

        # Display summary every 10 cycles
        if cycle % 10 == 0:
            print(f"[CYCLE {cycle}] Network Status:")
            for reading in readings:
                status = "⚠️  ALERT" if reading['alert'] else "✓ Normal"
                print(f"  {reading['node_id']}: {reading['temperature']}°C, "
                      f"{reading['humidity']}% RH, "
                      f"{reading['air_quality_ppm']} PPM - {status}")
            print()

        time.sleep(interval)  # Simulate sensor polling interval

    network.save_data()

    # Summary statistics
    total_readings = len(network.all_readings)
    total_alerts = sum(1 for r in network.all_readings if r['alert'])
    # Avoid ZeroDivisionError when nothing was collected (e.g. cycles=0)
    alert_rate = total_alerts / total_readings * 100 if total_readings else 0.0
    print(f"\n{'=' * 60}")
    print("Monitoring Complete:")
    print(f"  Total readings: {total_readings}")
    print(f"  Alerts triggered: {total_alerts}")
    print(f"  Alert rate: {alert_rate:.1f}%")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

# Leave a Reply

# Your email address will not be published. Required fields are marked *