#!/usr/bin/env python3 import subprocess import json import sys import ipaddress import socket import re from typing import List, Set, Dict, Any import time # 导入 time import threading # 导入 threading # 尝试导入 psutil try: import psutil except ImportError: print("错误: 'psutil' 库未找到。请在 Dockerfile 中或使用 'pip install psutil' 安装。") sys.exit(1) # --- 配置 --- COMMON_CAMERA_PORTS = [ 80, # HTTP (web interface) 443, # HTTPS (secure web interface) 554, # RTSP (Real Time Streaming Protocol) 8000, # Often used by Hikvision (SDK/HTTP) 8080, # Alternative HTTP/RTSP 8001, # Hikvision stream port 37777, # Dahua primary port 37778, # Dahua secondary port 8002, # Often used for camera APIs or secondary streams ] NMAP_PORT_STRING = "T:" + ",".join(map(str, COMMON_CAMERA_PORTS)) # 用于过滤 Docker 网络的常见前缀 DOCKER_NETWORK_PREFIXES = ["172.17.", "172.18.", "172.19.", "172.20."] # 全局标志,用于停止进度线程 stop_progress_flag = threading.Event() start_time = time.time() def progress_indicator(): """在后台线程中运行,定期打印进度信息。""" interval = 10 # 每 10 秒报告一次 count = 0 # 打印一个提示,说明进度指示器已启动 print(f"--- Nmap 扫描进行中(每 {interval} 秒更新一次)---", file=sys.stderr) while not stop_progress_flag.is_set(): time_elapsed = time.time() - start_time # 使用 is_set() 检查标志,并等待最多 interval 秒 stop_progress_flag.wait(interval) if not stop_progress_flag.is_set(): count += 1 # 打印一个简单的状态信息 print(f"--- 状态更新 #{count}: 扫描已运行 {time_elapsed:.1f} 秒... ---", file=sys.stderr) def get_local_ip() -> str: """获取本机的本地 IP 地址。""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # 连接到一个虚拟地址 s.connect(('10.255.255.255', 1)) IP = s.getsockname()[0] except Exception: IP = '127.0.0.1' finally: s.close() return IP def get_all_local_networks() -> List[str]: """ 使用 psutil 查找所有活动的网络接口及其关联的网络范围。 返回一个网络 CIDR 字符串列表 (例如 '192.168.1.0/24')。 """ networks = set() try: for interface, snics in psutil.net_if_addrs().items(): for snic in snics: if snic.family == socket.AF_INET: # IPv4 ip_address = snic.address netmask = snic.netmask if ip_address and netmask and ip_address != '127.0.0.1': try: # strict=False 允许主机地址 network_obj = ipaddress.IPv4Network(f"{ip_address}/{netmask}", strict=False) networks.add(str(network_obj)) except ipaddress.AddressValueError as e: print(f"警告: 无法解析网络 {interface}: {ip_address}, {netmask}。错误: {e}", file=sys.stderr) return list(networks) except Exception as e: print(f"错误: 使用 psutil 获取网络接口时出错: {e}", file=sys.stderr) print("回退: 仅扫描本地 IP 的 /24 子网。", file=sys.stderr) local_ip = get_local_ip() if local_ip == '127.0.0.1': return [] return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))] def filter_docker_networks(all_networks: List[str]) -> List[str]: """过滤掉已知的 Docker 内部网络。""" filtered_networks = [] print("--- 检测到以下网络 ---", file=sys.stderr) for net_cidr in all_networks: print(f" - {net_cidr}", file=sys.stderr) for net_cidr in all_networks: is_docker_internal = False for prefix in DOCKER_NETWORK_PREFIXES: if net_cidr.startswith(prefix): is_docker_internal = True break if not is_docker_internal: filtered_networks.append(net_cidr) else: print(f" - 排除 Docker 内部网络: {net_cidr}", file=sys.stderr) if not filtered_networks and all_networks: print(f"警告: 所有检测到的网络似乎都是 Docker 内部网络。扫描可能不会发现外部设备。", file=sys.stderr) print(f"将继续扫描所有检测到的网络: {', '.join(all_networks)}", file=sys.stderr) return all_networks # 如果过滤后为空,但原来不为空,则返回原列表 return filtered_networks def run_nmap_scan(targets: List[str]) -> str: """ 对目标网络运行 nmap 扫描。 -sV: 服务版本检测 -p [ports]: 扫描指定的 TCP 端口 -T4: 加快扫描速度 -oJ -: 将 JSON 输出到 stdout """ print(f"\n--- 开始 Nmap 扫描 (目标: {', '.join(targets)}) ---", file=sys.stderr) print(f"--- 扫描端口: {NMAP_PORT_STRING} ---", file=sys.stderr) # 需要 sudo 权限来运行 nmap # 您的 Dockerfile 已经为 'dev' 用户设置了 NOPASSWD:ALL nmap_command = [ 'sudo', 'nmap', '-oJ', '-', # 输出 JSON 到 stdout '-T4', # 更快的时间模板 '--open', # 只显示打开的端口 '-sV', # 服务版本指纹识别 # --- 移除了导致错误的 ONVIF 脚本 --- # '--script=broadcast-onvif-discover', # --- --------------------------- --- '-p', NMAP_PORT_STRING, # 扫描特定端口 ] + targets start_time = time.time() stop_progress_flag.clear() # 确保标志是清除的 progress_thread = threading.Thread(target=progress_indicator, daemon=True) progress_thread.start() try: # 设置超时,例如10分钟,以防扫描卡住 result = subprocess.run(nmap_command, capture_output=True, text=True, check=True, timeout=600) stop_progress_flag.set() progress_thread.join() # 等 print("--- Nmap 扫描完成 ---", file=sys.stderr) 待线程清理 return result.stdout except subprocess.CalledProcessError as e: print(f"错误: Nmap 执行失败 (返回码 {e.returncode}):", file=sys.stderr) print(f"Nmap Stderr: {e.stderr}", file=sys.stderr) return None except subprocess.TimeoutExpired as e: print(f"错误: Nmap 扫描超时 ({e.timeout} 秒)。", file=sys.stderr) print(f"Nmap Stdout: {e.stdout}", file=sys.stderr) print(f"Nmap Stderr: {e.stderr}", file=sys.stderr) return None except FileNotFoundError: print("错误: 'sudo' 或 'nmap' 命令未找到。请确保它在 PATH 中并且已安装。", file=sys.stderr) return None def parse_nmap_json(nmap_stdout: str) -> List[Dict[str, Any]]: """将 nmap 的 JSON 输出解析为我们想要的简洁格式。""" results = [] if not nmap_stdout: return results try: nmap_data = json.loads(nmap_stdout) except json.JSONDecodeError as e: print(f"错误: 无法解析 Nmap 的 JSON 输出。{e}", file=sys.stderr) print(f"原始输出: {nmap_stdout[:500]}...", file=sys.stderr) return results if 'hosts' not in nmap_data: print("警告: Nmap 输出中未找到 'hosts' 键。", file=sys.stderr) return results for host in nmap_data.get('hosts', []): if host.get('status', {}).get('state') != 'up': continue ip = host.get('addresses', {}).get('ipv4') mac = host.get('addresses', {}).get('mac') if not ip: continue vendor = host.get('vendor', {}).get(mac, None) if mac else None device_info = { "ip": ip, "mac": mac, "vendor": vendor, "services": [], "onvif_xaddrs": [] # 即使没有脚本,也保留此键以保持格式一致 } # 1. 解析端口和服务 for port_info in host.get('ports', []): if port_info.get('state', {}).get('state') == 'open': service_data = port_info.get('service', {}) service = { "port": int(port_info.get('portid')), "service_name": service_data.get('name'), "product": service_data.get('product'), "version": service_data.get('version'), "extra_info": service_data.get('extrainfo') } device_info["services"].append(service) # 2. 解析 ONVIF 脚本输出 (现在为空) # 'hostscript' 字段包含主机级别的脚本结果 for script in host.get('hostscript', []): if script.get('id') == 'broadcast-onvif-discover': # Nmap 脚本输出是一个字符串。我们用正则提取 XAddrs。 output = script.get('output', '') # 匹配 'http://...' 格式的 URL,直到遇到空格或换行符 xaddrs = re.findall(r'http://[^\s,]+', output) device_info["onvif_xaddrs"] = xaddrs # 只有当我们发现了开放服务时才添加该设备 if device_info["services"]: results.append(device_info) return results def main(): print("--- 启动网络摄像头发现 (基于 Nmap) ---", file=sys.stderr) # 1. 获取并过滤本地网络 all_local_networks = get_all_local_networks() if not all_local_networks: print("错误: 未检测到本地网络用于扫描。退出。", file=sys.stderr) sys.exit(1) networks_to_scan = filter_docker_networks(all_local_networks) if not networks_to_scan: print("警告: 过滤后没有可扫描的网络。退出。", file=sys.stderr) sys.exit(0) print("\n--- 将扫描以下网络 ---", file=sys.stderr) for net in networks_to_scan: print(f" - {net}", file=sys.stderr) # 2. 运行 Nmap 扫描 nmap_json_output = run_nmap_scan(networks_to_scan) if not nmap_json_output: print("错误: 未从 Nmap 收到任何输出。退出。", file=sys.stderr) sys.exit(1) # 3. 解析 Nmap JSON final_results = parse_nmap_json(nmap_json_output) # 4. 将最终结果以 JSON 格式打印到 stdout # 注意:stderr 用于打印日志,stdout 仅用于输出最终的 JSON print(json.dumps(final_results, indent=2)) if __name__ == "__main__": main()