"""
Star Aristocracy - Network Intrusion Detection System v0.4
Phosphor Group - Cyber Operations
Simulates network traffic monitoring with threat detection
Expandable to real packet capture with Scapy/pcap
"""
import random
import time
import csv
from datetime import datetime
from collections import defaultdict, deque
class NetworkPacket:
"""Simulated network packet"""
def __init__(self, src_ip, dst_ip, port, protocol, size):
self.timestamp = datetime.now()
self.src_ip = src_ip
self.dst_ip = dst_ip
self.port = port
self.protocol = protocol
self.size = size
def to_dict(self):
return {
'timestamp': self.timestamp.isoformat(),
'src_ip': self.src_ip,
'dst_ip': self.dst_ip,
'port': self.port,
'protocol': self.protocol,
'size_bytes': self.size
}
class TrafficGenerator:
"""Generate realistic and malicious network traffic"""
def __init__(self):
self.internal_ips = [f"192.168.1.{i}" for i in range(10, 20)]
self.external_ips = [f"203.0.113.{i}" for i in range(1, 50)]
self.common_ports = [80, 443, 22, 3389, 53, 21]
self.attack_ips = ["198.51.100.66", "198.51.100.77"] # Known bad actors
def generate_normal_traffic(self):
"""Generate benign traffic"""
return NetworkPacket(
src_ip=random.choice(self.internal_ips),
dst_ip=random.choice(self.external_ips),
port=random.choice(self.common_ports),
protocol=random.choice(['TCP', 'UDP']),
size=random.randint(64, 1500)
)
def generate_attack_traffic(self):
"""Generate various attack patterns"""
attack_type = random.choice(['port_scan', 'dos', 'data_exfil'])
if attack_type == 'port_scan':
# Port scanning: same source, multiple ports
return NetworkPacket(
src_ip=random.choice(self.attack_ips),
dst_ip=random.choice(self.internal_ips),
port=random.randint(1, 65535),
protocol='TCP',
size=64
)
elif attack_type == 'dos':
# DoS: high volume from single source
return NetworkPacket(
src_ip=random.choice(self.attack_ips),
dst_ip=random.choice(self.internal_ips),
port=80,
protocol='TCP',
size=random.randint(1000, 1500)
)
else: # data_exfil
# Large outbound transfers
return NetworkPacket(
src_ip=random.choice(self.internal_ips),
dst_ip=random.choice(self.attack_ips),
port=443,
protocol='TCP',
size=random.randint(5000, 10000)
)
class IntrusionDetectionSystem:
def __init__(self):
self.packet_log = []
self.alerts = []
self.connection_tracker = defaultdict(lambda: deque(maxlen=100))
self.port_scan_tracker = defaultdict(set)
self.rate_limiter = defaultdict(lambda: deque(maxlen=50))
def analyze_packet(self, packet):
"""Multi-layer threat detection"""
threats = []
# Rule 1: Known malicious IPs
if packet.src_ip in ["198.51.100.66", "198.51.100.77"]:
threats.append("Known malicious source IP")
# Rule 2: Port scanning detection
self.port_scan_tracker[packet.src_ip].add(packet.port)
if len(self.port_scan_tracker[packet.src_ip]) > 10: # >10 unique ports
threats.append(f"Port scan detected from {packet.src_ip}")
self.port_scan_tracker[packet.src_ip].clear() # Reset after alert
# Rule 3: DoS detection (rate limiting)
self.rate_limiter[packet.src_ip].append(time.time())
if len(self.rate_limiter[packet.src_ip]) == 50:
time_span = self.rate_limiter[packet.src_ip][-1] - self.rate_limiter[packet.src_ip][0]
if time_span < 5: # 50 packets in <5 seconds
threats.append(f"Possible DoS attack from {packet.src_ip}")
# Rule 4: Unusual data exfiltration
if packet.size > 5000 and packet.dst_ip not in self.is_internal(packet.src_ip):
threats.append(f"Large outbound transfer detected: {packet.size} bytes")
# Rule 5: Suspicious ports
if packet.port in [4444, 31337, 12345]: # Common backdoor ports
threats.append(f"Suspicious port access: {packet.port}")
return threats
def is_internal(self, ip):
"""Check if IP is internal"""
return ip.startswith("192.168") or ip.startswith("10.")
def process_packet(self, packet):
"""Log packet and check for threats"""
self.packet_log.append(packet.to_dict())
threats = self.analyze_packet(packet)
if threats:
alert = {
'timestamp': packet.timestamp.isoformat(),
'src_ip': packet.src_ip,
'dst_ip': packet.dst_ip,
'threats': "; ".join(threats)
}
self.alerts.append(alert)
self._trigger_alert(alert)
def _trigger_alert(self, alert):
"""Display security alert"""
print(f"🚨 [THREAT DETECTED] {alert['timestamp']}")
print(f" Source: {alert['src_ip']} → Destination: {alert['dst_ip']}")
print(f" Threat: {alert['threats']}\n")
def save_logs(self):
"""Export logs and alerts"""
# Save packet log
if self.packet_log:
with open('network_traffic.csv', 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=self.packet_log[0].keys())
writer.writeheader()
writer.writerows(self.packet_log)
print("[IDS] Traffic log saved to network_traffic.csv")
# Save alerts
if self.alerts:
with open('security_alerts.csv', 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=self.alerts[0].keys())
writer.writeheader()
writer.writerows(self.alerts)
print("[IDS] Security alerts saved to security_alerts.csv")
def main():
print("=" * 60)
print("STAR ARISTOCRACY - NETWORK IDS v0.4")
print("Digital PMC - Cyber Operations Division")
print("=" * 60)
ids = IntrusionDetectionSystem()
generator = TrafficGenerator()
print("Monitoring network traffic...\n")
# Simulate traffic (90% normal, 10% attacks)
for i in range(200):
if random.random() < 0.1:
packet = generator.generate_attack_traffic()
else:
packet = generator.generate_normal_traffic()
ids.process_packet(packet)
# Status update every 50 packets
if (i + 1) % 50 == 0:
print(f"[INFO] Processed {i + 1} packets, "
f"{len(ids.alerts)} threats detected\n")
time.sleep(0.02) # Simulate packet processing time
# Summary
print("=" * 60)
print("Monitoring Session Complete")
print(f" Total packets analyzed: {len(ids.packet_log)}")
print(f" Threats detected: {len(ids.alerts)}")
print(f" Threat rate: {len(ids.alerts)/len(ids.packet_log)*100:.2f}%")
print("=" * 60)
ids.save_logs()
if __name__ == "__main__":
main()
Leave a Reply