bonus-edge-proxy/python-work/new_find_camera.py

280 lines
11 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import subprocess
import json
import sys
import ipaddress
import socket
import re
from typing import List, Set, Dict, Any
import time # 导入 time
import threading # 导入 threading
# 尝试导入 psutil
try:
import psutil
except ImportError:
print("错误: 'psutil' 库未找到。请在 Dockerfile 中或使用 'pip install psutil' 安装。")
sys.exit(1)
# --- 配置 ---
COMMON_CAMERA_PORTS = [
80, # HTTP (web interface)
443, # HTTPS (secure web interface)
554, # RTSP (Real Time Streaming Protocol)
8000, # Often used by Hikvision (SDK/HTTP)
8080, # Alternative HTTP/RTSP
8001, # Hikvision stream port
37777, # Dahua primary port
37778, # Dahua secondary port
8002, # Often used for camera APIs or secondary streams
]
NMAP_PORT_STRING = "T:" + ",".join(map(str, COMMON_CAMERA_PORTS))
# 用于过滤 Docker 网络的常见前缀
DOCKER_NETWORK_PREFIXES = ["172.17.", "172.18.", "172.19.", "172.20."]
# 全局标志,用于停止进度线程
stop_progress_flag = threading.Event()
start_time = time.time()
def progress_indicator():
"""在后台线程中运行,定期打印进度信息。"""
interval = 10 # 每 10 秒报告一次
count = 0
# 打印一个提示,说明进度指示器已启动
print(f"--- Nmap 扫描进行中(每 {interval} 秒更新一次)---", file=sys.stderr)
while not stop_progress_flag.is_set():
time_elapsed = time.time() - start_time
# 使用 is_set() 检查标志,并等待最多 interval 秒
stop_progress_flag.wait(interval)
if not stop_progress_flag.is_set():
count += 1
# 打印一个简单的状态信息
print(f"--- 状态更新 #{count}: 扫描已运行 {time_elapsed:.1f} 秒... ---", file=sys.stderr)
def get_local_ip() -> str:
"""获取本机的本地 IP 地址。"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 连接到一个虚拟地址
s.connect(('10.255.255.255', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
def get_all_local_networks() -> List[str]:
"""
使用 psutil 查找所有活动的网络接口及其关联的网络范围
返回一个网络 CIDR 字符串列表 (例如 '192.168.1.0/24')
"""
networks = set()
try:
for interface, snics in psutil.net_if_addrs().items():
for snic in snics:
if snic.family == socket.AF_INET: # IPv4
ip_address = snic.address
netmask = snic.netmask
if ip_address and netmask and ip_address != '127.0.0.1':
try:
# strict=False 允许主机地址
network_obj = ipaddress.IPv4Network(f"{ip_address}/{netmask}", strict=False)
networks.add(str(network_obj))
except ipaddress.AddressValueError as e:
print(f"警告: 无法解析网络 {interface}: {ip_address}, {netmask}。错误: {e}", file=sys.stderr)
return list(networks)
except Exception as e:
print(f"错误: 使用 psutil 获取网络接口时出错: {e}", file=sys.stderr)
print("回退: 仅扫描本地 IP 的 /24 子网。", file=sys.stderr)
local_ip = get_local_ip()
if local_ip == '127.0.0.1':
return []
return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))]
def filter_docker_networks(all_networks: List[str]) -> List[str]:
"""过滤掉已知的 Docker 内部网络。"""
filtered_networks = []
print("--- 检测到以下网络 ---", file=sys.stderr)
for net_cidr in all_networks:
print(f" - {net_cidr}", file=sys.stderr)
for net_cidr in all_networks:
is_docker_internal = False
for prefix in DOCKER_NETWORK_PREFIXES:
if net_cidr.startswith(prefix):
is_docker_internal = True
break
if not is_docker_internal:
filtered_networks.append(net_cidr)
else:
print(f" - 排除 Docker 内部网络: {net_cidr}", file=sys.stderr)
if not filtered_networks and all_networks:
print(f"警告: 所有检测到的网络似乎都是 Docker 内部网络。扫描可能不会发现外部设备。", file=sys.stderr)
print(f"将继续扫描所有检测到的网络: {', '.join(all_networks)}", file=sys.stderr)
return all_networks # 如果过滤后为空,但原来不为空,则返回原列表
return filtered_networks
def run_nmap_scan(targets: List[str]) -> str:
"""
对目标网络运行 nmap 扫描
-sV: 服务版本检测
-p [ports]: 扫描指定的 TCP 端口
-T4: 加快扫描速度
-oJ -: JSON 输出到 stdout
"""
print(f"\n--- 开始 Nmap 扫描 (目标: {', '.join(targets)}) ---", file=sys.stderr)
print(f"--- 扫描端口: {NMAP_PORT_STRING} ---", file=sys.stderr)
# 需要 sudo 权限来运行 nmap
# 您的 Dockerfile 已经为 'dev' 用户设置了 NOPASSWD:ALL
nmap_command = [
'sudo', 'nmap',
'-oJ', '-', # 输出 JSON 到 stdout
'-T4', # 更快的时间模板
'--open', # 只显示打开的端口
'-sV', # 服务版本指纹识别
# --- 移除了导致错误的 ONVIF 脚本 ---
# '--script=broadcast-onvif-discover',
# --- --------------------------- ---
'-p', NMAP_PORT_STRING, # 扫描特定端口
] + targets
start_time = time.time()
stop_progress_flag.clear() # 确保标志是清除的
progress_thread = threading.Thread(target=progress_indicator, daemon=True)
progress_thread.start()
try:
# 设置超时例如10分钟以防扫描卡住
result = subprocess.run(nmap_command,
capture_output=True,
text=True,
check=True,
timeout=600)
stop_progress_flag.set()
progress_thread.join() # 等
print("--- Nmap 扫描完成 ---", file=sys.stderr)
待线程清理
return result.stdout
except subprocess.CalledProcessError as e:
print(f"错误: Nmap 执行失败 (返回码 {e.returncode}):", file=sys.stderr)
print(f"Nmap Stderr: {e.stderr}", file=sys.stderr)
return None
except subprocess.TimeoutExpired as e:
print(f"错误: Nmap 扫描超时 ({e.timeout} 秒)。", file=sys.stderr)
print(f"Nmap Stdout: {e.stdout}", file=sys.stderr)
print(f"Nmap Stderr: {e.stderr}", file=sys.stderr)
return None
except FileNotFoundError:
print("错误: 'sudo''nmap' 命令未找到。请确保它在 PATH 中并且已安装。", file=sys.stderr)
return None
def parse_nmap_json(nmap_stdout: str) -> List[Dict[str, Any]]:
"""将 nmap 的 JSON 输出解析为我们想要的简洁格式。"""
results = []
if not nmap_stdout:
return results
try:
nmap_data = json.loads(nmap_stdout)
except json.JSONDecodeError as e:
print(f"错误: 无法解析 Nmap 的 JSON 输出。{e}", file=sys.stderr)
print(f"原始输出: {nmap_stdout[:500]}...", file=sys.stderr)
return results
if 'hosts' not in nmap_data:
print("警告: Nmap 输出中未找到 'hosts' 键。", file=sys.stderr)
return results
for host in nmap_data.get('hosts', []):
if host.get('status', {}).get('state') != 'up':
continue
ip = host.get('addresses', {}).get('ipv4')
mac = host.get('addresses', {}).get('mac')
if not ip:
continue
vendor = host.get('vendor', {}).get(mac, None) if mac else None
device_info = {
"ip": ip,
"mac": mac,
"vendor": vendor,
"services": [],
"onvif_xaddrs": [] # 即使没有脚本,也保留此键以保持格式一致
}
# 1. 解析端口和服务
for port_info in host.get('ports', []):
if port_info.get('state', {}).get('state') == 'open':
service_data = port_info.get('service', {})
service = {
"port": int(port_info.get('portid')),
"service_name": service_data.get('name'),
"product": service_data.get('product'),
"version": service_data.get('version'),
"extra_info": service_data.get('extrainfo')
}
device_info["services"].append(service)
# 2. 解析 ONVIF 脚本输出 (现在为空)
# 'hostscript' 字段包含主机级别的脚本结果
for script in host.get('hostscript', []):
if script.get('id') == 'broadcast-onvif-discover':
# Nmap 脚本输出是一个字符串。我们用正则提取 XAddrs。
output = script.get('output', '')
# 匹配 'http://...' 格式的 URL直到遇到空格或换行符
xaddrs = re.findall(r'http://[^\s,]+', output)
device_info["onvif_xaddrs"] = xaddrs
# 只有当我们发现了开放服务时才添加该设备
if device_info["services"]:
results.append(device_info)
return results
def main():
print("--- 启动网络摄像头发现 (基于 Nmap) ---", file=sys.stderr)
# 1. 获取并过滤本地网络
all_local_networks = get_all_local_networks()
if not all_local_networks:
print("错误: 未检测到本地网络用于扫描。退出。", file=sys.stderr)
sys.exit(1)
networks_to_scan = filter_docker_networks(all_local_networks)
if not networks_to_scan:
print("警告: 过滤后没有可扫描的网络。退出。", file=sys.stderr)
sys.exit(0)
print("\n--- 将扫描以下网络 ---", file=sys.stderr)
for net in networks_to_scan:
print(f" - {net}", file=sys.stderr)
# 2. 运行 Nmap 扫描
nmap_json_output = run_nmap_scan(networks_to_scan)
if not nmap_json_output:
print("错误: 未从 Nmap 收到任何输出。退出。", file=sys.stderr)
sys.exit(1)
# 3. 解析 Nmap JSON
final_results = parse_nmap_json(nmap_json_output)
# 4. 将最终结果以 JSON 格式打印到 stdout
# 注意stderr 用于打印日志stdout 仅用于输出最终的 JSON
print(json.dumps(final_results, indent=2))
if __name__ == "__main__":
main()