280 lines
11 KiB
Python
280 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
|
||
import subprocess
|
||
import json
|
||
import sys
|
||
import ipaddress
|
||
import socket
|
||
import re
|
||
from typing import List, Set, Dict, Any
|
||
import time # 导入 time
|
||
import threading # 导入 threading
|
||
|
||
# 尝试导入 psutil
|
||
try:
|
||
import psutil
|
||
except ImportError:
|
||
print("错误: 'psutil' 库未找到。请在 Dockerfile 中或使用 'pip install psutil' 安装。")
|
||
sys.exit(1)
|
||
|
||
# --- 配置 ---
|
||
COMMON_CAMERA_PORTS = [
|
||
80, # HTTP (web interface)
|
||
443, # HTTPS (secure web interface)
|
||
554, # RTSP (Real Time Streaming Protocol)
|
||
8000, # Often used by Hikvision (SDK/HTTP)
|
||
8080, # Alternative HTTP/RTSP
|
||
8001, # Hikvision stream port
|
||
37777, # Dahua primary port
|
||
37778, # Dahua secondary port
|
||
8002, # Often used for camera APIs or secondary streams
|
||
]
|
||
NMAP_PORT_STRING = "T:" + ",".join(map(str, COMMON_CAMERA_PORTS))
|
||
|
||
# 用于过滤 Docker 网络的常见前缀
|
||
DOCKER_NETWORK_PREFIXES = ["172.17.", "172.18.", "172.19.", "172.20."]
|
||
|
||
# 全局标志,用于停止进度线程
|
||
stop_progress_flag = threading.Event()
|
||
start_time = time.time()
|
||
|
||
def progress_indicator():
|
||
"""在后台线程中运行,定期打印进度信息。"""
|
||
interval = 10 # 每 10 秒报告一次
|
||
count = 0
|
||
|
||
# 打印一个提示,说明进度指示器已启动
|
||
print(f"--- Nmap 扫描进行中(每 {interval} 秒更新一次)---", file=sys.stderr)
|
||
|
||
while not stop_progress_flag.is_set():
|
||
time_elapsed = time.time() - start_time
|
||
|
||
# 使用 is_set() 检查标志,并等待最多 interval 秒
|
||
stop_progress_flag.wait(interval)
|
||
|
||
if not stop_progress_flag.is_set():
|
||
count += 1
|
||
# 打印一个简单的状态信息
|
||
print(f"--- 状态更新 #{count}: 扫描已运行 {time_elapsed:.1f} 秒... ---", file=sys.stderr)
|
||
|
||
def get_local_ip() -> str:
|
||
"""获取本机的本地 IP 地址。"""
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
try:
|
||
# 连接到一个虚拟地址
|
||
s.connect(('10.255.255.255', 1))
|
||
IP = s.getsockname()[0]
|
||
except Exception:
|
||
IP = '127.0.0.1'
|
||
finally:
|
||
s.close()
|
||
return IP
|
||
|
||
def get_all_local_networks() -> List[str]:
|
||
"""
|
||
使用 psutil 查找所有活动的网络接口及其关联的网络范围。
|
||
返回一个网络 CIDR 字符串列表 (例如 '192.168.1.0/24')。
|
||
"""
|
||
networks = set()
|
||
try:
|
||
for interface, snics in psutil.net_if_addrs().items():
|
||
for snic in snics:
|
||
if snic.family == socket.AF_INET: # IPv4
|
||
ip_address = snic.address
|
||
netmask = snic.netmask
|
||
if ip_address and netmask and ip_address != '127.0.0.1':
|
||
try:
|
||
# strict=False 允许主机地址
|
||
network_obj = ipaddress.IPv4Network(f"{ip_address}/{netmask}", strict=False)
|
||
networks.add(str(network_obj))
|
||
except ipaddress.AddressValueError as e:
|
||
print(f"警告: 无法解析网络 {interface}: {ip_address}, {netmask}。错误: {e}", file=sys.stderr)
|
||
return list(networks)
|
||
except Exception as e:
|
||
print(f"错误: 使用 psutil 获取网络接口时出错: {e}", file=sys.stderr)
|
||
print("回退: 仅扫描本地 IP 的 /24 子网。", file=sys.stderr)
|
||
local_ip = get_local_ip()
|
||
if local_ip == '127.0.0.1':
|
||
return []
|
||
return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))]
|
||
|
||
def filter_docker_networks(all_networks: List[str]) -> List[str]:
|
||
"""过滤掉已知的 Docker 内部网络。"""
|
||
filtered_networks = []
|
||
print("--- 检测到以下网络 ---", file=sys.stderr)
|
||
for net_cidr in all_networks:
|
||
print(f" - {net_cidr}", file=sys.stderr)
|
||
|
||
for net_cidr in all_networks:
|
||
is_docker_internal = False
|
||
for prefix in DOCKER_NETWORK_PREFIXES:
|
||
if net_cidr.startswith(prefix):
|
||
is_docker_internal = True
|
||
break
|
||
|
||
if not is_docker_internal:
|
||
filtered_networks.append(net_cidr)
|
||
else:
|
||
print(f" - 排除 Docker 内部网络: {net_cidr}", file=sys.stderr)
|
||
|
||
if not filtered_networks and all_networks:
|
||
print(f"警告: 所有检测到的网络似乎都是 Docker 内部网络。扫描可能不会发现外部设备。", file=sys.stderr)
|
||
print(f"将继续扫描所有检测到的网络: {', '.join(all_networks)}", file=sys.stderr)
|
||
return all_networks # 如果过滤后为空,但原来不为空,则返回原列表
|
||
|
||
return filtered_networks
|
||
|
||
def run_nmap_scan(targets: List[str]) -> str:
|
||
"""
|
||
对目标网络运行 nmap 扫描。
|
||
-sV: 服务版本检测
|
||
-p [ports]: 扫描指定的 TCP 端口
|
||
-T4: 加快扫描速度
|
||
-oJ -: 将 JSON 输出到 stdout
|
||
"""
|
||
print(f"\n--- 开始 Nmap 扫描 (目标: {', '.join(targets)}) ---", file=sys.stderr)
|
||
print(f"--- 扫描端口: {NMAP_PORT_STRING} ---", file=sys.stderr)
|
||
|
||
# 需要 sudo 权限来运行 nmap
|
||
# 您的 Dockerfile 已经为 'dev' 用户设置了 NOPASSWD:ALL
|
||
nmap_command = [
|
||
'sudo', 'nmap',
|
||
'-oJ', '-', # 输出 JSON 到 stdout
|
||
'-T4', # 更快的时间模板
|
||
'--open', # 只显示打开的端口
|
||
'-sV', # 服务版本指纹识别
|
||
# --- 移除了导致错误的 ONVIF 脚本 ---
|
||
# '--script=broadcast-onvif-discover',
|
||
# --- --------------------------- ---
|
||
'-p', NMAP_PORT_STRING, # 扫描特定端口
|
||
] + targets
|
||
|
||
start_time = time.time()
|
||
stop_progress_flag.clear() # 确保标志是清除的
|
||
progress_thread = threading.Thread(target=progress_indicator, daemon=True)
|
||
progress_thread.start()
|
||
try:
|
||
# 设置超时,例如10分钟,以防扫描卡住
|
||
result = subprocess.run(nmap_command,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
timeout=600)
|
||
stop_progress_flag.set()
|
||
progress_thread.join() # 等
|
||
print("--- Nmap 扫描完成 ---", file=sys.stderr)
|
||
待线程清理
|
||
return result.stdout
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"错误: Nmap 执行失败 (返回码 {e.returncode}):", file=sys.stderr)
|
||
print(f"Nmap Stderr: {e.stderr}", file=sys.stderr)
|
||
return None
|
||
except subprocess.TimeoutExpired as e:
|
||
print(f"错误: Nmap 扫描超时 ({e.timeout} 秒)。", file=sys.stderr)
|
||
print(f"Nmap Stdout: {e.stdout}", file=sys.stderr)
|
||
print(f"Nmap Stderr: {e.stderr}", file=sys.stderr)
|
||
return None
|
||
except FileNotFoundError:
|
||
print("错误: 'sudo' 或 'nmap' 命令未找到。请确保它在 PATH 中并且已安装。", file=sys.stderr)
|
||
return None
|
||
|
||
def parse_nmap_json(nmap_stdout: str) -> List[Dict[str, Any]]:
|
||
"""将 nmap 的 JSON 输出解析为我们想要的简洁格式。"""
|
||
results = []
|
||
if not nmap_stdout:
|
||
return results
|
||
|
||
try:
|
||
nmap_data = json.loads(nmap_stdout)
|
||
except json.JSONDecodeError as e:
|
||
print(f"错误: 无法解析 Nmap 的 JSON 输出。{e}", file=sys.stderr)
|
||
print(f"原始输出: {nmap_stdout[:500]}...", file=sys.stderr)
|
||
return results
|
||
|
||
if 'hosts' not in nmap_data:
|
||
print("警告: Nmap 输出中未找到 'hosts' 键。", file=sys.stderr)
|
||
return results
|
||
|
||
for host in nmap_data.get('hosts', []):
|
||
if host.get('status', {}).get('state') != 'up':
|
||
continue
|
||
|
||
ip = host.get('addresses', {}).get('ipv4')
|
||
mac = host.get('addresses', {}).get('mac')
|
||
if not ip:
|
||
continue
|
||
|
||
vendor = host.get('vendor', {}).get(mac, None) if mac else None
|
||
|
||
device_info = {
|
||
"ip": ip,
|
||
"mac": mac,
|
||
"vendor": vendor,
|
||
"services": [],
|
||
"onvif_xaddrs": [] # 即使没有脚本,也保留此键以保持格式一致
|
||
}
|
||
|
||
# 1. 解析端口和服务
|
||
for port_info in host.get('ports', []):
|
||
if port_info.get('state', {}).get('state') == 'open':
|
||
service_data = port_info.get('service', {})
|
||
service = {
|
||
"port": int(port_info.get('portid')),
|
||
"service_name": service_data.get('name'),
|
||
"product": service_data.get('product'),
|
||
"version": service_data.get('version'),
|
||
"extra_info": service_data.get('extrainfo')
|
||
}
|
||
device_info["services"].append(service)
|
||
|
||
# 2. 解析 ONVIF 脚本输出 (现在为空)
|
||
# 'hostscript' 字段包含主机级别的脚本结果
|
||
for script in host.get('hostscript', []):
|
||
if script.get('id') == 'broadcast-onvif-discover':
|
||
# Nmap 脚本输出是一个字符串。我们用正则提取 XAddrs。
|
||
output = script.get('output', '')
|
||
# 匹配 'http://...' 格式的 URL,直到遇到空格或换行符
|
||
xaddrs = re.findall(r'http://[^\s,]+', output)
|
||
device_info["onvif_xaddrs"] = xaddrs
|
||
|
||
# 只有当我们发现了开放服务时才添加该设备
|
||
if device_info["services"]:
|
||
results.append(device_info)
|
||
|
||
return results
|
||
|
||
def main():
|
||
print("--- 启动网络摄像头发现 (基于 Nmap) ---", file=sys.stderr)
|
||
|
||
# 1. 获取并过滤本地网络
|
||
all_local_networks = get_all_local_networks()
|
||
if not all_local_networks:
|
||
print("错误: 未检测到本地网络用于扫描。退出。", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
networks_to_scan = filter_docker_networks(all_local_networks)
|
||
|
||
if not networks_to_scan:
|
||
print("警告: 过滤后没有可扫描的网络。退出。", file=sys.stderr)
|
||
sys.exit(0)
|
||
|
||
print("\n--- 将扫描以下网络 ---", file=sys.stderr)
|
||
for net in networks_to_scan:
|
||
print(f" - {net}", file=sys.stderr)
|
||
|
||
# 2. 运行 Nmap 扫描
|
||
nmap_json_output = run_nmap_scan(networks_to_scan)
|
||
|
||
if not nmap_json_output:
|
||
print("错误: 未从 Nmap 收到任何输出。退出。", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
# 3. 解析 Nmap JSON
|
||
final_results = parse_nmap_json(nmap_json_output)
|
||
|
||
# 4. 将最终结果以 JSON 格式打印到 stdout
|
||
# 注意:stderr 用于打印日志,stdout 仅用于输出最终的 JSON
|
||
print(json.dumps(final_results, indent=2))
|
||
|
||
if __name__ == "__main__":
|
||
main() |