From 2885b69f4b5f2696da86738e40881f3fc30c03a8 Mon Sep 17 00:00:00 2001 From: GuanYuankai Date: Mon, 20 Oct 2025 10:30:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0onvif=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker/Dockerfile | 2 + src/python-work/find_cameras.py | 222 ++++++++++++++++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 src/python-work/find_cameras.py diff --git a/docker/Dockerfile b/docker/Dockerfile index 94c9a5c..2c8afb1 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -66,6 +66,8 @@ RUN apt-get update && \ libegl1-mesa-dev \ libgles2-mesa-dev \ && \ + pip3 install --no-cache-dir onvif_zeep && \ + \ # 用户和组创建 groupadd -r developers && \ useradd -ms /bin/bash -g developers -G sudo dev && \ diff --git a/src/python-work/find_cameras.py b/src/python-work/find_cameras.py new file mode 100644 index 0000000..e93a7b8 --- /dev/null +++ b/src/python-work/find_cameras.py @@ -0,0 +1,222 @@ +import ipaddress +import socket +import threading +import concurrent.futures +import time +import re +from typing import List, Set, Dict, Tuple +from urllib.parse import urlparse + +# ONVIF related imports (install with: pip install onvif_zeep) +try: + from onvif_zeep import ONVIFCamera + from zeep import xsd # Needed for some internal ONVIF type definitions + ONVIF_AVAILABLE = True +except ImportError: + print("Warning: 'onvif_zeep' library not found. ONVIF discovery will be skipped.") + print(" To enable ONVIF discovery, install it: pip install onvif_zeep") + ONVIF_AVAILABLE = False + + +# --- Global Data and Locks --- +active_ips: Set[str] = set() +found_cameras: Dict[str, Set[int]] = {} # IP -> {ports} + +ip_lock = threading.Lock() +camera_lock = threading.Lock() + +# Common camera ports to scan +COMMON_CAMERA_PORTS: List[int] = [80, 554, 8000, 8080, 8001, 8090, 443] +# Extend this list if you know other common ports for your cameras + + +# --- Network Utility Functions --- + +def get_local_ip() -> str: + """Gets the local IP address by attempting to connect to an external server.""" + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # Try to connect to a public DNS server (doesn't actually send data) + s.connect(('8.8.8.8', 1)) + IP = s.getsockname()[0] + except Exception: + IP = '127.0.0.1' + finally: + s.close() + return IP + +def get_network_range(ip_address: str, subnet_mask: str = '24') -> ipaddress.IPv4Network: + """ + Given an IP address and subnet mask, returns the IPv4Network object. + Assumes /24 for simplicity if subnet mask is not explicitly provided. + """ + try: + network = ipaddress.IPv4Network(f"{ip_address}/{subnet_mask}", strict=False) + return network + except ipaddress.AddressValueError: + print(f"Error: Invalid IP address or subnet mask: {ip_address}/{subnet_mask}") + exit(1) + +# --- Discovery Functions --- + +def ping_ip(ip: str, timeout: float = 0.1) -> None: + """ + Attempts to 'ping' an IP by trying to connect to a common port (e.g., 80 or 554). + If connection is established, assumes the host is active. + """ + for port in [80, 554]: # Just need any open port to consider it "active" + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + try: + sock.connect((ip, port)) + with ip_lock: + active_ips.add(ip) + sock.close() + return # Host is active, no need to try other ports + except (socket.timeout, ConnectionRefusedError, OSError): + pass # Port not open or connection refused + except Exception as e: + # print(f"Ping error for {ip}:{port}: {e}") # Uncomment for deeper debugging + pass + finally: + if sock: + sock.close() + +def scan_port(ip: str, port: int, timeout: float = 0.1) -> Tuple[str, int, bool]: + """Scans a single port on a given IP address.""" + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + try: + sock.connect((ip, port)) + return ip, port, True + except (socket.timeout, ConnectionRefusedError, OSError): + return ip, port, False + finally: + sock.close() + + +def onvif_discovery_task() -> None: + """Performs ONVIF WS-Discovery to find compatible devices.""" + if not ONVIF_AVAILABLE: + return + + print("ONVIF: Starting discovery. This may take a few seconds...") + try: + # discover() sends a UDP multicast probe and listens for responses + # discovery_timeout is in seconds + devices = ONVIFCamera.discover(discovery_timeout=5) + print(f"ONVIF: Found {len(devices)} potential ONVIF devices.") + + for device_url in devices: + try: + # device_url looks like 'http://192.168.1.108/onvif/device_service' + # or 'http://192.168.1.108:8080/onvif/device_service' + parsed_url = urlparse(device_url) + ip = parsed_url.hostname + port = parsed_url.port if parsed_url.port else (80 if parsed_url.scheme == 'http' else 443) + + if ip: + with ip_lock: + active_ips.add(ip) + with camera_lock: + if ip not in found_cameras: + found_cameras[ip] = set() + found_cameras[ip].add(port) + print(f"ONVIF: Discovered device at {ip}:{port} (Service URL: {device_url})") + + except Exception as e: + print(f"ONVIF: Error parsing device URL {device_url}: {e}") + + except Exception as e: + print(f"ONVIF: Error during discovery: {e}") + +# --- Main Program Logic --- + +def main(): + print("--- Starting Network Camera Discovery on RK3588 ---") + + # 1. Get Local IP and Network Info + local_ip = get_local_ip() + if local_ip == '127.0.0.1': + print("Warning: Could not determine local IP. Using 192.168.1.0/24 as fallback.") + network_range = get_network_range('192.168.1.1', '24') + else: + print(f"Local IP Address: {local_ip}") + # Assuming /24 subnet for local network + network_range = get_network_range(local_ip, '24') + + print(f"Scanning network range: {network_range}") + + # Start ONVIF discovery in a separate thread + onvif_thread = threading.Thread(target=onvif_discovery_task) + onvif_thread.start() + + # 2. Ping all IPs in the subnet to find active hosts + print(f"\nPinging all IPs in {network_range} to find active hosts...") + ip_list = [str(ip) for ip in network_range.hosts()] + + # Use ThreadPoolExecutor for efficient parallel execution + ping_timeout = 0.05 # Smaller timeout for faster ping + with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor: + for ip in ip_list: + executor.submit(ping_ip, ip, ping_timeout) + + # Wait for ping tasks to complete + ping_start_time = time.time() + while threading.active_count() > 1 and (time.time() - ping_start_time) < 10: # Max 10s for pings + time.sleep(0.1) + + with ip_lock: + print(f"Found {len(active_ips)} active IPs.") + + # 3. Port scan active IPs + print(f"\nScanning active IPs for common camera ports: {COMMON_CAMERA_PORTS}...") + + futures = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor: + with ip_lock: # Ensure active_ips is not modified while iterating + for ip in list(active_ips): # Iterate over a copy to avoid issues if set changes + for port in COMMON_CAMERA_PORTS: + futures.append(executor.submit(scan_port, ip, port)) + + for future in concurrent.futures.as_completed(futures): + ip, port, is_open = future.result() + if is_open: + with camera_lock: + if ip not in found_cameras: + found_cameras[ip] = set() + found_cameras[ip].add(port) + # print(f" Found open port: {ip}:{port}") # Uncomment for verbose output + + # Wait for ONVIF discovery to finish + onvif_thread.join() + + # 4. Present Results + print("\n--- Discovery Results ---") + if not found_cameras: + print("No network cameras found based on common ports or ONVIF discovery.") + else: + print("Potential Network Cameras Found:") + for ip, ports in sorted(found_cameras.items()): + print(f" IP: {ip}") + print(f" Open Ports: {', '.join(map(str, sorted(list(ports))))}") + + # Heuristic to suggest RTSP/Web URLs + suggested_urls = [] + if 554 in ports: + suggested_urls.append(f"RTSP (example, user/pass/path needed): rtsp://[user]:[pass]@{ip}:554/stream") + if 80 in ports: + suggested_urls.append(f"Web Admin: http://{ip}") + if 8080 in ports: + suggested_urls.append(f"Web Admin: http://{ip}:8080") + if 443 in ports: + suggested_urls.append(f"Web Admin (HTTPS): https://{ip}") + + for url in suggested_urls: + print(f" - {url}") + print("-" * 30) + + print("\n--- Discovery Finished ---") + +if __name__ == "__main__": + main()