# bonus-edge-proxy/python-work/find_cameras.py
#
# 310 lines
# 13 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ipaddress
import socket
import subprocess
import threading
import concurrent.futures
from typing import List, Set, Union, Dict
import time
import os
from urllib.parse import urlparse
from collections import defaultdict # Import defaultdict to simplify updates made while holding camera_lock
# --- Global Variables for thread-safe access ---
# IP addresses recorded by onvif_discovery_task; mutated only while holding ip_lock.
active_ips: Set[str] = set()
# Maps IP -> {service label -> human-readable detail}; mutated only while holding camera_lock.
found_cameras: Dict[str, Dict[str, str]] = defaultdict(dict) # defaultdict removes the need to initialise the inner dict per IP
ip_lock = threading.Lock()
camera_lock = threading.Lock()
# --- Configuration ---
MAX_WORKERS = 100 # Number of concurrent threads for scanning
# TCP ports probed on every candidate IP by service_scan_task.
COMMON_CAMERA_PORTS = [
80, # HTTP (web interface)
443, # HTTPS (secure web interface)
554, # RTSP (Real Time Streaming Protocol)
8000, # Often used by Hikvision (SDK/HTTP)
8080, # Alternative HTTP/RTSP
8001, # Hikvision stream port
37777, # Dahua primary port
37778, # Dahua secondary port
8002, # Often used for camera APIs or secondary streams
# Add more ports if you know specific ones for your camera brands
]
# Ports on which service_scan_task attempts to read an SSH banner.
SSH_PORTS = [22] # Potential SSH access for some cameras
# --- Optional dependencies for ONVIF discovery ---
# ONVIF_AVAILABLE ends up True only when psutil and onvif both import AND a
# discovery callable could be located; _discovery_method then holds that callable.
ONVIF_AVAILABLE = False
try:
    import psutil  # For getting all network interfaces
    import onvif  # The package 'onvif-zeep' installs the 'onvif' module
    try:
        # Preferred entry point: the dedicated discovery sub-module.
        from onvif import discovery
        _discovery_method = discovery.find_device
        print("ONVIF: Using onvif.discovery.find_device for discovery.")
    except (ImportError, AttributeError):
        # Fallback: a callable named 'discover' on the top-level module.
        if not callable(getattr(onvif, 'discover', None)):
            _discovery_method = None
            print("ONVIF: No suitable ONVIF discovery method found in 'onvif' module.")
            # Re-use the outer ImportError handler for the "unavailable" path.
            raise ImportError("No ONVIF discovery method.")
        _discovery_method = onvif.discover
        print("ONVIF: Using top-level onvif.discover() for discovery.")
    ONVIF_AVAILABLE = True
except ImportError as e:
    print(f"Warning: Required libraries for full ONVIF functionality could not be imported.")
    print(f" Error: {e}")
    print(f" Please ensure 'psutil' and 'onvif-zeep' are installed:")
    print(f" pip install psutil onvif-zeep")
    print(f" ONVIF discovery will be skipped.")
    # psutil alone is still useful for enumerating interfaces, so retry it
    # on its own; when it is missing the name is bound to None as a marker.
    try:
        import psutil
    except ImportError:
        psutil = None
    # Whatever was found above, no discovery callable is usable now.
    _discovery_method = None
# --- Optional dependency for SSH service detection ---
# PARAMIKO_AVAILABLE records whether 'paramiko' could be imported.
try:
    import paramiko
except ImportError:
    PARAMIKO_AVAILABLE = False
    print("Warning: 'paramiko' not installed. SSH service detection will be skipped. "
          "Install with 'pip install paramiko' for full functionality.")
else:
    PARAMIKO_AVAILABLE = True
def get_local_ip() -> str:
    """Return this machine's local IPv4 address as a string.

    A UDP socket is connected to a dummy address and the socket's own
    address is read back. If that fails for any reason, '127.0.0.1' is
    returned instead.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            # Connecting a datagram socket to a dummy address; no data is sent.
            probe.connect(('10.255.255.255', 1))
            return probe.getsockname()[0]
        except Exception:
            return '127.0.0.1'
def _local_slash24_fallback() -> List[str]:
    """Return the /24 network around get_local_ip(), or [] if only loopback is known."""
    local_ip = get_local_ip()
    if local_ip == '127.0.0.1':
        return []  # Cannot determine external network from localhost
    return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))]


def get_all_local_networks() -> List[str]:
    """
    Uses psutil to find the IPv4 networks attached to this machine's interfaces.

    Returns:
        A sorted list of network CIDR strings (e.g., '192.168.1.0/24').
        Loopback networks are omitted. When psutil is unavailable or fails,
        the list holds only the /24 around the local IP address, or is empty
        if that address is the loopback address.
    """
    if not psutil:
        print("Warning: 'psutil' not available. Only scanning the /24 subnet of the local IP address.")
        return _local_slash24_fallback()
    networks = set()
    try:
        for interface, snics in psutil.net_if_addrs().items():
            for snic in snics:
                if snic.family != socket.AF_INET:  # Only IPv4 addresses are scanned
                    continue
                ip_address = snic.address
                netmask = snic.netmask
                if not ip_address or not netmask:
                    continue
                try:
                    # strict=False lets ipaddress derive the network from a host address + netmask.
                    network_obj = ipaddress.IPv4Network(f"{ip_address}/{netmask}", strict=False)
                except ValueError as e:
                    # ValueError covers both AddressValueError and NetmaskValueError, so one
                    # malformed interface no longer aborts the whole enumeration.
                    print(f"Warning: Could not parse IP address or netmask for {interface}: {ip_address}, {netmask}. Error: {e}")
                    continue
                if network_obj.is_loopback:
                    # Skip every 127.x network, not just the literal 127.0.0.1 address.
                    continue
                networks.add(str(network_obj))
        # Sorted so that output and scan order are deterministic between runs.
        return sorted(networks)
    except Exception as e:
        # Best-effort: any psutil failure degrades to the single-subnet fallback.
        print(f"Error getting network interfaces with psutil: {e}")
        print("Falling back to scanning only the /24 subnet of the local IP address.")
        return _local_slash24_fallback()
def onvif_discovery_task() -> None:
    """Run ONVIF discovery and record every device IP that was reported.

    Calls the module-level ``_discovery_method`` with ``timeout=5``, extracts
    the hostname from each returned XAddr URL, and stores each distinct
    hostname in ``active_ips`` (under ``ip_lock``) and in ``found_cameras``
    (under ``camera_lock``). Does nothing except print a notice when ONVIF
    support is unavailable. Errors are printed, never raised.
    """
    if not (ONVIF_AVAILABLE and _discovery_method is not None):
        print("ONVIF: Skipping discovery due to 'psutil' or 'onvif-zeep' not being available or no suitable discovery method.")
        return

    print("ONVIF: Starting discovery. This may take a few seconds...")
    try:
        xaddrs = _discovery_method(timeout=5)
        # Normalise a non-list result: a truthy value becomes a one-item list,
        # a falsy one becomes an empty list.
        if not isinstance(xaddrs, list):
            xaddrs = [xaddrs] if xaddrs else []

        # Distinct hostnames, kept in first-seen order.
        device_ips = []
        for xaddr in xaddrs:
            try:
                host = urlparse(xaddr).hostname
            except Exception as url_e:
                print(f"ONVIF: Warning: Could not parse IP from device XAddr '{xaddr}': {url_e}")
                continue
            if host and host not in device_ips:
                device_ips.append(host)

        if not device_ips:
            print("ONVIF: No ONVIF devices found via WS-Discovery.")
            return

        print(f"ONVIF: Found {len(device_ips)} potential ONVIF devices via WS-Discovery.")
        for host in device_ips:
            with ip_lock:
                active_ips.add(host)
            with camera_lock:
                found_cameras[host]['ONVIF_Discovery'] = "ONVIF Device (WS-Discovery)"
    except Exception as e:
        print(f"ONVIF: Error during ONVIF discovery (using _discovery_method): {e}")
def check_socket(ip: str, port: int, timeout: float = 0.5) -> bool:
    """Report whether a TCP connection to ``ip:port`` can be established.

    Args:
        ip: Host name or address to connect to.
        port: TCP port number.
        timeout: Seconds to wait for the connection before giving up.

    Returns:
        True if the connection was established, False on timeout, refusal,
        resolution failure, or an invalid address/port.
    """
    try:
        # The context manager closes the socket. There is deliberately no
        # explicit shutdown(): it can raise OSError when the peer resets the
        # connection right after accepting, which would misreport an open
        # port as closed.
        with socket.create_connection((ip, port), timeout):
            return True
    except (OSError, ValueError, OverflowError):
        # OSError covers timeouts, refusals and DNS errors; ValueError and
        # OverflowError cover malformed host names and out-of-range ports.
        return False
def check_ssh_banner(ip: str, port: int, timeout: float = 0.5) -> Union[str, bool]:
    """Try to read the SSH identification string of the service at ``ip:port``.

    Args:
        ip: Host name or address to probe.
        port: TCP port expected to speak SSH.
        timeout: Seconds allowed for the TCP connect, and again for the SSH
            negotiation.

    Returns:
        The server's version string (stripped) when one was received,
        otherwise False. Always False when paramiko is not installed.
    """
    if not PARAMIKO_AVAILABLE:
        return False
    try:
        # Open the socket ourselves so the connect honours ``timeout``;
        # Transport((ip, port)) would connect without any timeout.
        sock = socket.create_connection((ip, port), timeout)
    except (OSError, ValueError, OverflowError):
        return False

    transport = None
    remote_version = ""
    try:
        transport = paramiko.Transport(sock)
        # start_client() accepts a timeout; Transport.connect() does not.
        transport.start_client(timeout=timeout)
    except Exception:
        # start_client re-raises whatever its worker thread hit, so the type
        # is not limited to SSHException. Negotiation may also fail after the
        # identification line was already read, so fall through and inspect
        # remote_version below instead of giving up here.
        pass
    finally:
        if transport is not None:
            # remote_version holds the server's identification string, unlike
            # get_banner(), which is the auth banner and is None before auth.
            remote_version = transport.remote_version or ""
            transport.close()
        # Transport.close() is a no-op for an inactive transport, so close
        # the socket explicitly as well (closing twice is harmless).
        sock.close()

    banner = remote_version.strip()
    return banner or False
def service_scan_task(ip: str) -> None:
    """Probe one IP on the configured camera and SSH ports.

    Every port in COMMON_CAMERA_PORTS that accepts a connection is recorded
    in ``found_cameras[ip]`` under a service label; every port in SSH_PORTS
    that yields a banner is recorded under 'SSH'. All writes happen while
    holding ``camera_lock``.
    """
    # Ports with a dedicated label; any other open port is stored as 'TCP_<port>'.
    # NOTE(review): 8000/8001 and 37777/37778 share a label, so when both ports
    # of a pair are open the one probed later overwrites the earlier entry.
    service_labels = {
        80: 'HTTP',
        443: 'HTTPS',
        554: 'RTSP',
        8000: 'Hikvision_Service',
        8001: 'Hikvision_Service',
        37777: 'Dahua_Service',
        37778: 'Dahua_Service',
    }
    for camera_port in COMMON_CAMERA_PORTS:
        if not check_socket(ip, camera_port):
            continue
        label = service_labels.get(camera_port, f'TCP_{camera_port}')
        with camera_lock:
            found_cameras[ip][label] = f"Open on port {camera_port}"

    for ssh_port in SSH_PORTS:
        ssh_banner = check_ssh_banner(ip, ssh_port)
        if not ssh_banner:
            continue
        with camera_lock:
            found_cameras[ip]['SSH'] = f"Open on port {ssh_port} (Banner: {ssh_banner})"
def main():
    """Discover cameras on the local networks and print a report.

    Steps: list the local networks, drop those matching the Docker prefixes,
    run ONVIF discovery, port-scan every host address of the remaining
    networks in a thread pool, then print what ``found_cameras`` contains
    together with the elapsed time.
    """
    started_at = time.time()
    print("--- Starting Network Camera Discovery on RK3588 ---")

    # Shown for information only; the scan ranges come from the call below.
    print(f"Local IP Address: {get_local_ip()}")

    detected_networks = get_all_local_networks()
    if not detected_networks:
        print("No local networks detected for scanning. Exiting.")
        return
    print("Detected local networks for scanning:")
    for cidr in detected_networks:
        print(f" - {cidr}")

    # --- Exclude Docker internal networks ---
    # Networks whose CIDR string starts with one of these prefixes are skipped.
    # Extend the tuple if your Docker setup uses other 172.x ranges.
    docker_prefixes = ("172.17.", "172.18.", "172.19.", "172.20.")
    scan_networks = []
    for cidr in detected_networks:
        if cidr.startswith(docker_prefixes):
            print(f" - Excluding Docker internal network: {cidr}")
        else:
            scan_networks.append(cidr)
    if not scan_networks:
        print("No external local networks found for scanning after filtering Docker networks. Exiting.")
        return
    print("\nFiltered local networks for scanning:")
    for cidr in scan_networks:
        print(f" - {cidr}")

    # --- ONVIF discovery (independent of the port scan) ---
    onvif_discovery_task()

    # --- Collect every host address of the remaining networks (no ping step) ---
    scan_targets = set()
    for cidr in scan_networks:
        try:
            hosts = ipaddress.IPv4Network(cidr, strict=False).hosts()
            scan_targets.update(str(host) for host in hosts)
        except Exception as e:
            print(f"Error processing network {cidr}: {e}")
    target_count = len(scan_targets)
    print(f"Proceeding directly to service scan of {target_count} IPs on common camera ports.")

    # --- Service scan ---
    if target_count > 0:
        # Leaving the 'with' block waits for all submitted scans to finish.
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            executor.map(service_scan_task, scan_targets)

    # --- Discovery results ---
    print("\n--- Discovery Results ---")
    if not found_cameras:
        print("No network cameras or detectable services found based on ONVIF or port scanning.")
    else:
        print(f"Found {len(found_cameras)} potential camera devices and services:")
        for ip, services in found_cameras.items():
            print(f" IP: {ip}")
            # Alphabetical by service label for stable output.
            for service in sorted(services):
                print(f" - {service}: {services[service]}")
    print(f"\n--- Discovery Finished in {time.time() - started_at:.2f} seconds ---")


if __name__ == "__main__":
    main()