添加onvif库

This commit is contained in:
GuanYuankai 2025-10-20 10:30:27 +08:00
parent c6ef920844
commit 2885b69f4b
2 changed files with 224 additions and 0 deletions

View File

@ -66,6 +66,8 @@ RUN apt-get update && \
libegl1-mesa-dev \
libgles2-mesa-dev \
&& \
pip3 install --no-cache-dir onvif_zeep && \
\
# 用户和组创建
groupadd -r developers && \
useradd -ms /bin/bash -g developers -G sudo dev && \

View File

@ -0,0 +1,222 @@
import ipaddress
import socket
import threading
import concurrent.futures
import time
import re
from typing import List, Set, Dict, Tuple
from urllib.parse import urlparse
# ONVIF related imports (install with: pip install onvif_zeep)
try:
from onvif_zeep import ONVIFCamera
from zeep import xsd # Needed for some internal ONVIF type definitions
ONVIF_AVAILABLE = True
except ImportError:
print("Warning: 'onvif_zeep' library not found. ONVIF discovery will be skipped.")
print(" To enable ONVIF discovery, install it: pip install onvif_zeep")
ONVIF_AVAILABLE = False
# --- Global Data and Locks ---
active_ips: Set[str] = set()
found_cameras: Dict[str, Set[int]] = {} # IP -> {ports}
ip_lock = threading.Lock()
camera_lock = threading.Lock()
# Common camera ports to scan
COMMON_CAMERA_PORTS: List[int] = [80, 554, 8000, 8080, 8001, 8090, 443]
# Extend this list if you know other common ports for your cameras
# --- Network Utility Functions ---
def get_local_ip() -> str:
"""Gets the local IP address by attempting to connect to an external server."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# Try to connect to a public DNS server (doesn't actually send data)
s.connect(('8.8.8.8', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
def get_network_range(ip_address: str, subnet_mask: str = '24') -> ipaddress.IPv4Network:
"""
Given an IP address and subnet mask, returns the IPv4Network object.
Assumes /24 for simplicity if subnet mask is not explicitly provided.
"""
try:
network = ipaddress.IPv4Network(f"{ip_address}/{subnet_mask}", strict=False)
return network
except ipaddress.AddressValueError:
print(f"Error: Invalid IP address or subnet mask: {ip_address}/{subnet_mask}")
exit(1)
# --- Discovery Functions ---
def ping_ip(ip: str, timeout: float = 0.1) -> None:
"""
Attempts to 'ping' an IP by trying to connect to a common port (e.g., 80 or 554).
If connection is established, assumes the host is active.
"""
for port in [80, 554]: # Just need any open port to consider it "active"
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect((ip, port))
with ip_lock:
active_ips.add(ip)
sock.close()
return # Host is active, no need to try other ports
except (socket.timeout, ConnectionRefusedError, OSError):
pass # Port not open or connection refused
except Exception as e:
# print(f"Ping error for {ip}:{port}: {e}") # Uncomment for deeper debugging
pass
finally:
if sock:
sock.close()
def scan_port(ip: str, port: int, timeout: float = 0.1) -> Tuple[str, int, bool]:
"""Scans a single port on a given IP address."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect((ip, port))
return ip, port, True
except (socket.timeout, ConnectionRefusedError, OSError):
return ip, port, False
finally:
sock.close()
def onvif_discovery_task() -> None:
"""Performs ONVIF WS-Discovery to find compatible devices."""
if not ONVIF_AVAILABLE:
return
print("ONVIF: Starting discovery. This may take a few seconds...")
try:
# discover() sends a UDP multicast probe and listens for responses
# discovery_timeout is in seconds
devices = ONVIFCamera.discover(discovery_timeout=5)
print(f"ONVIF: Found {len(devices)} potential ONVIF devices.")
for device_url in devices:
try:
# device_url looks like 'http://192.168.1.108/onvif/device_service'
# or 'http://192.168.1.108:8080/onvif/device_service'
parsed_url = urlparse(device_url)
ip = parsed_url.hostname
port = parsed_url.port if parsed_url.port else (80 if parsed_url.scheme == 'http' else 443)
if ip:
with ip_lock:
active_ips.add(ip)
with camera_lock:
if ip not in found_cameras:
found_cameras[ip] = set()
found_cameras[ip].add(port)
print(f"ONVIF: Discovered device at {ip}:{port} (Service URL: {device_url})")
except Exception as e:
print(f"ONVIF: Error parsing device URL {device_url}: {e}")
except Exception as e:
print(f"ONVIF: Error during discovery: {e}")
# --- Main Program Logic ---
def main():
print("--- Starting Network Camera Discovery on RK3588 ---")
# 1. Get Local IP and Network Info
local_ip = get_local_ip()
if local_ip == '127.0.0.1':
print("Warning: Could not determine local IP. Using 192.168.1.0/24 as fallback.")
network_range = get_network_range('192.168.1.1', '24')
else:
print(f"Local IP Address: {local_ip}")
# Assuming /24 subnet for local network
network_range = get_network_range(local_ip, '24')
print(f"Scanning network range: {network_range}")
# Start ONVIF discovery in a separate thread
onvif_thread = threading.Thread(target=onvif_discovery_task)
onvif_thread.start()
# 2. Ping all IPs in the subnet to find active hosts
print(f"\nPinging all IPs in {network_range} to find active hosts...")
ip_list = [str(ip) for ip in network_range.hosts()]
# Use ThreadPoolExecutor for efficient parallel execution
ping_timeout = 0.05 # Smaller timeout for faster ping
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
for ip in ip_list:
executor.submit(ping_ip, ip, ping_timeout)
# Wait for ping tasks to complete
ping_start_time = time.time()
while threading.active_count() > 1 and (time.time() - ping_start_time) < 10: # Max 10s for pings
time.sleep(0.1)
with ip_lock:
print(f"Found {len(active_ips)} active IPs.")
# 3. Port scan active IPs
print(f"\nScanning active IPs for common camera ports: {COMMON_CAMERA_PORTS}...")
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
with ip_lock: # Ensure active_ips is not modified while iterating
for ip in list(active_ips): # Iterate over a copy to avoid issues if set changes
for port in COMMON_CAMERA_PORTS:
futures.append(executor.submit(scan_port, ip, port))
for future in concurrent.futures.as_completed(futures):
ip, port, is_open = future.result()
if is_open:
with camera_lock:
if ip not in found_cameras:
found_cameras[ip] = set()
found_cameras[ip].add(port)
# print(f" Found open port: {ip}:{port}") # Uncomment for verbose output
# Wait for ONVIF discovery to finish
onvif_thread.join()
# 4. Present Results
print("\n--- Discovery Results ---")
if not found_cameras:
print("No network cameras found based on common ports or ONVIF discovery.")
else:
print("Potential Network Cameras Found:")
for ip, ports in sorted(found_cameras.items()):
print(f" IP: {ip}")
print(f" Open Ports: {', '.join(map(str, sorted(list(ports))))}")
# Heuristic to suggest RTSP/Web URLs
suggested_urls = []
if 554 in ports:
suggested_urls.append(f"RTSP (example, user/pass/path needed): rtsp://[user]:[pass]@{ip}:554/stream")
if 80 in ports:
suggested_urls.append(f"Web Admin: http://{ip}")
if 8080 in ports:
suggested_urls.append(f"Web Admin: http://{ip}:8080")
if 443 in ports:
suggested_urls.append(f"Web Admin (HTTPS): https://{ip}")
for url in suggested_urls:
print(f" - {url}")
print("-" * 30)
print("\n--- Discovery Finished ---")
if __name__ == "__main__":
main()