diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e6d45e..ab817fd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,9 +23,6 @@ target_include_directories(nlohmann_json INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/src/vendor ) - - - add_library(edge_proxy_lib STATIC # TCP通讯层 src/network/tcp_server.cc @@ -83,13 +80,13 @@ target_link_libraries(edge_proxy_lib PRIVATE PahoMqttCpp::paho-mqttpp3 SQLite::SQLite3 pthread - nlohmann_json # rknn_api rknnrt ) target_link_libraries(edge_proxy_lib PUBLIC Crow + nlohmann_json ) # ================================================================= # Main Application Target (UNCOMMENTED AND ACTIVATED) @@ -102,19 +99,6 @@ target_link_libraries(edge_proxy PRIVATE edge_proxy_lib ) -# =================#================================================ -# 测试目标 -# ================================================================= -add_executable(test - src/test.cc - src/systemMonitor/iio_sensor.cc - -) - -target_link_libraries(test PRIVATE - edge_proxy_lib -) - add_executable(edge_streamer src/streamer/main_streamer.cpp @@ -122,17 +106,25 @@ add_executable(edge_streamer # 链接新服务所需的所有库 target_link_libraries(edge_streamer PRIVATE - # --- GStreamer (来自 pkg_check_modules) --- ${GST_LIBRARIES} - pthread ) -# 为新服务设置头文件包含路径 target_include_directories(edge_streamer PRIVATE - # 新服务的私有头文件 ${CMAKE_CURRENT_SOURCE_DIR}/src/streamer/include - - # --- GStreamer (来自 pkg_check_modules) --- ${GST_INCLUDE_DIRS} ) + +# =================#================================================ +# 测试目标 +# ================================================================= +# add_executable(test +# src/test.cc +# src/systemMonitor/iio_sensor.cc + +# ) + +# target_link_libraries(test PRIVATE +# edge_proxy_lib +# ) + diff --git a/config/devices.json b/config/devices.json index c462e9c..fcafe9e 100644 --- a/config/devices.json +++ b/config/devices.json @@ -1,59 +1,59 @@ -{ - "modbus_rtu_devices": [ - { - "enabled": false, - "device_id": "rtu_temp_sensor_lab", - "port_path": "/dev/ttyS7", - "baud_rate": 9600, - "slave_id": 1, - "poll_interval_ms": 5000, - "data_points": [ - {"name": "temperature", "address": 0, "type": "INT16", "scale": 0.1}, - {"name": "humidity", "address": 1, "type": "UINT16", "scale": 0.1} - ] - }, - { - "enabled": false, - "device_id": "rotary encoder", - "port_path": "/dev/ttyS7", - "baud_rate": 9600, - "slave_id": 111, - "poll_interval_ms": 5000, - "data_points": [ - {"name": "count", "address": 1, "type": "INT16", "scale": 1.0}, - {"name": "total_count", "address": 2, "type": "INT16", "scale": 1.0} - ] - }, - { - "enabled": false, - "device_id": "backup_counter", - "port_path": "/dev/ttyS7", - "baud_rate": 9600, - "slave_id": 10, - "poll_interval_ms": 1000, - "data_points": [ - {"name": "count", "address": 32, "type": "UINT32"} - ] + { + "modbus_rtu_devices": [ + { + "enabled": true, + "device_id": "rtu_temp_sensor_lab", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 1, + "poll_interval_ms": 5000, + "data_points": [ + {"name": "temperature", "address": 0, "type": "INT16", "scale": 0.1}, + {"name": "humidity", "address": 1, "type": "UINT16", "scale": 0.1} + ] + }, + { + "enabled": false, + "device_id": "rotary encoder", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 111, + "poll_interval_ms": 5000, + "data_points": [ + {"name": "count", "address": 1, "type": "INT16", "scale": 1.0}, + {"name": "total_count", "address": 2, "type": "INT16", "scale": 1.0} + ] + }, + { + "enabled": false, + "device_id": "backup_counter", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 10, + "poll_interval_ms": 1000, + "data_points": [ + {"name": "count", "address": 32, "type": "UINT32"} + ] + } + ], + "modbus_tcp_devices": [ + { + "enabled": false, + "device_id": "plc_workshop1", + "ip_address": "192.168.1.120", + "port": 502, + "slave_id": 1, + "poll_interval_ms": 1000, + "data_points": [ + {"name": "motor_speed", "address": 100, "type": "UINT16", "scale": 1.0}, + {"name": "pressure", "address": 102, "type": "FLOAT32", "scale": 0.01}, + {"name": "valve_status","address": 104, "type": "UINT16", "scale": 1.0} + ] + } + ], + "modbus_rtu_bus_configs": { + "/dev/ttyS7": { + "inter_device_delay_ms": 150 + } } - ], - "modbus_tcp_devices": [ - { - "enabled": false, - "device_id": "plc_workshop1", - "ip_address": "192.168.1.120", - "port": 502, - "slave_id": 1, - "poll_interval_ms": 1000, - "data_points": [ - {"name": "motor_speed", "address": 100, "type": "UINT16", "scale": 1.0}, - {"name": "pressure", "address": 102, "type": "FLOAT32", "scale": 0.01}, - {"name": "valve_status","address": 104, "type": "UINT16", "scale": 1.0} - ] - } - ], - "modbus_rtu_bus_configs": { - "/dev/ttyS7": { - "inter_device_delay_ms": 150 - } - } -} \ No newline at end of file + } \ No newline at end of file diff --git a/python-work/new_find_camera.py b/python-work/new_find_camera.py new file mode 100644 index 0000000..00616ee --- /dev/null +++ b/python-work/new_find_camera.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python3 + +import subprocess +import json +import sys +import ipaddress +import socket +import re +from typing import List, Set, Dict, Any +import time # 导入 time +import threading # 导入 threading + +# 尝试导入 psutil +try: + import psutil +except ImportError: + print("错误: 'psutil' 库未找到。请在 Dockerfile 中或使用 'pip install psutil' 安装。") + sys.exit(1) + +# --- 配置 --- +COMMON_CAMERA_PORTS = [ + 80, # HTTP (web interface) + 443, # HTTPS (secure web interface) + 554, # RTSP (Real Time Streaming Protocol) + 8000, # Often used by Hikvision (SDK/HTTP) + 8080, # Alternative HTTP/RTSP + 8001, # Hikvision stream port + 37777, # Dahua primary port + 37778, # Dahua secondary port + 8002, # Often used for camera APIs or secondary streams +] +NMAP_PORT_STRING = "T:" + ",".join(map(str, COMMON_CAMERA_PORTS)) + +# 用于过滤 Docker 网络的常见前缀 +DOCKER_NETWORK_PREFIXES = ["172.17.", "172.18.", "172.19.", "172.20."] + +# 全局标志,用于停止进度线程 +stop_progress_flag = threading.Event() +start_time = time.time() + +def progress_indicator(): + """在后台线程中运行,定期打印进度信息。""" + interval = 10 # 每 10 秒报告一次 + count = 0 + + # 打印一个提示,说明进度指示器已启动 + print(f"--- Nmap 扫描进行中(每 {interval} 秒更新一次)---", file=sys.stderr) + + while not stop_progress_flag.is_set(): + time_elapsed = time.time() - start_time + + # 使用 is_set() 检查标志,并等待最多 interval 秒 + stop_progress_flag.wait(interval) + + if not stop_progress_flag.is_set(): + count += 1 + # 打印一个简单的状态信息 + print(f"--- 状态更新 #{count}: 扫描已运行 {time_elapsed:.1f} 秒... ---", file=sys.stderr) + +def get_local_ip() -> str: + """获取本机的本地 IP 地址。""" + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # 连接到一个虚拟地址 + s.connect(('10.255.255.255', 1)) + IP = s.getsockname()[0] + except Exception: + IP = '127.0.0.1' + finally: + s.close() + return IP + +def get_all_local_networks() -> List[str]: + """ + 使用 psutil 查找所有活动的网络接口及其关联的网络范围。 + 返回一个网络 CIDR 字符串列表 (例如 '192.168.1.0/24')。 + """ + networks = set() + try: + for interface, snics in psutil.net_if_addrs().items(): + for snic in snics: + if snic.family == socket.AF_INET: # IPv4 + ip_address = snic.address + netmask = snic.netmask + if ip_address and netmask and ip_address != '127.0.0.1': + try: + # strict=False 允许主机地址 + network_obj = ipaddress.IPv4Network(f"{ip_address}/{netmask}", strict=False) + networks.add(str(network_obj)) + except ipaddress.AddressValueError as e: + print(f"警告: 无法解析网络 {interface}: {ip_address}, {netmask}。错误: {e}", file=sys.stderr) + return list(networks) + except Exception as e: + print(f"错误: 使用 psutil 获取网络接口时出错: {e}", file=sys.stderr) + print("回退: 仅扫描本地 IP 的 /24 子网。", file=sys.stderr) + local_ip = get_local_ip() + if local_ip == '127.0.0.1': + return [] + return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))] + +def filter_docker_networks(all_networks: List[str]) -> List[str]: + """过滤掉已知的 Docker 内部网络。""" + filtered_networks = [] + print("--- 检测到以下网络 ---", file=sys.stderr) + for net_cidr in all_networks: + print(f" - {net_cidr}", file=sys.stderr) + + for net_cidr in all_networks: + is_docker_internal = False + for prefix in DOCKER_NETWORK_PREFIXES: + if net_cidr.startswith(prefix): + is_docker_internal = True + break + + if not is_docker_internal: + filtered_networks.append(net_cidr) + else: + print(f" - 排除 Docker 内部网络: {net_cidr}", file=sys.stderr) + + if not filtered_networks and all_networks: + print(f"警告: 所有检测到的网络似乎都是 Docker 内部网络。扫描可能不会发现外部设备。", file=sys.stderr) + print(f"将继续扫描所有检测到的网络: {', '.join(all_networks)}", file=sys.stderr) + return all_networks # 如果过滤后为空,但原来不为空,则返回原列表 + + return filtered_networks + +def run_nmap_scan(targets: List[str]) -> str: + """ + 对目标网络运行 nmap 扫描。 + -sV: 服务版本检测 + -p [ports]: 扫描指定的 TCP 端口 + -T4: 加快扫描速度 + -oJ -: 将 JSON 输出到 stdout + """ + print(f"\n--- 开始 Nmap 扫描 (目标: {', '.join(targets)}) ---", file=sys.stderr) + print(f"--- 扫描端口: {NMAP_PORT_STRING} ---", file=sys.stderr) + + # 需要 sudo 权限来运行 nmap + # 您的 Dockerfile 已经为 'dev' 用户设置了 NOPASSWD:ALL + nmap_command = [ + 'sudo', 'nmap', + '-oJ', '-', # 输出 JSON 到 stdout + '-T4', # 更快的时间模板 + '--open', # 只显示打开的端口 + '-sV', # 服务版本指纹识别 + # --- 移除了导致错误的 ONVIF 脚本 --- + # '--script=broadcast-onvif-discover', + # --- --------------------------- --- + '-p', NMAP_PORT_STRING, # 扫描特定端口 + ] + targets + + start_time = time.time() + stop_progress_flag.clear() # 确保标志是清除的 + progress_thread = threading.Thread(target=progress_indicator, daemon=True) + progress_thread.start() + try: + # 设置超时,例如10分钟,以防扫描卡住 + result = subprocess.run(nmap_command, + capture_output=True, + text=True, + check=True, + timeout=600) + stop_progress_flag.set() + progress_thread.join() # 等 + print("--- Nmap 扫描完成 ---", file=sys.stderr) + 待线程清理 + return result.stdout + except subprocess.CalledProcessError as e: + print(f"错误: Nmap 执行失败 (返回码 {e.returncode}):", file=sys.stderr) + print(f"Nmap Stderr: {e.stderr}", file=sys.stderr) + return None + except subprocess.TimeoutExpired as e: + print(f"错误: Nmap 扫描超时 ({e.timeout} 秒)。", file=sys.stderr) + print(f"Nmap Stdout: {e.stdout}", file=sys.stderr) + print(f"Nmap Stderr: {e.stderr}", file=sys.stderr) + return None + except FileNotFoundError: + print("错误: 'sudo' 或 'nmap' 命令未找到。请确保它在 PATH 中并且已安装。", file=sys.stderr) + return None + +def parse_nmap_json(nmap_stdout: str) -> List[Dict[str, Any]]: + """将 nmap 的 JSON 输出解析为我们想要的简洁格式。""" + results = [] + if not nmap_stdout: + return results + + try: + nmap_data = json.loads(nmap_stdout) + except json.JSONDecodeError as e: + print(f"错误: 无法解析 Nmap 的 JSON 输出。{e}", file=sys.stderr) + print(f"原始输出: {nmap_stdout[:500]}...", file=sys.stderr) + return results + + if 'hosts' not in nmap_data: + print("警告: Nmap 输出中未找到 'hosts' 键。", file=sys.stderr) + return results + + for host in nmap_data.get('hosts', []): + if host.get('status', {}).get('state') != 'up': + continue + + ip = host.get('addresses', {}).get('ipv4') + mac = host.get('addresses', {}).get('mac') + if not ip: + continue + + vendor = host.get('vendor', {}).get(mac, None) if mac else None + + device_info = { + "ip": ip, + "mac": mac, + "vendor": vendor, + "services": [], + "onvif_xaddrs": [] # 即使没有脚本,也保留此键以保持格式一致 + } + + # 1. 解析端口和服务 + for port_info in host.get('ports', []): + if port_info.get('state', {}).get('state') == 'open': + service_data = port_info.get('service', {}) + service = { + "port": int(port_info.get('portid')), + "service_name": service_data.get('name'), + "product": service_data.get('product'), + "version": service_data.get('version'), + "extra_info": service_data.get('extrainfo') + } + device_info["services"].append(service) + + # 2. 解析 ONVIF 脚本输出 (现在为空) + # 'hostscript' 字段包含主机级别的脚本结果 + for script in host.get('hostscript', []): + if script.get('id') == 'broadcast-onvif-discover': + # Nmap 脚本输出是一个字符串。我们用正则提取 XAddrs。 + output = script.get('output', '') + # 匹配 'http://...' 格式的 URL,直到遇到空格或换行符 + xaddrs = re.findall(r'http://[^\s,]+', output) + device_info["onvif_xaddrs"] = xaddrs + + # 只有当我们发现了开放服务时才添加该设备 + if device_info["services"]: + results.append(device_info) + + return results + +def main(): + print("--- 启动网络摄像头发现 (基于 Nmap) ---", file=sys.stderr) + + # 1. 获取并过滤本地网络 + all_local_networks = get_all_local_networks() + if not all_local_networks: + print("错误: 未检测到本地网络用于扫描。退出。", file=sys.stderr) + sys.exit(1) + + networks_to_scan = filter_docker_networks(all_local_networks) + + if not networks_to_scan: + print("警告: 过滤后没有可扫描的网络。退出。", file=sys.stderr) + sys.exit(0) + + print("\n--- 将扫描以下网络 ---", file=sys.stderr) + for net in networks_to_scan: + print(f" - {net}", file=sys.stderr) + + # 2. 运行 Nmap 扫描 + nmap_json_output = run_nmap_scan(networks_to_scan) + + if not nmap_json_output: + print("错误: 未从 Nmap 收到任何输出。退出。", file=sys.stderr) + sys.exit(1) + + # 3. 解析 Nmap JSON + final_results = parse_nmap_json(nmap_json_output) + + # 4. 将最终结果以 JSON 格式打印到 stdout + # 注意:stderr 用于打印日志,stdout 仅用于输出最终的 JSON + print(json.dumps(final_results, indent=2)) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/deviceManager/device_manager.cc b/src/deviceManager/device_manager.cc index c7db2af..40d296c 100644 --- a/src/deviceManager/device_manager.cc +++ b/src/deviceManager/device_manager.cc @@ -5,10 +5,10 @@ #include #include #include +#include using json = nlohmann::json; - static ModbusDataType string_to_modbus_data_type(const std::string& type_str) { static const std::map type_map = { {"UINT16", ModbusDataType::UINT16}, @@ -26,175 +26,272 @@ static ModbusDataType string_to_modbus_data_type(const std::string& type_str) { DeviceManager::DeviceManager(boost::asio::io_context& io_context, ReportDataCallback report_cb) - : m_io_context(io_context), m_report_callback(std::move(report_cb)) {} + : m_io_context(io_context), m_report_callback(std::move(report_cb)) {} // DeviceManager::~DeviceManager() { - stop_all(); + stop_all(); } +ModbusRtuDeviceConfig DeviceManager::_parse_rtu_config(const json& dev_json) { + ModbusRtuDeviceConfig config; + config.device_id = dev_json.at("device_id").get(); + config.port_path = dev_json.at("port_path").get(); + config.baud_rate = dev_json.at("baud_rate").get(); + config.slave_id = dev_json.at("slave_id").get(); + config.poll_interval_ms = dev_json.at("poll_interval_ms").get(); + + for (const auto& dp_json : dev_json.at("data_points")) { + config.data_points.push_back({ + dp_json.at("name").get(), + (uint16_t)dp_json.at("address").get(), + string_to_modbus_data_type(dp_json.at("type").get()), + dp_json.value("scale", 1.0) + }); + } + return config; +} + +ModbusTcpDeviceConfig DeviceManager::_parse_tcp_config(const json& dev_json) { + ModbusTcpDeviceConfig config; + config.device_id = dev_json.at("device_id").get(); + config.ip_address = dev_json.at("ip_address").get(); + config.port = dev_json.at("port").get(); + config.slave_id = dev_json.at("slave_id").get(); + config.poll_interval_ms = dev_json.at("poll_interval_ms").get(); + + for (const auto& dp_json : dev_json.at("data_points")) { + config.data_points.push_back({ + dp_json.at("name").get(), + (uint16_t)dp_json.at("address").get(), + string_to_modbus_data_type(dp_json.at("type").get()), + dp_json.value("scale", 1.0) + }); + } + return config; +} + + void DeviceManager::load_and_start(const std::string& config_path) { - std::lock_guard lock(m_mutex); - spdlog::info("Loading device configuration from '{}'...", config_path); - std::ifstream config_file(config_path); - if (!config_file.is_open()) { - spdlog::critical("Failed to open configuration file: {}", config_path); - throw std::runtime_error("Configuration file not found."); + std::lock_guard lock(m_mutex); // + spdlog::info("Loading device configuration from '{}'...", config_path); // + std::ifstream config_file(config_path); // + if (!config_file.is_open()) { // + spdlog::critical("Failed to open configuration file: {}", config_path); // + throw std::runtime_error("Configuration file not found."); // } try { - json config_json = json::parse(config_file); + json config_json = json::parse(config_file); // + + // --- 修改:委托给新的核心逻辑函数 --- + _parse_and_apply_config(config_json); + // --- 结束修改 --- - // --- MODIFIED: 加载 Modbus RTU 设备的逻辑 --- - if (config_json.contains("modbus_rtu_devices")) { - // 1. 先将所有启用的设备按 port_path (总线) 分组 - std::map> rtu_device_groups; - for (const auto& dev_json : config_json["modbus_rtu_devices"]) { - if (!dev_json.value("enabled", false)) continue; - - ModbusRtuDeviceConfig config; - config.device_id = dev_json.at("device_id").get(); - config.port_path = dev_json.at("port_path").get(); - config.baud_rate = dev_json.at("baud_rate").get(); - config.slave_id = dev_json.at("slave_id").get(); - config.poll_interval_ms = dev_json.at("poll_interval_ms").get(); - - for (const auto& dp_json : dev_json.at("data_points")) { - config.data_points.push_back({ - dp_json.at("name").get(), - (uint16_t)dp_json.at("address").get(), - string_to_modbus_data_type(dp_json.at("type").get()), - dp_json.value("scale", 1.0) - }); - } - - // 将解析后的配置加入对应的分组 - rtu_device_groups[config.port_path].push_back(config); - } - - // 2. 为每个分组 (即每个物理串口) 创建一个 BusService - for (const auto& pair : rtu_device_groups) { - const std::string& port_path = pair.first; - const std::vector& devices_on_bus = pair.second; - - auto service = std::make_unique(port_path, devices_on_bus, m_report_callback); - service->start(); - m_rtu_bus_services.push_back(std::move(service)); - spdlog::info("Started Modbus RTU Bus service for port '{}' with {} device(s).", port_path, devices_on_bus.size()); - } - } - - // --- 加载 Modbus TCP 设备 (逻辑保持不变) --- - if (config_json.contains("modbus_tcp_devices")) { - for (const auto& dev_json : config_json["modbus_tcp_devices"]) { - if (!dev_json.value("enabled", false)) continue; - - ModbusTcpDeviceConfig config; - config.device_id = dev_json.at("device_id").get(); - config.ip_address = dev_json.at("ip_address").get(); - config.port = dev_json.at("port").get(); - config.slave_id = dev_json.at("slave_id").get(); - config.poll_interval_ms = dev_json.at("poll_interval_ms").get(); - - for (const auto& dp_json : dev_json.at("data_points")) { - config.data_points.push_back({ - dp_json.at("name").get(), - (uint16_t)dp_json.at("address").get(), - string_to_modbus_data_type(dp_json.at("type").get()), - dp_json.value("scale", 1.0) - }); - } - - auto poller = std::make_shared(m_io_context, config, m_report_callback); - poller->start(); - m_tcp_pollers.push_back(poller); - spdlog::info("Started Modbus TCP service for device '{}'.", config.device_id); - } - } - - } catch (const json::exception& e) { - spdlog::critical("Failed to parse JSON configuration: {}", e.what()); - throw; + } catch (const json::exception& e) { // + spdlog::critical("Failed to parse JSON configuration: {}", e.what()); // + throw; // } } -void DeviceManager::stop_all() { +void DeviceManager::post_apply_device_configuration(const std::string& json_payload) { + spdlog::debug("Posting config update task to io_context."); + boost::asio::post(m_io_context, [this, json_payload]() { + this->_apply_config_task(json_payload); + }); +} + +// +++ 新增:在 io_context 线程上执行的私有任务 +++ +void DeviceManager::_apply_config_task(std::string json_payload) { + // 现在我们在 io_context 线程上,安全地锁定互斥锁 std::lock_guard lock(m_mutex); - spdlog::info("Stopping all device services..."); + spdlog::info("Applying new device configuration (from MQTT)..."); - // 停止所有 RTU 总线服务 - for (auto& service : m_rtu_bus_services) { - service->stop(); + try { + json config_json = json::parse(json_payload); + _parse_and_apply_config(config_json); + spdlog::info("Successfully applied new device configuration."); + } catch (const json::exception& e) { + spdlog::error("Failed to parse and apply new config: {}", e.what()); } - m_rtu_bus_services.clear(); +} + +// +++ 新增:核心 "Diff" 逻辑 (私有, 必须持锁调用) +++ +void DeviceManager::_parse_and_apply_config(const json& config_json) { + // --- 解析新配置,构建“期望状态” --- - // 停止所有 TCP 轮询服务 - for (auto& poller : m_tcp_pollers) { - poller->stop(); + // key: port_path, value: 该总线上的所有设备配置 + std::map> new_rtu_groups; + if (config_json.contains("modbus_rtu_devices")) { + for (const auto& dev_json : config_json["modbus_rtu_devices"]) { + if (!dev_json.value("enabled", false)) continue; + auto config = _parse_rtu_config(dev_json); + new_rtu_groups[config.port_path].push_back(config); + } } - m_tcp_pollers.clear(); + + // key: device_id, value: 该设备的配置 + std::map new_tcp_configs; + if (config_json.contains("modbus_tcp_devices")) { + for (const auto& dev_json : config_json["modbus_tcp_devices"]) { + if (!dev_json.value("enabled", false)) continue; + auto config = _parse_tcp_config(dev_json); + new_tcp_configs[config.device_id] = config; + } + } + + // --- "Diff" RTU 总线服务 --- + // (我们使用 port_path 作为服务的唯一标识) + + // 查找需要停止的 RTU 服务 (存在于当前,但不存在于新配置) + for (auto it = m_rtu_bus_services.begin(); it != m_rtu_bus_services.end(); /* no increment */) { + const std::string& port_path = it->first; + if (new_rtu_groups.find(port_path) == new_rtu_groups.end()) { + spdlog::info("Config update: Stopping RTU service for port '{}'.", port_path); + it->second->stop(); + it = m_rtu_bus_services.erase(it); // erase(it) 返回下一个迭代器 + } else { + ++it; + } + } + + // 启动新/更新 RTU 服务 + for (const auto& pair : new_rtu_groups) { + const std::string& port_path = pair.first; + const auto& devices_on_bus = pair.second; + + auto it = m_rtu_bus_services.find(port_path); + if (it != m_rtu_bus_services.end()) { + // 服务已存在,我们必须重启它以应用新配置 + // TODO: 高级实现可以检查配置是否真的改变了 + spdlog::info("Config update: Restarting RTU service for port '{}'...", port_path); + it->second->stop(); + m_rtu_bus_services.erase(it); + } else { + spdlog::info("Config update: Starting new RTU service for port '{}'...", port_path); + } + + auto service = std::make_unique(port_path, devices_on_bus, m_report_callback); + service->start(); + m_rtu_bus_services[port_path] = std::move(service); + spdlog::info("Started Modbus RTU Bus service for port '{}' with {} device(s).", port_path, devices_on_bus.size()); + } + + // --- "Diff" TCP 轮询服务 --- + // (我们使用 device_id 作为服务的唯一标识) + + // 查找需要停止的 TCP 服务 + for (auto it = m_tcp_pollers.begin(); it != m_tcp_pollers.end(); /* no increment */) { + const std::string& device_id = it->first; + if (new_tcp_configs.find(device_id) == new_tcp_configs.end()) { + spdlog::info("Config update: Stopping TCP poller for device '{}'.", device_id); + it->second->stop(); + it = m_tcp_pollers.erase(it); + } else { + ++it; + } + } + + // 启动新/更新 TCP 服务 + for (const auto& pair : new_tcp_configs) { + const std::string& device_id = pair.first; + const auto& config = pair.second; + + auto it = m_tcp_pollers.find(device_id); + if (it != m_tcp_pollers.end()) { + spdlog::info("Config update: Restarting TCP poller for device '{}'...", device_id); + it->second->stop(); + m_tcp_pollers.erase(it); + } else { + spdlog::info("Config update: Starting new TCP poller for device '{}'...", device_id); + } + + auto poller = std::make_shared(m_io_context, config, m_report_callback); + poller->start(); + m_tcp_pollers[device_id] = poller; + } +} + + +void DeviceManager::stop_all() { + std::lock_guard lock(m_mutex); // + spdlog::info("Stopping all device services..."); // - spdlog::info("All device services stopped."); + // --- 遍历 map --- + for (auto& pair : m_rtu_bus_services) { + pair.second->stop(); + } + m_rtu_bus_services.clear(); // + + for (auto& pair : m_tcp_pollers) { + pair.second->stop(); + } + m_tcp_pollers.clear(); // + + + spdlog::info("All device services stopped."); // } std::vector DeviceManager::get_all_device_info() const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); // std::vector all_devices; - all_devices.reserve(m_tcp_pollers.size()); // 初始容量可以先按TCP设备数算 - - // 遍历所有 RTU 总线服务 - for (const auto& service : m_rtu_bus_services) { - const auto& configs_on_bus = service->get_all_device_configs(); - for (const auto& config : configs_on_bus) { + + // --- 历 map --- + for (const auto& pair : m_rtu_bus_services) { + const auto& service = pair.second; + const auto& configs_on_bus = service->get_all_device_configs(); + for (const auto& config : configs_on_bus) { DeviceInfo info; - info.id = config.device_id; - info.type = "ModbusRTU"; - info.is_running = service->is_running(); - info.connection_details["Port Path"] = config.port_path; - info.connection_details["Baud Rate"] = std::to_string(config.baud_rate); - info.connection_details["Slave ID"] = std::to_string(config.slave_id); - all_devices.push_back(info); + info.id = config.device_id; + info.type = "ModbusRTU"; + info.is_running = service->is_running(); + info.connection_details["Port Path"] = config.port_path; + info.connection_details["Baud Rate"] = std::to_string(config.baud_rate); + info.connection_details["Slave ID"] = std::to_string(config.slave_id); + all_devices.push_back(info); } } - // 遍历 TCP 设备 (逻辑不变) - for (const auto& poller : m_tcp_pollers) { - const auto& config = poller->get_config(); + for (const auto& pair : m_tcp_pollers) { + const auto& poller = pair.second; + const auto& config = poller->get_config(); DeviceInfo info; - info.id = config.device_id; - info.type = "ModbusTCP"; - info.is_running = poller->is_running(); - info.connection_details["IP Address"] = config.ip_address; - info.connection_details["Port"] = std::to_string(config.port); - info.connection_details["Slave ID"] = std::to_string(config.slave_id); - all_devices.push_back(info); + info.id = config.device_id; + info.type = "ModbusTCP"; + info.is_running = poller->is_running(); + info.connection_details["IP Address"] = config.ip_address; + info.connection_details["Port"] = std::to_string(config.port); + info.connection_details["Slave ID"] = std::to_string(config.slave_id); + all_devices.push_back(info); } - return all_devices; + + return all_devices; } bool DeviceManager::send_control_command(const std::string& device_id, uint16_t address, uint16_t value) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); // - // 首先,在 TCP poller 列表中查找 - for (const auto& poller : m_tcp_pollers) { - if (poller->get_config().device_id == device_id) { - spdlog::info("Found TCP device '{}'. Dispatching write command to address {}.", device_id, address); - poller->write_single_register(address, value); - return true; + // --- 使用 map::find 进行 O(logN) 查找 --- + auto tcp_it = m_tcp_pollers.find(device_id); + if (tcp_it != m_tcp_pollers.end()) { + spdlog::info("Found TCP device '{}'. Dispatching write command to address {}.", device_id, address); + tcp_it->second->write_single_register(address, value); + return true; + } + + + // RTU 查找逻辑保持不变 + for (const auto& pair : m_rtu_bus_services) { + const auto& service = pair.second; + if (service->manages_device(device_id)) { // + spdlog::info("Found RTU device '{}' on a bus. Dispatching write command.", device_id); // + service->write_single_register(device_id, address, value); // + return true; // } } - // 然后,在 RTU bus service 列表中查找哪个服务管理着该设备 - for (const auto& service : m_rtu_bus_services) { - if (service->manages_device(device_id)) { - spdlog::info("Found RTU device '{}' on a bus. Dispatching write command.", device_id); - service->write_single_register(device_id, address, value); - return true; - } - } - - spdlog::warn("send_control_command failed: Device with ID '{}' not found in any service.", device_id); - return false; + spdlog::warn("send_control_command failed: Device with ID '{}' not found in any service.", device_id); // + return false; // } \ No newline at end of file diff --git a/src/deviceManager/device_manager.h b/src/deviceManager/device_manager.h index 3798c76..f9e5fa0 100644 --- a/src/deviceManager/device_manager.h +++ b/src/deviceManager/device_manager.h @@ -10,18 +10,17 @@ #include #include #include +#include +#include // /** * @brief 用于向API层传递设备信息的统一结构体 */ struct DeviceInfo { + // ... (结构体保持不变) ... std::string id; - std::string type; // 例如 "ModbusRTU", "ModbusTCP" + std::string type; bool is_running; - - // 使用 map 存储连接相关的详细信息,增加灵活性 - // 例如: {"IP Address": "192.168.1.10", "Port": "502"} - // 或 {"Port Path": "/dev/ttyS0", "Baud Rate": "9600"} std::map connection_details; }; @@ -43,17 +42,25 @@ public: * @brief 从JSON配置文件加载所有设备并启动服务 * @param config_path JSON配置文件的路径 */ - void load_and_start(const std::string& config_path); + void load_and_start(const std::string& config_path); // + + /** + * @brief (线程安全) 从 MQTT 线程接收配置更新请求 + * 此方法是异步的,它将配置应用任务 post 到 io_context 线程。 + * @param json_payload 包含新配置的 JSON 字符串 + */ + void post_apply_device_configuration(const std::string& json_payload); + /** * @brief 获取所有当前管理的设备的信息 * @return 包含所有设备信息的vector */ - std::vector get_all_device_info() const; + std::vector get_all_device_info() const; // /** * @brief 安全地停止所有正在运行的设备服务 */ - void stop_all(); + void stop_all(); // /** * @brief 向指定的Modbus设备发送一个写单个寄存器的命令 @@ -62,17 +69,41 @@ public: * @param value 要写入的值 * @return 如果找到设备并成功分派命令,则返回true;否则返回false */ - bool send_control_command(const std::string& device_id, uint16_t address, uint16_t value); + bool send_control_command(const std::string& device_id, uint16_t address, uint16_t value); // private: - boost::asio::io_context& m_io_context; - ReportDataCallback m_report_callback; + /** + * @brief (私有, 在 io_context 线程上执行) + * 锁定互斥锁并执行配置应用。 + */ + void _apply_config_task(std::string json_payload); - // 用于存储正在运行的服务实例,以管理其生命周期 - std::vector> m_rtu_bus_services; - std::vector> m_tcp_pollers; + /** + * @brief (私有, 必须在互斥锁保护下调用) + * 核心的 "Diff" 逻辑,比较新配置与当前运行的服务,并应用更改。 + */ + void _parse_and_apply_config(const nlohmann::json& config_json); - mutable std::mutex m_mutex; + /** + * @brief (私有, 辅助函数) 解析 Modbus RTU 设备配置 + */ + ModbusRtuDeviceConfig _parse_rtu_config(const nlohmann::json& dev_json); + + /** + * @brief (私有, 辅助函数) 解析 Modbus TCP 设备配置 + */ + ModbusTcpDeviceConfig _parse_tcp_config(const nlohmann::json& dev_json); + +private: + boost::asio::io_context& m_io_context; // + ReportDataCallback m_report_callback; // + + // key: port_path (e.g., /dev/ttyS0) + std::map> m_rtu_bus_services; + // key: device_id + std::map> m_tcp_pollers; + + mutable std::mutex m_mutex; // }; #endif // DEVICE_MANAGER_H \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 6d5801a..bbf3812 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -18,7 +18,6 @@ #include #include -// 用于 ASIO 服务的全局 io_context boost::asio::io_context g_io_context; /** @@ -87,7 +86,7 @@ int main(int argc, char* argv[]) { }; DeviceManager device_manager(g_io_context, report_to_mqtt); - MqttRouter mqtt_router(mqtt_client, device_manager); + MqttRouter mqtt_router(mqtt_client, device_manager, "../config/devices.json"); std::vector listen_ports = { 12345 }; TCPServer tcp_server(g_io_context, listen_ports, mqtt_client); SystemMonitor::SystemMonitor monitor; diff --git a/src/mqtt/handler/command_handler.cpp b/src/mqtt/handler/command_handler.cpp index ebf0275..cd06897 100644 --- a/src/mqtt/handler/command_handler.cpp +++ b/src/mqtt/handler/command_handler.cpp @@ -2,50 +2,112 @@ #include "command_handler.h" #include "spdlog/spdlog.h" #include "vendor/nlohmann/json.hpp" +#include "utils/mqtt_topic_matcher.h" +#include using json = nlohmann::json; -// 构造函数实现更新 -CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager) - : m_client(client), m_device_manager(deviceManager) {} +CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager, std::string config_path) + : m_client(client), + m_device_manager(deviceManager), + m_config_file_path(std::move(config_path)) +{ + spdlog::info("CommandHandler initialized, will persist config to: {}", m_config_file_path); +} void CommandHandler::handle(mqtt::const_message_ptr msg) { const std::string topic = msg->get_topic(); const std::string payload = msg->get_payload_str(); - - spdlog::info("CommandHandler received a command on topic '{}'", topic); - try { - auto cmd_json = json::parse(payload); + const std::string command_filter = "commands/my-edge-proxy-device-01/#"; + const std::string proxy_control_filter = "proxy/control/my-edge-proxy-device-01/config/update"; + const std::string ack_topic = "proxy/control/my-edge-proxy-device-01/config/ack"; - // 校验命令格式 - if (!cmd_json.contains("deviceId") || !cmd_json.contains("address") || !cmd_json.contains("value")) { - spdlog::warn("Command JSON from topic '{}' is missing required fields.", topic); + if (MqttUtils::topic_matches(proxy_control_filter, topic)) { + + spdlog::info("CommandHandler received proxy config update on topic '{}'", topic); + + if (payload.empty()) { + spdlog::warn("Config update payload is empty. Ignoring."); + m_client.publish(ack_topic, "{\"status\":\"error\",\"reason\":\"payload empty\"}"); return; } - // 解析命令 - std::string device_id = cmd_json.at("deviceId").get(); - uint16_t address = cmd_json.at("address").get(); - uint16_t value = cmd_json.at("value").get(); + try { + // 步骤 1: 尝试将新配置写入磁盘文件 + std::ofstream config_file(m_config_file_path); + if (!config_file.is_open()) { + spdlog::error("Failed to open config file for writing: {}", m_config_file_path); + m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"failed to write file\"}"); + return; + } + + // 验证JSON合法性 + try { + [[maybe_unused]] auto parsed_json = json::parse(payload); + } catch (const json::exception& e) { + spdlog::error("Failed to parse new config JSON: {}", e.what()); + m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"invalid json\"}"); + return; + } - // 通过 DeviceManager 发送控制指令 - bool success = m_device_manager.send_control_command(device_id, address, value); + config_file << payload; + config_file.close(); + + spdlog::info("Successfully persisted new config to '{}'.", m_config_file_path); - // (可选) 将执行结果反馈到 MQTT,形成闭环 - json response_json; - response_json["success"] = success; - response_json["original_command"] = cmd_json; - std::string response_topic = "proxy/command/result/" + device_id; - m_client.publish(response_topic, response_json.dump()); - - if (success) { - spdlog::info("Successfully dispatched command to device '{}'.", device_id); - } else { - spdlog::error("Failed to dispatch command to device '{}'.", device_id); + // 步骤 2: 只有在写入成功后,才应用到运行时 + m_device_manager.post_apply_device_configuration(payload); + + m_client.publish(ack_topic, "{\"status\":\"success\"}"); + + } catch (const std::exception& e) { + spdlog::error("Exception while persisting config: {}", e.what()); + m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"exception\"}"); } - } catch (const json::exception& e) { - spdlog::error("Failed to parse command JSON from topic '{}': {}", topic, e.what()); + // 调用 DeviceManager 的新方法 + // 这个方法必须是线程安全的,它会将实际的更新操作 post 到 g_io_context + m_device_manager.post_apply_device_configuration(payload); + + m_client.publish("proxy/control/my-edge-proxy-device-01/config/ack", "{\"status\":\"received\"}"); + + } + else if (MqttUtils::topic_matches(command_filter, topic)) { + spdlog::info("CommandHandler received device command on topic '{}'", topic); + + try { + auto cmd_json = json::parse(payload); + + // 校验命令格式 + if (!cmd_json.contains("deviceId") || !cmd_json.contains("address") || !cmd_json.contains("value")) { + spdlog::warn("Command JSON from topic '{}' is missing required fields.", topic); + return; + } + + std::string device_id = cmd_json.at("deviceId").get(); + uint16_t address = cmd_json.at("address").get(); + uint16_t value = cmd_json.at("value").get(); + bool success = m_device_manager.send_control_command(device_id, address, value); + + json response_json; + response_json["success"] = success; + response_json["original_command"] = cmd_json; + std::string response_topic = "proxy/command/result/" + device_id; + m_client.publish(response_topic, response_json.dump()); + + if (success) { + spdlog::info("Successfully dispatched command to device '{}'.", device_id); + } else { + spdlog::error("Failed to dispatch command to device '{}'.", device_id); + } + + } catch (const json::exception& e) { + spdlog::error("Failed to parse command JSON from topic '{}': {}", topic, e.what()); + } } + else { + spdlog::warn("CommandHandler received message on unhandled topic: {}", topic); + } + } \ No newline at end of file diff --git a/src/mqtt/handler/command_handler.h b/src/mqtt/handler/command_handler.h index 6288ea7..aef5810 100644 --- a/src/mqtt/handler/command_handler.h +++ b/src/mqtt/handler/command_handler.h @@ -5,9 +5,10 @@ class CommandHandler { public: - explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager); + explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager, std::string config_path); void handle(mqtt::const_message_ptr msg); private: MqttClient& m_client; DeviceManager& m_device_manager; + std::string m_config_file_path; }; \ No newline at end of file diff --git a/src/mqtt/mqtt_router.cpp b/src/mqtt/mqtt_router.cpp index 7dde2cc..3bed9f2 100644 --- a/src/mqtt/mqtt_router.cpp +++ b/src/mqtt/mqtt_router.cpp @@ -2,9 +2,14 @@ #include "spdlog/spdlog.h" #include "utils/mqtt_topic_matcher.h" -MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager) : m_client(client) { +MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::string config_path) + : m_client(client), + m_config_file_path(std::move(config_path)) // +++ 新增:初始化路径 +{ m_data_handler = std::make_unique(m_client); - m_command_handler = std::make_unique(m_client, deviceManager); + + // +++ 修改:将 config_path 传递给 CommandHandler +++ + m_command_handler = std::make_unique(m_client, deviceManager, m_config_file_path); m_client.set_message_callback([this](mqtt::const_message_ptr msg) { this->on_message_arrived(std::move(msg)); @@ -15,20 +20,22 @@ MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager) : m_cli void MqttRouter::start() { m_client.subscribe("devices/+/data"); m_client.subscribe("commands/my-edge-proxy-device-01/#"); + m_client.subscribe("proxy/control/my-edge-proxy-device-01/config/update"); spdlog::info("MqttRouter has subscribed to all topics."); } void MqttRouter::on_message_arrived(mqtt::const_message_ptr msg) { const auto& topic = msg->get_topic(); - - // 定义我们的主题过滤器 + const std::string data_filter = "devices/+/data"; const std::string command_filter = "commands/my-edge-proxy-device-01/#"; + const std::string proxy_control_filter = "proxy/control/my-edge-proxy-device-01/config/update"; if (MqttUtils::topic_matches(data_filter, topic)) { m_data_handler->handle(msg); } - else if (MqttUtils::topic_matches(command_filter, topic)) { + else if (MqttUtils::topic_matches(command_filter, topic) || + MqttUtils::topic_matches(proxy_control_filter, topic)) { m_command_handler->handle(msg); } diff --git a/src/mqtt/mqtt_router.h b/src/mqtt/mqtt_router.h index 1741ee8..1f4ba06 100644 --- a/src/mqtt/mqtt_router.h +++ b/src/mqtt/mqtt_router.h @@ -6,15 +6,16 @@ class MqttRouter { public: - // 构造函数现在接收 MqttClient 和所有的处理器 - MqttRouter(MqttClient& client, DeviceManager& deviceManager); + MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::string config_path); void start(); private: void on_message_arrived(mqtt::const_message_ptr msg); MqttClient& m_client; - // 使用智能指针持有处理器实例 + std::unique_ptr m_data_handler; std::unique_ptr m_command_handler; + + std::string m_config_file_path; }; \ No newline at end of file diff --git a/src/utils/mqtt_topic_matcher.cpp b/src/utils/mqtt_topic_matcher.cpp index c8bcf98..445f773 100644 --- a/src/utils/mqtt_topic_matcher.cpp +++ b/src/utils/mqtt_topic_matcher.cpp @@ -4,7 +4,6 @@ namespace MqttUtils { -// 一个内部辅助函数,用于按 '/' 分割字符串 std::vector split(const std::string& s, char delimiter) { std::vector tokens; std::string token; @@ -28,7 +27,6 @@ bool topic_matches(const std::string& filter, const std::string& topic) { for (size_t i = 0; i < filter_len; ++i) { if (i >= topic_len) { - // 如果过滤器比主题长,只有当最后一个是 '#' 时才可能匹配 return (i == filter_len - 1 && filter_levels[i] == "#"); } @@ -36,17 +34,13 @@ bool topic_matches(const std::string& filter, const std::string& topic) { const std::string& t_level = topic_levels[i]; if (f_level == "#") { - // '#' 匹配剩余所有层级 return true; } if (f_level != "+" && f_level != t_level) { - // 如果不是 '+' 且不相等,则不匹配 return false; } } - - // 如果过滤器所有层级都匹配完了,主题不能比过滤器更长 return topic_len == filter_len; } } // namespace MqttUtils \ No newline at end of file diff --git a/streamer_Server.txt b/streamer_Server.txt deleted file mode 100644 index c2d2bec..0000000 --- a/streamer_Server.txt +++ /dev/null @@ -1,346 +0,0 @@ -感谢您提供 `Dockerfile` 和 `docker-compose.yml` 文件。这是极其宝贵的信息,它让我能将之前的 C++ 架构蓝图,完美适配到您**已经高度定制化**的 RK3588 开发环境中。 - -[cite\_start]您提供的 `Dockerfile` 非常出色。它已经正确处理了最复杂的部分:通过 `PPA` [cite: 1, 2] [cite\_start]安装了特定于 Rockchip 的 GStreamer 插件 (`gstreamer1.0-rockchip` [cite: 2][cite\_start]) 以及 MPP/RGA 的**C++ 开发库** (`librockchip-mpp-dev`, `librga-dev` [cite: 2])。 - -您的 `docker-compose.yml` 也配置正确,通过 `devices` 和 `group_add` 将 NPU/VPU/RGA 硬件能力 成功映射到了容器内。 - -我们的架构决策保持不变:**将流媒体服务作为独立的 C++ 进程**。这可以确保它在崩溃时(例如 C++ 的段错误)**绝对不会**影响您现有的 Modbus/MQTT 核心服务。 - -基于您的环境,我为您制定了以下**高度定制**的开发计划。 - ------ - -### A. 核心架构规划 (基于您的 Docker 环境) - -1. **独立进程:** 我们将开发一个全新的 C++ 可执行文件(例如 `edge-streamer-cpp`)。 -2. [cite\_start]**共享容器:** 这个新进程将与您现有的(Modbus/MQTT)C++ 服务**运行在同一个 `edge-proxy-dev` 容器中**。这允许它们共享所有硬件资源 和已安装的库 [cite: 2],同时保持进程级的故障隔离。 -3. **开发工作流:** 您将在主机上编写代码(位于 `docker-compose.yml` 同级的目录中),代码会通过 `volumes` 自动同步到容器的 `/app` 目录。您将在容器内执行所有编译和运行操作。 -4. **服务间通信 (IPC) 优化:** - * **控制(C++ -\> C++):** 您现有的服务(Modbus)将通过 **HTTP**(例如 `http://localhost:8001/api/start`)来控制新的流媒体服务。 - * [cite\_start]**AI 结果(C++ -\> MQTT):** 新的流媒体服务在获得 AI 结果后,将使用 `paho.mqtt.cpp` 库(您的 `Dockerfile` 已经编译了它 [cite: 8])将 JSON 结果直接发布到 `docker-compose.yml` 中定义的 `mqtt-broker` 服务。这是最高效、最解耦的方案。 - ------ - -### B. 关键发现:环境依赖检查 (RKNN) - -[cite\_start]我发现了一个关键点:您的 `Dockerfile` 安装了 GStreamer 和 VPU/MPP 的开发库 (`librockchip-mpp-dev` [cite: 2]),这对于**视频编解码**是完美的。 - -但是,它**缺失了 AI 检测所需的 NPU (RKNN) C-API 开发库**(即 `rknn_api.h` 和 `librknnrt.so`)。 - -我的计划将基于您需要 NPU 加速 AI 检测的前提。因此,**我们的第一步必须是**将这些缺失的库添加到您的 Docker 镜像中。 - ------ - -### C. 阶段一:完善您的 Docker 环境 (添加 RKNN, HTTP, JSON) - -**目标:** 将缺失的 NPU C-API、C++ HTTP 库和 C++ JSON 库添加到您的 `edge-proxy-dev` 镜像中。 - -**步骤:** - -1. **准备 RKNN SDK (在主机上):** - - * 在您的主机项目目录(`docker-compose.yml` 所在的目录)下,创建一个新目录,例如 `docker/rknn_sdk/`。 - * 从您的 RK3588 SDK 中,复制 `include/rknn_api.h` 到 `docker/rknn_sdk/include/rknn_api.h`。 - * 复制 `lib/librknnrt.so` 到 `docker/rknn_sdk/lib/librknnrt.so`。 - -2. **修改您的 `docker/Dockerfile`:** - - * [cite\_start]找到构建 `paho.mqtt.cpp` [cite: 8] 的 `RUN` 指令块。 - * [cite\_start]在 `cmake --build build --target install && \` [cite: 9] [cite\_start]之后,`rm -rf /tmp/build-context` [cite: 9] 之前,插入以下代码: - - - - ```dockerfile - # ... (paho.mqtt.cpp 的 cmake install) - cmake --build build --target install && \ - - # --- 规划师建议:添加 RKNN, HTTP, JSON 库 --- - - # 1. 复制 RKNN C-API (NPU 库) (假设已按步骤1放置) - # 注意:COPY 指令的源路径是相对于 docker-compose.yml 的 context - COPY docker/rknn_sdk/include/rknn_api.h /usr/local/include/ - COPY docker/rknn_sdk/lib/librknnrt.so /usr/local/lib/ - - # 2. 安装 C++ HTTP 和 JSON 的 header-only 库 - # (需要先安装 curl) - apt-get update && apt-get install -y --no-install-recommends curl && \ - # C++ HTTP Lib - curl -L https://raw.githubusercontent.com/yhirose/cpp-httplib/master/httplib.h -o /usr/local/include/httplib.h && \ - # C++ JSON Lib - curl -L https://github.com/nlohmann/json/releases/latest/download/json.hpp -o /usr/local/include/json.hpp && \ - - # 3. 更新动态链接库缓存 (使系统找到 librknnrt.so) - ldconfig && \ - - # 4. 清理 apt 缓存 - apt-get remove -y curl && \ - apt-get autoremove -y && \ - - # (Dockerfile 原有的清理命令) - rm -rf /tmp/build-context - - # (Dockerfile 的剩余部分) - RUN rm -rf /var/lib/apt/lists/* - # ... - ``` - -3. **重建 Docker 镜像 (关键步骤):** - - * 在主机的终端中执行: - - - - ```bash - docker-compose build edge-proxy-dev - ``` - ------ - -### D. 阶段二:开发工作流程与项目设置 - -1. **启动开发环境:** - - ```bash - docker-compose up -d - ``` - - * 这将启动 `edge-proxy-dev` 和 `mqtt-broker` 两个服务。 - -2. **创建新服务目录 (在主机上):** - - * 在您的项目根目录(`docker-compose.yml` 所在位置)创建一个新目录: - - - - ```bash - mkdir edge-streamer-cpp - ``` - -3. **进入容器的开发 Shell:** - - ```bash - docker-compose exec edge-proxy-dev /bin/bash - ``` - -4. **在容器内初始化项目骨架:** - - * **注意:** 以下所有命令均在 **容器内的 shell** (`/app` 目录) 中执行。 - - - - ```bash - # (容器内) - # /app 目录是您主机项目的挂载点 - cd /app/edge-streamer-cpp - - # 创建源码和构建目录 - mkdir src include build - - # 创建初始文件 (您将在主机上编辑它们) - touch src/main.cpp - touch src/StreamManager.cpp - touch include/StreamManager.h - touch CMakeLists.txt - ``` - -5. **开始编码:** - - * 现在,在您的**主机**上,使用您喜欢的 IDE (如 VS Code) 打开 `edge-streamer-cpp` 目录,开始编辑 `CMakeLists.txt` 和 `.cpp` / `.h` 文件。 - ------ - -### E. 阶段三:配置 CMakeLists.txt - -[cite\_start]**目标:** 配置 CMake,使其能正确链接 GStreamer [cite: 2][cite\_start]、RKNN (阶段 C 添加的)、Paho MQTT [cite: 7, 8] 和 HTTP/JSON 库。 - - * 将以下内容粘贴到您在**主机**上打开的 `edge-streamer-cpp/CMakeLists.txt` 文件中: - - ```cmake - cmake_minimum_required(VERSION 3.10) - project(EdgeStreamer CXX) - - set(CMAKE_CXX_STANDARD 17) - set(CMAKE_CXX_STANDARD_REQUIRED ON) - - # [cite_start]--- 1. 查找 GStreamer (来自 Dockerfile) [cite: 2] --- - find_package(PkgConfig REQUIRED) - pkg_check_modules(GST REQUIRED gstreamer-1.0 gstreamer-app-1.0) - - # [cite_start]--- 2. 查找 Paho MQTT (来自 Dockerfile) [cite: 7, 8] --- - # 您的 Dockerfile 编译并安装了它,我们可以直接 find_package - find_package(paho-mqttpp3 REQUIRED) # C++ 库 - find_package(paho-mqtt-c REQUIRED) # C 库 (依赖) - - # --- 3. 包含 RKNN, HTTP, JSON (来自 阶段C) --- - # 这些头文件已在 /usr/local/include,会自动被 C++ 编译器找到 - include_directories( - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${GST_INCLUDE_DIRS} - /usr/local/include - ) - - # --- 4. 定义可执行文件 --- - add_executable(edge-streamer - src/main.cpp - src/StreamManager.cpp - ) - - # --- 5. 链接所有库 --- - target_link_libraries(edge-streamer - PRIVATE - # GStreamer - ${GST_LIBRARIES} - - # Paho MQTT (C++ 和 C) - paho-mqttpp3 - paho-mqtt-c - - # RKNN (来自 librknnrt.so) - rknnrt - - # 线程 (GStreamer, httplib, Paho 都需要) - pthread - - # [cite_start]Paho-MQTT SSL 依赖 [cite: 9] [cite_start](Dockerfile 有 libssl-dev [cite: 2]) - ssl - crypto - ) - ``` - ------ - -### F. 阶段四:C++ 核心代码实现 (IPC 调整) - -您在 `src/` 和 `include/` 中的代码逻辑与我上一个计划(C++ 核心版)非常相似,但**IPC 部分将进行关键优化**: - - * **`main.cpp`**: - - * 完全不变。使用 `httplib.h` 在 `8001` 端口(或您选择的端口)启动一个 HTTP 服务,用于接收您现有 Modbus 服务的 "Start/Stop Stream" 命令。 - - * **`include/StreamManager.h`**: - - * 需要添加 `paho.mqtt.cpp` 的头文件和成员变量: - - - - ```cpp - #include // Paho MQTT C++ - // ... - class StreamManager { - // ... - private: - // ... (GstElement* 等) - - // --- 新增 MQTT 客户端 --- - const std::string m_mqtt_server_address = "tcp://mqtt-broker:1883"; // - const std::string m_mqtt_client_id = "edge-streamer-ai"; - mqtt::async_client m_mqtt_client; - }; - ``` - - * **`src/StreamManager.cpp` (关键的 AI 回调调整):** - - * 在 `StreamManager` 的构造函数或初始化方法中,连接到 MQTT Broker。 - * 在 GStreamer 的 `on_new_sample_from_sink` 回调函数中,当 RKNN 推理完成后: - - - - ```cpp - // (伪代码) - // 静态 GStreamer 回调函数 - static GstFlowReturn on_new_sample_from_sink(GstAppSink *sink, gpointer user_data) { - StreamManager *manager = static_cast(user_data); - - // ... (1. 拉取 GstBuffer) ... - - // ... (2. 调用 RKNN C-API 进行推理) ... - // rknn_outputs_get(ctx, 1, outputs, NULL); - - // ... (3. (TODO) 将 'outputs' 格式化为 JSON 字符串) ... - // nlohmann::json ai_result; - // ai_result["stream_id"] = manager->get_stream_id(); - // ai_result["object_count"] = ...; - // std::string payload = ai_result.dump(); - - // 4. (优化) 将 JSON 结果发布到内部 MQTT Broker - try { - std::string topic = "ai/results/" + manager->get_stream_id(); - manager->get_mqtt_client().publish(topic, payload); - } catch (const mqtt::exception& exc) { - std::cerr << "Error publishing to MQTT: " << exc.what() << std::endl; - } - - // ... (5. 释放 GstBuffer 和 RKNN outputs) ... - - return GST_FLOW_OK; - } - ``` - ------ - -### G. 阶段五:编译与运行 (容器内) - -在您**主机**上编写完代码后,回到您**容器内的 shell** (`docker-compose exec ...` 的那个)。 - -1. **编译项目 (在容器内):** - - ```bash - # (容器内) - cd /app/edge-streamer-cpp/build - - # 运行 CMake (仅需一次) - cmake .. - - # 编译 (每次代码变更后执行) - make -j$(nproc) - ``` - -2. **运行新服务 (在容器内):** - - ```bash - # (容器内) - # 编译好的可执行文件位于 build/ 目录 - ./edge-streamer - # 您应该会看到 HTTP API 和 GStreamer 启动的日志 - ``` - -3. **测试 (在 *第二个* 容器 Shell 中):** - - * 在**主机**上打开一个**新的**终端,再次 `exec` 进同一个容器: - - - - ```bash - docker-compose exec edge-proxy-dev /bin/bash - ``` - - * **A. 测试 API (控制):** - ```bash - # (第二个容器内) - # (需要先 apt-get install curl) - curl -X POST http://localhost:8001/api/v1/stream/start \ - -d '{"stream_id":"cam1", "rtsp_url":"rtsp://..."}' - ``` - * **B. 监听 AI 结果 (MQTT):** - ```bash - # (第二个容器内) - # (需要先 apt-get install mosquitto-clients) - mosquitto_sub -h mqtt-broker -t "ai/results/#" -v - ``` - * 您现在应该能在一个终端看到服务日志,在另一个终端看到 AI 推理结果。 - ------ - -### H. 阶段六:生产部署 (可选) - -您当前的 `docker-compose.yml` 使用 `command: sleep infinity`,这非常适合开发。 - -当您准备部署时,您需要一个进程管理器来同时启动和监控您的**两个 C++ 服务**(Modbus服务 和 `edge-streamer`服务)。 - -**推荐方案:** - -1. 修改 `Dockerfile` 以安装 `supervisor` (`apt-get install -y supervisor`)。 -2. 创建一个 `supervisord.conf` 文件,配置 `[program:modbus_service]` 和 `[program:streamer_service]`。 -3. 修改 `docker-compose.yml`,将 `command:` 更改为 `"/usr/bin/supervisord -c /etc/supervisor/supervisord.conf"`。 - -这将确保两个独立的服务都在容器启动时自动运行,并能在崩溃时自动重启。 \ No newline at end of file