bonus-edge-proxy/python-work/new_find_camera.py

280 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import subprocess
import json
import sys
import ipaddress
import socket
import re
from typing import List, Set, Dict, Any
import time # 导入 time
import threading # 导入 threading
# 尝试导入 psutil
try:
import psutil
except ImportError:
print("错误: 'psutil' 库未找到。请在 Dockerfile 中或使用 'pip install psutil' 安装。")
sys.exit(1)
# --- 配置 ---
COMMON_CAMERA_PORTS = [
80, # HTTP (web interface)
443, # HTTPS (secure web interface)
554, # RTSP (Real Time Streaming Protocol)
8000, # Often used by Hikvision (SDK/HTTP)
8080, # Alternative HTTP/RTSP
8001, # Hikvision stream port
37777, # Dahua primary port
37778, # Dahua secondary port
8002, # Often used for camera APIs or secondary streams
]
NMAP_PORT_STRING = "T:" + ",".join(map(str, COMMON_CAMERA_PORTS))
# 用于过滤 Docker 网络的常见前缀
DOCKER_NETWORK_PREFIXES = ["172.17.", "172.18.", "172.19.", "172.20."]
# 全局标志,用于停止进度线程
stop_progress_flag = threading.Event()
start_time = time.time()
def progress_indicator():
"""在后台线程中运行,定期打印进度信息。"""
interval = 10 # 每 10 秒报告一次
count = 0
# 打印一个提示,说明进度指示器已启动
print(f"--- Nmap 扫描进行中(每 {interval} 秒更新一次)---", file=sys.stderr)
while not stop_progress_flag.is_set():
time_elapsed = time.time() - start_time
# 使用 is_set() 检查标志,并等待最多 interval 秒
stop_progress_flag.wait(interval)
if not stop_progress_flag.is_set():
count += 1
# 打印一个简单的状态信息
print(f"--- 状态更新 #{count}: 扫描已运行 {time_elapsed:.1f} 秒... ---", file=sys.stderr)
def get_local_ip() -> str:
"""获取本机的本地 IP 地址。"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 连接到一个虚拟地址
s.connect(('10.255.255.255', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
def get_all_local_networks() -> List[str]:
"""
使用 psutil 查找所有活动的网络接口及其关联的网络范围。
返回一个网络 CIDR 字符串列表 (例如 '192.168.1.0/24')。
"""
networks = set()
try:
for interface, snics in psutil.net_if_addrs().items():
for snic in snics:
if snic.family == socket.AF_INET: # IPv4
ip_address = snic.address
netmask = snic.netmask
if ip_address and netmask and ip_address != '127.0.0.1':
try:
# strict=False 允许主机地址
network_obj = ipaddress.IPv4Network(f"{ip_address}/{netmask}", strict=False)
networks.add(str(network_obj))
except ipaddress.AddressValueError as e:
print(f"警告: 无法解析网络 {interface}: {ip_address}, {netmask}。错误: {e}", file=sys.stderr)
return list(networks)
except Exception as e:
print(f"错误: 使用 psutil 获取网络接口时出错: {e}", file=sys.stderr)
print("回退: 仅扫描本地 IP 的 /24 子网。", file=sys.stderr)
local_ip = get_local_ip()
if local_ip == '127.0.0.1':
return []
return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))]
def filter_docker_networks(all_networks: List[str]) -> List[str]:
"""过滤掉已知的 Docker 内部网络。"""
filtered_networks = []
print("--- 检测到以下网络 ---", file=sys.stderr)
for net_cidr in all_networks:
print(f" - {net_cidr}", file=sys.stderr)
for net_cidr in all_networks:
is_docker_internal = False
for prefix in DOCKER_NETWORK_PREFIXES:
if net_cidr.startswith(prefix):
is_docker_internal = True
break
if not is_docker_internal:
filtered_networks.append(net_cidr)
else:
print(f" - 排除 Docker 内部网络: {net_cidr}", file=sys.stderr)
if not filtered_networks and all_networks:
print(f"警告: 所有检测到的网络似乎都是 Docker 内部网络。扫描可能不会发现外部设备。", file=sys.stderr)
print(f"将继续扫描所有检测到的网络: {', '.join(all_networks)}", file=sys.stderr)
return all_networks # 如果过滤后为空,但原来不为空,则返回原列表
return filtered_networks
def run_nmap_scan(targets: List[str]) -> str:
"""
对目标网络运行 nmap 扫描。
-sV: 服务版本检测
-p [ports]: 扫描指定的 TCP 端口
-T4: 加快扫描速度
-oJ -: 将 JSON 输出到 stdout
"""
print(f"\n--- 开始 Nmap 扫描 (目标: {', '.join(targets)}) ---", file=sys.stderr)
print(f"--- 扫描端口: {NMAP_PORT_STRING} ---", file=sys.stderr)
# 需要 sudo 权限来运行 nmap
# 您的 Dockerfile 已经为 'dev' 用户设置了 NOPASSWD:ALL
nmap_command = [
'sudo', 'nmap',
'-oJ', '-', # 输出 JSON 到 stdout
'-T4', # 更快的时间模板
'--open', # 只显示打开的端口
'-sV', # 服务版本指纹识别
# --- 移除了导致错误的 ONVIF 脚本 ---
# '--script=broadcast-onvif-discover',
# --- --------------------------- ---
'-p', NMAP_PORT_STRING, # 扫描特定端口
] + targets
start_time = time.time()
stop_progress_flag.clear() # 确保标志是清除的
progress_thread = threading.Thread(target=progress_indicator, daemon=True)
progress_thread.start()
try:
# 设置超时例如10分钟以防扫描卡住
result = subprocess.run(nmap_command,
capture_output=True,
text=True,
check=True,
timeout=600)
stop_progress_flag.set()
progress_thread.join() # 等
print("--- Nmap 扫描完成 ---", file=sys.stderr)
待线程清理
return result.stdout
except subprocess.CalledProcessError as e:
print(f"错误: Nmap 执行失败 (返回码 {e.returncode}):", file=sys.stderr)
print(f"Nmap Stderr: {e.stderr}", file=sys.stderr)
return None
except subprocess.TimeoutExpired as e:
print(f"错误: Nmap 扫描超时 ({e.timeout} 秒)。", file=sys.stderr)
print(f"Nmap Stdout: {e.stdout}", file=sys.stderr)
print(f"Nmap Stderr: {e.stderr}", file=sys.stderr)
return None
except FileNotFoundError:
print("错误: 'sudo''nmap' 命令未找到。请确保它在 PATH 中并且已安装。", file=sys.stderr)
return None
def parse_nmap_json(nmap_stdout: str) -> List[Dict[str, Any]]:
"""将 nmap 的 JSON 输出解析为我们想要的简洁格式。"""
results = []
if not nmap_stdout:
return results
try:
nmap_data = json.loads(nmap_stdout)
except json.JSONDecodeError as e:
print(f"错误: 无法解析 Nmap 的 JSON 输出。{e}", file=sys.stderr)
print(f"原始输出: {nmap_stdout[:500]}...", file=sys.stderr)
return results
if 'hosts' not in nmap_data:
print("警告: Nmap 输出中未找到 'hosts' 键。", file=sys.stderr)
return results
for host in nmap_data.get('hosts', []):
if host.get('status', {}).get('state') != 'up':
continue
ip = host.get('addresses', {}).get('ipv4')
mac = host.get('addresses', {}).get('mac')
if not ip:
continue
vendor = host.get('vendor', {}).get(mac, None) if mac else None
device_info = {
"ip": ip,
"mac": mac,
"vendor": vendor,
"services": [],
"onvif_xaddrs": [] # 即使没有脚本,也保留此键以保持格式一致
}
# 1. 解析端口和服务
for port_info in host.get('ports', []):
if port_info.get('state', {}).get('state') == 'open':
service_data = port_info.get('service', {})
service = {
"port": int(port_info.get('portid')),
"service_name": service_data.get('name'),
"product": service_data.get('product'),
"version": service_data.get('version'),
"extra_info": service_data.get('extrainfo')
}
device_info["services"].append(service)
# 2. 解析 ONVIF 脚本输出 (现在为空)
# 'hostscript' 字段包含主机级别的脚本结果
for script in host.get('hostscript', []):
if script.get('id') == 'broadcast-onvif-discover':
# Nmap 脚本输出是一个字符串。我们用正则提取 XAddrs。
output = script.get('output', '')
# 匹配 'http://...' 格式的 URL直到遇到空格或换行符
xaddrs = re.findall(r'http://[^\s,]+', output)
device_info["onvif_xaddrs"] = xaddrs
# 只有当我们发现了开放服务时才添加该设备
if device_info["services"]:
results.append(device_info)
return results
def main():
print("--- 启动网络摄像头发现 (基于 Nmap) ---", file=sys.stderr)
# 1. 获取并过滤本地网络
all_local_networks = get_all_local_networks()
if not all_local_networks:
print("错误: 未检测到本地网络用于扫描。退出。", file=sys.stderr)
sys.exit(1)
networks_to_scan = filter_docker_networks(all_local_networks)
if not networks_to_scan:
print("警告: 过滤后没有可扫描的网络。退出。", file=sys.stderr)
sys.exit(0)
print("\n--- 将扫描以下网络 ---", file=sys.stderr)
for net in networks_to_scan:
print(f" - {net}", file=sys.stderr)
# 2. 运行 Nmap 扫描
nmap_json_output = run_nmap_scan(networks_to_scan)
if not nmap_json_output:
print("错误: 未从 Nmap 收到任何输出。退出。", file=sys.stderr)
sys.exit(1)
# 3. 解析 Nmap JSON
final_results = parse_nmap_json(nmap_json_output)
# 4. 将最终结果以 JSON 格式打印到 stdout
# 注意stderr 用于打印日志stdout 仅用于输出最终的 JSON
print(json.dumps(final_results, indent=2))
if __name__ == "__main__":
main()