Network Intrusion Detection System v0.4

"""
Star Aristocracy - Network Intrusion Detection System v0.4
Phosphor Group - Cyber Operations
Simulates network traffic monitoring with threat detection
Expandable to real packet capture with Scapy/pcap
"""

import random
import time
import csv
from datetime import datetime
from collections import defaultdict, deque

class NetworkPacket:
    """One simulated packet, stamped with the local time of its creation."""

    # (output key, attribute name) pairs, in the order to_dict() emits them
    # after the leading 'timestamp' entry.
    _EXPORTED_FIELDS = (
        ('src_ip', 'src_ip'),
        ('dst_ip', 'dst_ip'),
        ('port', 'port'),
        ('protocol', 'protocol'),
        ('size_bytes', 'size'),
    )

    def __init__(self, src_ip, dst_ip, port, protocol, size):
        """Record the packet's addressing, port, protocol and size.

        The timestamp is taken from ``datetime.now()`` at construction time.
        """
        self.timestamp = datetime.now()
        self.src_ip, self.dst_ip = src_ip, dst_ip
        self.port, self.protocol = port, protocol
        self.size = size

    def to_dict(self):
        """Return the packet as a flat dict.

        The timestamp is rendered as an ISO-8601 string and the size is
        exported under the key ``size_bytes``.
        """
        record = {'timestamp': self.timestamp.isoformat()}
        for key, attribute in self._EXPORTED_FIELDS:
            record[key] = getattr(self, attribute)
        return record

class TrafficGenerator:
    """Produce simulated packets: benign traffic and three attack patterns."""

    def __init__(self):
        """Set up the address pools and port list the generators draw from."""
        self.internal_ips = ["192.168.1.%d" % host for host in range(10, 20)]
        self.external_ips = ["203.0.113.%d" % host for host in range(1, 50)]
        self.common_ports = [80, 443, 22, 3389, 53, 21]
        # Known bad actors
        self.attack_ips = ["198.51.100.66", "198.51.100.77"]

    def generate_normal_traffic(self):
        """Return a benign packet from an internal host to an external host."""
        source = random.choice(self.internal_ips)
        destination = random.choice(self.external_ips)
        dest_port = random.choice(self.common_ports)
        transport = random.choice(['TCP', 'UDP'])
        payload_size = random.randint(64, 1500)
        return NetworkPacket(
            src_ip=source,
            dst_ip=destination,
            port=dest_port,
            protocol=transport,
            size=payload_size,
        )

    def generate_attack_traffic(self):
        """Return one packet from a randomly chosen attack scenario.

        Scenarios: 'port_scan' (attacker to a random port, 64 bytes),
        'dos' (attacker to port 80, 1000-1500 bytes) and 'data_exfil'
        (internal host to attacker on port 443, 5000-10000 bytes).
        """
        scenario = random.choice(['port_scan', 'dos', 'data_exfil'])

        if scenario not in ('port_scan', 'dos'):
            # Data exfiltration: large transfer leaving an internal host.
            insider = random.choice(self.internal_ips)
            collector = random.choice(self.attack_ips)
            return NetworkPacket(
                src_ip=insider,
                dst_ip=collector,
                port=443,
                protocol='TCP',
                size=random.randint(5000, 10000),
            )

        # Both remaining scenarios go from an attacker to an internal host.
        attacker = random.choice(self.attack_ips)
        victim = random.choice(self.internal_ips)

        if scenario == 'port_scan':
            # Same source probing many different ports with tiny packets.
            target_port, payload_size = random.randint(1, 65535), 64
        else:
            # DoS: high volume aimed at a single port.
            target_port, payload_size = 80, random.randint(1000, 1500)

        return NetworkPacket(
            src_ip=attacker,
            dst_ip=victim,
            port=target_port,
            protocol='TCP',
            size=payload_size,
        )

class IntrusionDetectionSystem:
    """Rule-based detector that logs packets and records alerts.

    Each packet passed to ``process_packet`` is appended to ``packet_log``
    (as a dict) and checked against five rules in ``analyze_packet``. When
    any rule fires, an alert dict is appended to ``alerts`` and printed.

    Attributes:
        packet_log: list of packet dicts, in arrival order.
        alerts: list of alert dicts (timestamp, src_ip, dst_ip, threats).
        connection_tracker: per-key bounded deques; allocated here but not
            used by any method of this class.
        port_scan_tracker: source IP -> set of distinct ports seen since the
            last port-scan alert for that source.
        rate_limiter: source IP -> arrival times of its most recent packets.
    """

    # Rule 1: sources that are always flagged.
    KNOWN_MALICIOUS_IPS = frozenset({"198.51.100.66", "198.51.100.77"})
    # Rule 2: alert once a source has touched more than this many ports.
    PORT_SCAN_THRESHOLD = 10
    # Rule 3: this many packets from one source within the time window.
    DOS_PACKET_WINDOW = 50
    DOS_TIME_WINDOW_SECONDS = 5
    # Rule 4: packets larger than this leaving the internal network.
    EXFIL_SIZE_THRESHOLD = 5000
    # Rule 5: common backdoor ports.
    SUSPICIOUS_PORTS = frozenset({4444, 31337, 12345})
    # Address prefixes treated as internal by is_internal().
    INTERNAL_PREFIXES = ("192.168.", "10.")

    def __init__(self):
        """Create empty logs and per-source tracking state."""
        self.packet_log = []
        self.alerts = []
        self.connection_tracker = defaultdict(lambda: deque(maxlen=100))
        self.port_scan_tracker = defaultdict(set)
        # The deque length doubles as the packet count for the DoS rule, so
        # it must match DOS_PACKET_WINDOW.
        self.rate_limiter = defaultdict(
            lambda: deque(maxlen=self.DOS_PACKET_WINDOW)
        )

    def analyze_packet(self, packet):
        """Run every detection rule against ``packet``.

        Args:
            packet: object exposing ``src_ip``, ``dst_ip``, ``port`` and
                ``size`` attributes.

        Returns:
            A list of human-readable threat descriptions; empty when no rule
            fired. Rules are evaluated in a fixed order, so the list order is
            stable.

        Side effects:
            Updates ``port_scan_tracker`` and ``rate_limiter`` for the
            packet's source IP.
        """
        threats = []
        source = packet.src_ip

        # Rule 1: Known malicious IPs
        if source in self.KNOWN_MALICIOUS_IPS:
            threats.append("Known malicious source IP")

        # Rule 2: Port scanning detection
        ports_seen = self.port_scan_tracker[source]
        ports_seen.add(packet.port)
        if len(ports_seen) > self.PORT_SCAN_THRESHOLD:
            threats.append(f"Port scan detected from {source}")
            ports_seen.clear()  # Reset so the next alert needs fresh evidence

        # Rule 3: DoS detection (rate limiting). Arrival time is the wall
        # clock at analysis time, not the packet's own timestamp.
        arrivals = self.rate_limiter[source]
        arrivals.append(time.time())
        if len(arrivals) == self.DOS_PACKET_WINDOW:
            time_span = arrivals[-1] - arrivals[0]
            if time_span < self.DOS_TIME_WINDOW_SECONDS:
                threats.append(f"Possible DoS attack from {source}")

        # Rule 4: Unusual data exfiltration. A transfer is "outbound" when it
        # starts inside the network and ends outside it. The previous check
        # applied `not in` to the boolean returned by is_internal(), which
        # raised TypeError for every packet above the size threshold.
        if (
            packet.size > self.EXFIL_SIZE_THRESHOLD
            and self.is_internal(source)
            and not self.is_internal(packet.dst_ip)
        ):
            threats.append(
                f"Large outbound transfer detected: {packet.size} bytes"
            )

        # Rule 5: Suspicious ports
        if packet.port in self.SUSPICIOUS_PORTS:
            threats.append(f"Suspicious port access: {packet.port}")

        return threats

    def is_internal(self, ip):
        """Return True when ``ip`` starts with an internal address prefix.

        Only the 192.168.x.x and 10.x.x.x ranges are recognised; the
        172.16.0.0/12 private range is not. The trailing dot in each prefix
        keeps the match on whole octets.
        """
        return ip.startswith(self.INTERNAL_PREFIXES)

    def process_packet(self, packet):
        """Log ``packet`` and raise an alert if any rule fires.

        Args:
            packet: object exposing ``to_dict()``, ``timestamp`` (with
                ``isoformat()``), ``src_ip`` and ``dst_ip``, plus the
                attributes read by ``analyze_packet``.
        """
        self.packet_log.append(packet.to_dict())
        threats = self.analyze_packet(packet)

        if not threats:
            return

        alert = {
            'timestamp': packet.timestamp.isoformat(),
            'src_ip': packet.src_ip,
            'dst_ip': packet.dst_ip,
            'threats': "; ".join(threats)
        }
        self.alerts.append(alert)
        self._trigger_alert(alert)

    def _trigger_alert(self, alert):
        """Print a security alert to stdout."""
        print(f"🚨 [THREAT DETECTED] {alert['timestamp']}")
        print(f"   Source: {alert['src_ip']} → Destination: {alert['dst_ip']}")
        print(f"   Threat: {alert['threats']}\n")

    @staticmethod
    def _write_csv(path, rows):
        """Write a non-empty list of dicts to ``path`` as CSV with a header.

        Column names and order come from the first row's keys.
        """
        with open(path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=rows[0].keys())
            writer.writeheader()
            writer.writerows(rows)

    def save_logs(self):
        """Export the packet log and alerts to CSV in the working directory.

        Writes ``network_traffic.csv`` and ``security_alerts.csv``,
        overwriting existing files. A file is skipped when its list is empty.
        """
        # Save packet log
        if self.packet_log:
            self._write_csv('network_traffic.csv', self.packet_log)
            print("[IDS] Traffic log saved to network_traffic.csv")

        # Save alerts
        if self.alerts:
            self._write_csv('security_alerts.csv', self.alerts)
            print("[IDS] Security alerts saved to security_alerts.csv")

def main():
    """Run a 200-packet simulated monitoring session and export the logs.

    Roughly one packet in ten is drawn from the attack generator; the rest
    are benign. A status line is printed every 50 packets, followed by a
    summary and the CSV export.
    """
    banner = "=" * 60
    print(banner)
    print("STAR ARISTOCRACY - NETWORK IDS v0.4")
    print("Digital PMC - Cyber Operations Division")
    print(banner)

    ids = IntrusionDetectionSystem()
    generator = TrafficGenerator()

    print("Monitoring network traffic...\n")

    # Simulate traffic (90% normal, 10% attacks)
    for processed in range(1, 201):
        is_attack = random.random() < 0.1
        make_packet = (
            generator.generate_attack_traffic
            if is_attack
            else generator.generate_normal_traffic
        )
        ids.process_packet(make_packet())

        # Status update every 50 packets
        if processed % 50 == 0:
            print(f"[INFO] Processed {processed} packets, "
                  f"{len(ids.alerts)} threats detected\n")

        time.sleep(0.02)  # Simulate packet processing time

    # Summary
    analyzed = len(ids.packet_log)
    flagged = len(ids.alerts)
    print(banner)
    print("Monitoring Session Complete")
    print(f"  Total packets analyzed: {analyzed}")
    print(f"  Threats detected: {flagged}")
    print(f"  Threat rate: {flagged/analyzed*100:.2f}%")
    print(banner)

    ids.save_logs()

# Run the simulation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

Leave a Reply

Your email address will not be published. Required fields are marked *