From 3aaec6a42c0535988d84124ec9f592f48a5c889b Mon Sep 17 00:00:00 2001 From: GuanYuankai Date: Tue, 14 Oct 2025 16:45:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E9=85=8D=E7=BD=AE=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E6=9D=A5=E7=AE=A1=E7=90=86=E8=AE=BE=E5=A4=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 3 ++ config/devices.json | 52 +++++++++++++++++++ src/device_manager.cc | 114 ++++++++++++++++++++++++++++++++++++++++++ src/device_manager.h | 47 +++++++++++++++++ src/main.cpp | 107 ++++++++++++--------------------------- 5 files changed, 247 insertions(+), 76 deletions(-) create mode 100644 config/devices.json create mode 100644 src/device_manager.cc create mode 100644 src/device_manager.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f44ea06..609ec12 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,9 @@ add_library(edge_proxy_lib STATIC # TCP通讯层 src/network/tcp_server.cc + # --- 设备管理模块 --- + src/device_manager.cc + # 小工具 src/utils/mqtt_topic_matcher.cpp diff --git a/config/devices.json b/config/devices.json new file mode 100644 index 0000000..6247752 --- /dev/null +++ b/config/devices.json @@ -0,0 +1,52 @@ +{ + "modbus_rtu_devices": [ + { + "enabled": true, + "device_id": "rtu_temp_sensor_lab", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 1, + "poll_interval_ms": 5000, + "data_points": [ + {"name": "temperature", "address": 0, "type": "INT16", "scale": 0.1}, + {"name": "humidity", "address": 1, "type": "UINT16", "scale": 0.1} + ] + }, + { + "enabled": false, + "device_id": "water_meter_main_gate", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 5, + "poll_interval_ms": 10000, + "data_points": [ + {"name": "total_flow", "address": 256, "type": "FLOAT32", "scale": 1.0} + ] + }, + { + "enabled": false, + "device_id": "backup_counter", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 10, + "poll_interval_ms": 1000, + "data_points": [ + {"name": "count", "address": 32, "type": "UINT32"} + ] + } + ], + "modbus_tcp_devices": [ + { + "enabled": false, + "device_id": "plc_workshop1", + "ip_address": "192.168.1.120", + "port": 502, + "slave_id": 1, + "poll_interval_ms": 1000, + "data_points": [ + {"name": "motor_speed", "address": 100, "type": "UINT16"}, + {"name": "pressure", "address": 102, "type": "FLOAT32", "scale": 0.01} + ] + } + ] +} \ No newline at end of file diff --git a/src/device_manager.cc b/src/device_manager.cc new file mode 100644 index 0000000..2561277 --- /dev/null +++ b/src/device_manager.cc @@ -0,0 +1,114 @@ +// 文件名: src/device_manager.cc +#include "device_manager.h" +#include "modbus/modbus_common.h" +#include "spdlog/spdlog.h" +#include +#include +#include + +using json = nlohmann::json; + +// 辅助函数:将JSON中的字符串类型映射到C++枚举 +static ModbusDataType string_to_modbus_data_type(const std::string& type_str) { + static const std::map type_map = { + {"UINT16", ModbusDataType::UINT16}, + {"INT16", ModbusDataType::INT16}, + {"UINT32", ModbusDataType::UINT32}, + {"INT32", ModbusDataType::INT32}, + {"FLOAT32", ModbusDataType::FLOAT32} + }; + auto it = type_map.find(type_str); + if (it != type_map.end()) { + return it->second; + } + throw std::runtime_error("Unknown ModbusDataType string: " + type_str); +} + + +DeviceManager::DeviceManager(boost::asio::io_context& io_context, ReportDataCallback report_cb) + : m_io_context(io_context), m_report_callback(std::move(report_cb)) {} + +DeviceManager::~DeviceManager() { + stop_all(); +} + +void DeviceManager::load_and_start(const std::string& config_path) { + spdlog::info("Loading device configuration from '{}'...", config_path); + std::ifstream config_file(config_path); + if (!config_file.is_open()) { + spdlog::critical("Failed to open configuration file: {}", config_path); + throw std::runtime_error("Configuration file not found."); + } + + try { + json config_json = json::parse(config_file); + + // --- 加载 Modbus RTU 设备 --- + if (config_json.contains("modbus_rtu_devices")) { + for (const auto& dev_json : config_json["modbus_rtu_devices"]) { + if (!dev_json.value("enabled", false)) continue; + + ModbusRtuDeviceConfig config; + config.device_id = dev_json.at("device_id").get(); + config.port_path = dev_json.at("port_path").get(); + config.baud_rate = dev_json.at("baud_rate").get(); + config.slave_id = dev_json.at("slave_id").get(); + config.poll_interval_ms = dev_json.at("poll_interval_ms").get(); + + for (const auto& dp_json : dev_json.at("data_points")) { + config.data_points.push_back({ + dp_json.at("name").get(), + (uint16_t)dp_json.at("address").get(), + string_to_modbus_data_type(dp_json.at("type").get()), + dp_json.value("scale", 1.0) // .value for optional fields + }); + } + + auto service = std::make_unique(config, m_report_callback); + service->start(); + m_rtu_services.push_back(std::move(service)); + spdlog::info("Started Modbus RTU service for device '{}'.", config.device_id); + } + } + + // --- 加载 Modbus TCP 设备 --- + if (config_json.contains("modbus_tcp_devices")) { + for (const auto& dev_json : config_json["modbus_tcp_devices"]) { + if (!dev_json.value("enabled", false)) continue; + + ModbusDeviceConfig config; // This is the TCP config struct + config.device_id = dev_json.at("device_id").get(); + config.ip_address = dev_json.at("ip_address").get(); + config.port = dev_json.at("port").get(); + config.slave_id = dev_json.at("slave_id").get(); + config.poll_interval_ms = dev_json.at("poll_interval_ms").get(); + + // Note: The current ModbusMasterPoller doesn't use data_points yet. + // This is a point for future enhancement to make it also configuration-driven. + // For now, we just start it based on connection params. + // You would need to refactor ModbusMasterPoller similarly to ModbusRtuPollerService + // to make it fully generic. + + auto poller = std::make_shared(m_io_context, config, m_report_callback); + poller->start(); + m_tcp_pollers.push_back(poller); + spdlog::info("Started Modbus TCP service for device '{}'.", config.device_id); + } + } + + } catch (const json::exception& e) { + spdlog::critical("Failed to parse JSON configuration: {}", e.what()); + throw; + } +} + +void DeviceManager::stop_all() { + spdlog::info("Stopping all device services..."); + for (auto& service : m_rtu_services) { + service->stop(); + } + // For Asio-based objects, stopping is usually handled by stopping the io_context. + // If they have explicit stop logic, call it here. + m_tcp_pollers.clear(); + spdlog::info("All device services stopped."); +} \ No newline at end of file diff --git a/src/device_manager.h b/src/device_manager.h new file mode 100644 index 0000000..0151c9c --- /dev/null +++ b/src/device_manager.h @@ -0,0 +1,47 @@ +// 文件名: src/device_manager.h +#ifndef DEVICE_MANAGER_H +#define DEVICE_MANAGER_H + +#include "protocol/iprotocol_adapter.h" +#include "modbus/modbus_rtu_poller_service.h" +#include "modbus/modbus_master_poller.h" +#include +#include +#include +#include + +/** + * @brief 设备管理模块 + * * 负责从配置文件加载设备信息,并动态创建、启动和停止相应的 + * 设备服务实例(如Modbus轮询器)。 + */ +class DeviceManager { +public: + DeviceManager(boost::asio::io_context& io_context, ReportDataCallback report_cb); + ~DeviceManager(); + + // 禁止拷贝和赋值 + DeviceManager(const DeviceManager&) = delete; + DeviceManager& operator=(const DeviceManager&) = delete; + + /** + * @brief 从JSON配置文件加载所有设备并启动服务 + * @param config_path JSON配置文件的路径 + */ + void load_and_start(const std::string& config_path); + + /** + * @brief 安全地停止所有正在运行的设备服务 + */ + void stop_all(); + +private: + boost::asio::io_context& m_io_context; + ReportDataCallback m_report_callback; + + // 用于存储正在运行的服务实例,以管理其生命周期 + std::vector> m_rtu_services; + std::vector> m_tcp_pollers; +}; + +#endif // DEVICE_MANAGER_H \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 98eccc2..cc7e1b3 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,76 +1,66 @@ +// main.cpp + #include "network/tcp_server.h" #include "mqtt/mqtt_client.h" #include "mqtt/mqtt_router.h" #include "systemMonitor/system_monitor.h" #include "spdlog/spdlog.h" -#include "modbus/modbus_master_poller.h" -#include "modbus/modbus_rtu_poller_service.h" +#include "device_manager.h" // <<< MODIFIED: 包含新的设备管理器头文件 #include -#include +#include #include #include -#include +#include // 用于 ASIO 服务的全局 io_context boost::asio::io_context g_io_context; /** * @brief 处理终止信号 (SIGINT, SIGTERM). - * - * 这个函数会优雅地停止 Boost.Asio 的 io_context, - * 这将导致 main() 函数中的 io_context.run() 调用解除阻塞, - * 从而允许程序干净地退出。 - * - * @param signum 接收到的信号编号。 */ void signalHandler(int signum) { spdlog::warn("Interrupt signal ({}) received. Shutting down.", signum); - // 向 io_context 提交一个停止事件。这是一种线程安全的方式 - // 来告诉 io_context 停止其事件循环。 g_io_context.stop(); } +/** + * @brief 周期性轮询系统状态并发布到 MQTT + */ void poll_system_metrics( boost::asio::steady_timer& timer, SystemMonitor::SystemMonitor& monitor, MqttClient& mqtt_client ) { - // a. 采集数据 - auto cpu_util = monitor.getCpuUtilization(); + auto cpu_util = monitor.getCpuUtilization(); auto mem_info = monitor.getMemoryInfo(); double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0; - - // b. (示例) 将数据通过 MQTT 发送出去 + std::string topic = "proxy/system_status"; - std::string payload = "{\"cpu_usage\":" + std::to_string(cpu_util.totalUsagePercentage) + + std::string payload = "{\"cpu_usage\":" + std::to_string(cpu_util.totalUsagePercentage) + ",\"mem_total_gb\":" + std::to_string(mem_total_gb) + "}"; mqtt_client.publish(topic, payload); spdlog::debug("System metrics published."); - // c. 将定时器重置到 15 秒后 timer.expires_at(timer.expiry() + std::chrono::seconds(15)); - - // d. 异步等待定时器,到期后再次调用本函数,形成循环 timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), std::ref(mqtt_client))); } int main(int argc, char* argv[]) { try { - spdlog::set_level(spdlog::level::debug); // 设置日志级别为 debug,方便调试 + spdlog::set_level(spdlog::level::debug); spdlog::info("Edge Proxy starting up..."); } catch (const spdlog::spdlog_ex& ex) { std::cerr << "Log initialization failed: " << ex.what() << std::endl; return 1; } - // 注册信号处理器 signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); try { - constexpr uint16_t tcp_port = 8888; + // --- 1. 初始化核心服务 --- MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client"); MqttRouter mqtt_router(mqtt_client); std::vector listen_ports = { 8888 }; @@ -80,67 +70,32 @@ int main(int argc, char* argv[]) { mqtt_client.connect(); mqtt_router.start(); - monitor.getCpuUtilization(); - boost::asio::steady_timer timer(g_io_context, std::chrono::seconds(15)); - timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), std::ref(mqtt_client))); + // --- 2. 启动系统状态监控定时器 --- + monitor.getCpuUtilization(); // 首次调用以初始化 + boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15)); + system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client))); + // --- 3. 创建统一的数据上报回调函数 --- auto report_to_mqtt = [&](const UnifiedData& data) { std::string topic = "devices/" + data.device_id + "/data"; - mqtt_client.publish(topic, data.data_json, 1, false); + // 使用 post 确保 MQTT 发布操作在 io_context 的主事件循环中执行,保证线程安全 + g_io_context.post([&, topic, payload = data.data_json]() { + mqtt_client.publish(topic, payload, 1, false); + }); }; - // 配置并启动Modbus轮询器 - // ModbusDeviceConfig temp_sensor_config = { - // .device_id = "temp_sensor_workshop1", - // .ip_address = "192.168.1.120", // 您的Modbus设备的IP地址 - // .port = 502, // Modbus TCP标准端口 - // .slave_id = 1, // 设备的从站ID - // .start_address = 0, // 要读取的第一个寄存器的地址 - // .quantity = 8, // 从起始地址开始,总共读取多少个寄存器 - // .poll_interval_ms = 2000 // 每2000毫秒轮询一次 - // }; - - // 创建轮询器实例 (使用std::make_shared确保生命周期) - // auto poller = std::make_shared(g_io_context, temp_sensor_config, report_to_mqtt); - // poller->start(); - - - // ========================================================================== - // modbus rtu轮询服务启动 - ModbusRtuDeviceConfig temp_sensor_config = { - .device_id = "rtu_temp_sensor_lab", - .port_path = "/dev/ttyS7", - .baud_rate = 9600, - .slave_id = 0x01, - .poll_interval_ms = 5000, - .data_points = { - {"temperature", 0x0000, ModbusDataType::INT16, 0.1}, // 地址0, 16位有符号, 结果乘以0.1 - {"humidity", 0x0001, ModbusDataType::UINT16, 0.1} // 地址1, 16位无符号, 结果乘以0.1 - } - }; - ModbusRtuPollerService temp_sensor_service(temp_sensor_config, report_to_mqtt); - temp_sensor_service.start(); - - // 示例2:配置一个水表 - // ModbusRtuDeviceConfig water_meter_config = { - // .device_id = "water_meter_main_gate", - // .port_path = "/dev/ttyS7", // 假设在同一条RS485总线上 - // .baud_rate = 9600, - // .slave_id = 0x05, // 不同的从站ID - // .poll_interval_ms = 10000, - // .data_points = { - // {"total_flow", 0x0100, ModbusDataType::FLOAT32, 1.0}, // 地址256, 32位浮点数 - // {"flow_rate", 0x0102, ModbusDataType::FLOAT32, 1.0} // 地址258, 32位浮点数 - // } - // }; - // ModbusRtuPollerService water_meter_service(water_meter_config, report_to_mqtt); - // water_meter_service.start(); - + // --- 4. 实例化设备管理器并从文件加载所有设备 --- + DeviceManager device_manager(g_io_context, report_to_mqtt); + // 默认从程序运行目录下的 "devices.json" 文件加载 + device_manager.load_and_start("../config/devices.json"); + // --- 5. 启动主事件循环 (程序将阻塞在这里直到被信号中断) --- spdlog::info("All services are running. Press Ctrl+C to exit."); - g_io_context.run(); + // --- 清理工作 --- + // io_context.run() 返回后,程序将退出 try 块。 + // device_manager 的析构函数会自动被调用,它会负责停止所有设备服务线程。 spdlog::info("Shutting down MQTT client..."); mqtt_client.disconnect(); @@ -151,4 +106,4 @@ int main(int argc, char* argv[]) { spdlog::info("Server has been shut down gracefully. Exiting."); return 0; -} \ No newline at end of file +} \ No newline at end of file