import ipaddress import socket import subprocess import threading import concurrent.futures from typing import List, Set, Union, Dict import time import os from urllib.parse import urlparse import json from collections import defaultdict active_ips: Set[str] = set() found_cameras: Dict[str, Dict[str, str]] = defaultdict(dict) # 使用defaultdict,简化内层字典的初始化 ip_lock = threading.Lock() camera_lock = threading.Lock() # --- Configuration --- MAX_WORKERS = 100 # Number of concurrent threads for scanning COMMON_CAMERA_PORTS = [ 80, # HTTP (web interface) 443, # HTTPS (secure web interface) 554, # RTSP (Real Time Streaming Protocol) 8000, # Often used by Hikvision (SDK/HTTP) 8080, # Alternative HTTP/RTSP 8001, # Hikvision stream port 37777, # Dahua primary port 37778, # Dahua secondary port 8002, # Often used for camera APIs or secondary streams ] SSH_PORTS = [22] # Potential SSH access for some cameras ONVIF_AVAILABLE = False try: import psutil import onvif try: from onvif import discovery _discovery_method = discovery.find_device print("ONVIF: Using onvif.discovery.find_device for discovery.") except (ImportError, AttributeError): if hasattr(onvif, 'discover') and callable(onvif.discover): _discovery_method = onvif.discover print("ONVIF: Using top-level onvif.discover() for discovery.") else: _discovery_method = None print("ONVIF: No suitable ONVIF discovery method found in 'onvif' module.") raise ImportError("No ONVIF discovery method.") # Force into the outer except block ONVIF_AVAILABLE = True except ImportError as e: print(f"Warning: Required libraries for full ONVIF functionality could not be imported.") print(f" Error: {e}") print(f" Please ensure 'psutil' and 'onvif-zeep' are installed:") print(f" pip install psutil onvif-zeep") print(f" ONVIF discovery will be skipped.") try: import psutil except ImportError: psutil = None _discovery_method = None # --- Imports for SSH service detection --- PARAMIKO_AVAILABLE = False try: import paramiko PARAMIKO_AVAILABLE = True except ImportError: print("Warning: 'paramiko' not installed. SSH service detection will be skipped. " "Install with 'pip install paramiko' for full functionality.") def get_local_ip() -> str: """Gets the local IP address of the machine.""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # Connect to a dummy address. Doesn't actually send data. s.connect(('10.255.255.255', 1)) IP = s.getsockname()[0] except Exception: IP = '127.0.0.1' finally: s.close() return IP def get_all_local_networks() -> List[str]: """ Uses psutil to find all active network interfaces and their associated network ranges. Returns a list of network CIDR strings (e.g., '192.168.1.0/24'). """ if not psutil: print("Warning: 'psutil' not available. Only scanning the /24 subnet of the local IP address.") local_ip = get_local_ip() if local_ip == '127.0.0.1': return [] # Cannot determine external network from localhost return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))] networks = set() try: for interface, snics in psutil.net_if_addrs().items(): for snic in snics: if snic.family == socket.AF_INET: # IPv4 address ip_address = snic.address netmask = snic.netmask if ip_address and netmask and ip_address != '127.0.0.1': try: network_obj = ipaddress.IPv4Network(f"{ip_address}/{netmask}", strict=False) networks.add(str(network_obj)) except ipaddress.AddressValueError as e: print(f"Warning: Could not parse IP address or netmask for {interface}: {ip_address}, {netmask}. Error: {e}") return list(networks) except Exception as e: print(f"Error getting network interfaces with psutil: {e}") print("Falling back to scanning only the /24 subnet of the local IP address.") local_ip = get_local_ip() if local_ip == '127.0.0.1': return [] return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))] def onvif_discovery_task() -> None: """Performs ONVIF WS-Discovery to find compatible devices.""" global ONVIF_AVAILABLE, _discovery_method if not ONVIF_AVAILABLE or _discovery_method is None: print("ONVIF: Skipping discovery due to 'psutil' or 'onvif-zeep' not being available or no suitable discovery method.") return print("ONVIF: Starting discovery. This may take a few seconds...") try: discovered_device_xaddrs: List[str] = _discovery_method(timeout=5) if not isinstance(discovered_device_xaddrs, list): discovered_device_xaddrs = [discovered_device_xaddrs] if discovered_device_xaddrs else [] discovered_ips_via_onvif = [] for xaddr in discovered_device_xaddrs: try: parsed_url = urlparse(xaddr) device_ip = parsed_url.hostname if device_ip and device_ip not in discovered_ips_via_onvif: discovered_ips_via_onvif.append(device_ip) except Exception as url_e: print(f"ONVIF: Warning: Could not parse IP from device XAddr '{xaddr}': {url_e}") if discovered_ips_via_onvif: print(f"ONVIF: Found {len(discovered_ips_via_onvif)} potential ONVIF devices via WS-Discovery.") for device_ip in discovered_ips_via_onvif: with ip_lock: active_ips.add(device_ip) with camera_lock: found_cameras[device_ip]['ONVIF_Discovery'] = "ONVIF Device (WS-Discovery)" else: print("ONVIF: No ONVIF devices found via WS-Discovery.") except Exception as e: print(f"ONVIF: Error during ONVIF discovery (using _discovery_method): {e}") def check_socket(ip: str, port: int, timeout: float = 0.5) -> bool: """Attempts to connect to a specific port on an IP address.""" try: with socket.create_connection((ip, port), timeout) as sock: sock.shutdown(socket.SHUT_RDWR) # Gracefully close connection return True except (socket.timeout, ConnectionRefusedError, OSError): return False except Exception as e: return False def check_ssh_banner(ip: str, port: int, timeout: float = 0.5) -> Union[str, bool]: """Attempts to get SSH banner to verify SSH service.""" if not PARAMIKO_AVAILABLE: return False try: transport = paramiko.Transport((ip, port)) transport.connect(timeout=timeout) banner = transport.get_banner() transport.close() return banner.strip() except (paramiko.SSHException, socket.error, socket.timeout): return False except Exception as e: return False def service_scan_task(ip: str) -> None: """Scans common camera ports and SSH ports on a given IP and updates found_cameras.""" for port in COMMON_CAMERA_PORTS: if check_socket(ip, port): with camera_lock: if port == 80: found_cameras[ip]['HTTP'] = f"Open on port {port}" elif port == 443: found_cameras[ip]['HTTPS'] = f"Open on port {port}" elif port == 554: found_cameras[ip]['RTSP'] = f"Open on port {port}" elif port == 8000 or port == 8001: # Common for Hikvision found_cameras[ip]['Hikvision_Service'] = f"Open on port {port}" elif port == 37777 or port == 37778: # Common for Dahua found_cameras[ip]['Dahua_Service'] = f"Open on port {port}" else: found_cameras[ip][f'TCP_{port}'] = f"Open on port {port}" for port in SSH_PORTS: banner = check_ssh_banner(ip, port) if banner: with camera_lock: found_cameras[ip]['SSH'] = f"Open on port {port} (Banner: {banner})" def main(): start_time = time.time() print("--- Starting Network Camera Discovery on RK3588 ---") local_ip = get_local_ip() print(f"Local IP Address: {local_ip}") # Get all local networks for scanning all_local_networks = get_all_local_networks() if not all_local_networks: print("No local networks detected for scanning. Exiting.") return print("Detected local networks for scanning:") for net in all_local_networks: print(f" - {net}") # --- EXCLUDE Docker internal networks --- # If running inside a Docker container, often these are internal. # Adjust these prefixes based on your Docker network configuration if different. all_local_networks_filtered = [] DOCKER_NETWORK_PREFIXES = ["172.17.", "172.18.", "172.19.", "172.20."] # Add more if your Docker uses other 172.x ranges for net_cidr in all_local_networks: is_docker_internal = False for prefix in DOCKER_NETWORK_PREFIXES: if net_cidr.startswith(prefix): is_docker_internal = True break if not is_docker_internal: all_local_networks_filtered.append(net_cidr) else: print(f" - Excluding Docker internal network: {net_cidr}") if not all_local_networks_filtered: print("No external local networks found for scanning after filtering Docker networks. Exiting.") return print("\nFiltered local networks for scanning:") for net in all_local_networks_filtered: print(f" - {net}") # --- ONVIF Discovery (runs independently) --- onvif_discovery_task() # --- Prepare IPs for Service Scan (without relying on 'ping') --- all_ips_for_service_scan = set() # Use the filtered list of networks for network_str in all_local_networks_filtered: try: network = ipaddress.IPv4Network(network_str, strict=False) for ip_obj in network.hosts(): all_ips_for_service_scan.add(str(ip_obj)) except Exception as e: print(f"Error processing network {network_str}: {e}") continue total_ips_for_service_scan = len(all_ips_for_service_scan) print(f"Proceeding directly to service scan of {total_ips_for_service_scan} IPs on common camera ports.") # --- Service Scan for common camera ports --- if total_ips_for_service_scan > 0: with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # Map the service scan task to all IPs identified by local networks executor.map(service_scan_task, list(all_ips_for_service_scan)) # Wait for service scan to complete using concurrent.futures # (executor.map is blocking here, which is fine) # --- Discovery Results --- print("\n--- Discovery Results ---") if found_cameras: print(f"Found {len(found_cameras)} potential camera devices and services:") for ip, services in found_cameras.items(): print(f" IP: {ip}") # Sort services for consistent output for service, details in sorted(services.items()): print(f" - {service}: {details}") else: print("No network cameras or detectable services found based on ONVIF or port scanning.") print(f"\n--- Discovery Finished in {time.time() - start_time:.2f} seconds ---") if __name__ == "__main__": main()