"""
Star Aristocracy - Environmental Sensor Node Monitor v0.3
Prometheus Energy - Distributed energy systems
Simulates IoT sensor network with anomaly detection
Hardware-ready: Compatible with DHT22, MQ-135, Raspberry Pi
"""
import random
import time
import csv
from datetime import datetime
from collections import deque
class SensorNode:
def __init__(self, node_id, location):
self.node_id = node_id
self.location = location
self.temp_baseline = 22.0 # Celsius
self.humidity_baseline = 45.0 # %
self.air_quality_baseline = 50 # PPM CO2 equivalent
self.history = deque(maxlen=20) # Rolling window for anomaly detection
def read_sensors(self):
"""Simulate sensor readings (replace with actual GPIO/I2C reads)"""
# Add realistic drift and noise
temp = self.temp_baseline + random.gauss(0, 2)
humidity = self.humidity_baseline + random.gauss(0, 5)
air_quality = self.air_quality_baseline + random.gauss(0, 10)
# Occasionally simulate anomalies
if random.random() < 0.05: # 5% anomaly rate
temp += random.choice([10, -8])
air_quality += random.choice([100, -30])
reading = {
'timestamp': datetime.now().isoformat(),
'node_id': self.node_id,
'location': self.location,
'temperature': round(temp, 2),
'humidity': round(max(0, min(100, humidity)), 2),
'air_quality_ppm': round(max(0, air_quality), 0)
}
self.history.append(reading)
return reading
def detect_anomaly(self, reading):
"""Simple statistical anomaly detection"""
if len(self.history) < 10:
return None
# Calculate mean and std dev from history
temps = [r['temperature'] for r in list(self.history)[:-1]]
aq_vals = [r['air_quality_ppm'] for r in list(self.history)[:-1]]
temp_mean = sum(temps) / len(temps)
temp_std = (sum((x - temp_mean)**2 for x in temps) / len(temps))**0.5
aq_mean = sum(aq_vals) / len(aq_vals)
aq_std = (sum((x - aq_mean)**2 for x in aq_vals) / len(aq_vals))**0.5
# Flag if reading is >2 standard deviations from mean
anomalies = []
if abs(reading['temperature'] - temp_mean) > 2 * temp_std:
anomalies.append(f"Temperature spike: {reading['temperature']}°C")
if abs(reading['air_quality_ppm'] - aq_mean) > 2 * aq_std:
anomalies.append(f"Air quality alert: {reading['air_quality_ppm']} PPM")
return anomalies if anomalies else None
class SensorNetwork:
def __init__(self):
self.nodes = [
SensorNode("NODE_ALPHA", "Facility_A - Server Room"),
SensorNode("NODE_BRAVO", "Facility_A - Laboratory"),
SensorNode("NODE_CHARLIE", "Facility_B - Warehouse"),
]
self.all_readings = []
def poll_network(self):
"""Poll all sensor nodes"""
readings = []
for node in self.nodes:
reading = node.read_sensors()
anomaly = node.detect_anomaly(reading)
if anomaly:
reading['alert'] = "; ".join(anomaly)
self._trigger_alert(reading)
else:
reading['alert'] = None
readings.append(reading)
self.all_readings.append(reading)
return readings
def _trigger_alert(self, reading):
"""Log security alert for anomalous readings"""
print(f"\n⚠️ [ALERT] {reading['node_id']} - {reading['location']}")
print(f" {reading['alert']}")
print(f" Time: {reading['timestamp']}\n")
def save_data(self, filename='sensor_network_log.csv'):
"""Export all readings to CSV"""
if not self.all_readings:
return
keys = self.all_readings[0].keys()
with open(filename, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=keys)
writer.writeheader()
writer.writerows(self.all_readings)
print(f"[NETWORK] Data saved to {filename}")
def main():
print("=" * 60)
print("STAR ARISTOCRACY - ENVIRONMENTAL SENSOR NETWORK v0.3")
print("=" * 60)
network = SensorNetwork()
print(f"Active nodes: {len(network.nodes)}")
for node in network.nodes:
print(f" • {node.node_id}: {node.location}")
print("\nMonitoring started...\n")
# Run monitoring loop
for cycle in range(50):
readings = network.poll_network()
# Display summary every 10 cycles
if cycle % 10 == 0:
print(f"[CYCLE {cycle}] Network Status:")
for reading in readings:
status = "⚠️ ALERT" if reading['alert'] else "✓ Normal"
print(f" {reading['node_id']}: {reading['temperature']}°C, "
f"{reading['humidity']}% RH, "
f"{reading['air_quality_ppm']} PPM - {status}")
print()
time.sleep(0.1) # Simulate sensor polling interval
network.save_data()
# Summary statistics
total_alerts = sum(1 for r in network.all_readings if r['alert'])
print(f"\n{'=' * 60}")
print(f"Monitoring Complete:")
print(f" Total readings: {len(network.all_readings)}")
print(f" Alerts triggered: {total_alerts}")
print(f" Alert rate: {total_alerts/len(network.all_readings)*100:.1f}%")
if __name__ == "__main__":
main()
Leave a Reply