From 803bfcc377931a75f69229a15fda07320292fc70 Mon Sep 17 00:00:00 2001 From: GuanYuankai Date: Thu, 16 Oct 2025 08:47:55 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9modbus=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E5=87=BD=E6=95=B0=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=8F=8D=E5=90=91?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/deviceManager/device_manager.cc | 36 +++++++++++- src/deviceManager/device_manager.h | 9 +++ src/main.cpp | 46 ++++++--------- src/modbus/modbus_master_poller.cc | 75 +++++++++++++++++++++++-- src/modbus/modbus_master_poller.h | 10 +++- src/modbus/modbus_rtu_poller_service.cc | 71 ++++++++++++----------- src/modbus/modbus_rtu_poller_service.h | 12 +++- src/mqtt/handler/command_handler.cpp | 53 ++++++++++++----- src/mqtt/handler/command_handler.h | 7 ++- src/mqtt/handler/data_handler.cpp | 30 +++++----- src/mqtt/mqtt_router.cpp | 4 +- src/mqtt/mqtt_router.h | 2 +- 12 files changed, 249 insertions(+), 106 deletions(-) diff --git a/src/deviceManager/device_manager.cc b/src/deviceManager/device_manager.cc index 49a0a32..caa7edf 100644 --- a/src/deviceManager/device_manager.cc +++ b/src/deviceManager/device_manager.cc @@ -108,18 +108,23 @@ void DeviceManager::load_and_start(const std::string& config_path) { } void DeviceManager::stop_all() { - // --- <<< 添加锁 >>> --- std::lock_guard lock(m_mutex); spdlog::info("Stopping all device services..."); + for (auto& service : m_rtu_services) { service->stop(); } - // For Asio-based objects, stopping is usually handled by stopping the io_context. - // If they have explicit stop logic, call it here. + m_rtu_services.clear(); + + for (auto& poller : m_tcp_pollers) { + poller->stop(); + } m_tcp_pollers.clear(); + spdlog::info("All device services stopped."); } + std::vector DeviceManager::get_all_device_info() const { // 使用 lock_guard 确保在函数返回前互斥锁能被自动释放 std::lock_guard lock(m_mutex); @@ -160,4 +165,29 @@ std::vector DeviceManager::get_all_device_info() const { } return all_devices; +} + +bool DeviceManager::send_control_command(const std::string& device_id, uint16_t address, uint16_t value) { + std::lock_guard lock(m_mutex); + + // 首先,在 TCP poller 列表中查找 + for (const auto& poller : m_tcp_pollers) { + if (poller->get_config().device_id == device_id) { + spdlog::info("Found TCP device '{}'. Dispatching write command to address {}.", device_id, address); + poller->write_single_register(address, value); + return true; + } + } + + // 然后,在 RTU service 列表中查找 + for (const auto& service : m_rtu_services) { + if (service->get_config().device_id == device_id) { + spdlog::info("Found RTU device '{}'. Dispatching write command to address {}.", device_id, address); + service->write_single_register(address, value); + return true; + } + } + + spdlog::warn("send_control_command failed: Device with ID '{}' not found.", device_id); + return false; } \ No newline at end of file diff --git a/src/deviceManager/device_manager.h b/src/deviceManager/device_manager.h index b678617..ae9d265 100644 --- a/src/deviceManager/device_manager.h +++ b/src/deviceManager/device_manager.h @@ -55,6 +55,15 @@ public: */ void stop_all(); + /** + * @brief 向指定的Modbus设备发送一个写单个寄存器的命令 + * @param device_id 目标设备的唯一ID + * @param address 要写入的寄存器地址 + * @param value 要写入的值 + * @return 如果找到设备并成功分派命令,则返回true;否则返回false + */ + bool send_control_command(const std::string& device_id, uint16_t address, uint16_t value); + private: boost::asio::io_context& m_io_context; ReportDataCallback m_report_callback; diff --git a/src/main.cpp b/src/main.cpp index 23e9075..2aa6208 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -19,14 +19,6 @@ // 用于 ASIO 服务的全局 io_context boost::asio::io_context g_io_context; -/** - * @brief 处理终止信号 (SIGINT, SIGTERM). - */ -// void signalHandler(int signum) { -// spdlog::warn("Interrupt signal ({}) received. Shutting down.", signum); -// g_io_context.stop(); -// } - /** * @brief 周期性轮询系统状态并发布到 MQTT */ @@ -60,17 +52,29 @@ int main(int argc, char* argv[]) { return 1; } - // signal(SIGINT, signalHandler); - // signal(SIGTERM, signalHandler); - try { + DataCache data_cache; MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client"); - MqttRouter mqtt_router(mqtt_client); + auto report_to_mqtt = [&](const UnifiedData& data) { + if (mqtt_client.is_connected()) { + // 网络正常,直接上报 + std::string topic = "devices/" + data.device_id + "/data"; + g_io_context.post([&, topic, payload = data.data_json]() { + mqtt_client.publish(topic, payload, 1, false); + }); + } else { + // 网络断开,写入缓存 + spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id); + data_cache.add(data); + } + }; + + DeviceManager device_manager(g_io_context, report_to_mqtt); + MqttRouter mqtt_router(mqtt_client, device_manager); std::vector listen_ports = { 8888 }; TCPServer tcp_server(g_io_context, listen_ports, mqtt_client); SystemMonitor::SystemMonitor monitor; - DataCache data_cache; if (!data_cache.open("edge_data_cache.db")) { spdlog::critical("Failed to initialize data cache. Exiting."); return 1; @@ -90,23 +94,7 @@ int main(int argc, char* argv[]) { boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15)); system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client))); - // --- 创建包含缓存逻辑的统一数据上报回调函数 >>> - auto report_to_mqtt = [&](const UnifiedData& data) { - if (mqtt_client.is_connected()) { - // 网络正常,直接上报 - std::string topic = "devices/" + data.device_id + "/data"; - g_io_context.post([&, topic, payload = data.data_json]() { - mqtt_client.publish(topic, payload, 1, false); - }); - } else { - // 网络断开,写入缓存 - spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id); - data_cache.add(data); - } - }; - // 实例化设备管理器并从文件加载所有设备 (使用新的回调) - DeviceManager device_manager(g_io_context, report_to_mqtt); device_manager.load_and_start("../config/devices.json"); WebServer web_server(monitor, device_manager,8080); diff --git a/src/modbus/modbus_master_poller.cc b/src/modbus/modbus_master_poller.cc index cb21f41..807b987 100644 --- a/src/modbus/modbus_master_poller.cc +++ b/src/modbus/modbus_master_poller.cc @@ -31,14 +31,12 @@ void ModbusMasterPoller::start() { do_connect(); } -// --- <<< 新增 stop 方法实现 >>> --- + void ModbusMasterPoller::stop() { if (!m_is_running.exchange(false)) { - // 如果之前已经是 false,说明已经停止了,直接返回 return; } - - // 使用 post 确保取消操作在 io_context 的事件循环中执行,保证线程安全 + boost::asio::post(m_io_context, [this, self = shared_from_this()](){ spdlog::info("[Modbus TCP] Stopping poller for device '{}'...", m_config.device_id); @@ -54,7 +52,6 @@ void ModbusMasterPoller::stop() { }); } -// --- <<< 新增 get_config 和 is_running 实现 >>> --- const ModbusTcpDeviceConfig& ModbusMasterPoller::get_config() const { return m_config; } @@ -63,6 +60,74 @@ bool ModbusMasterPoller::is_running() const { return m_is_running; } +void ModbusMasterPoller::write_single_register(uint16_t address, uint16_t value) { + if (!m_is_running) { + spdlog::warn("[Modbus TCP] Device '{}': Poller is not running. Cannot write.", m_config.device_id); + return; + } + + // 使用 post 将写操作投递到 io_context 线程中执行,以保证线程安全 + boost::asio::post(m_io_context, [this, self = shared_from_this(), address, value]() { + spdlog::debug("[Modbus TCP] Device '{}': Writing value {} to address {}.", m_config.device_id, value, address); + + // 1. 创建一个独立的写缓冲区 + auto write_cmd_buffer = std::make_shared>(); + + // 2. 构建写命令PDU (功能码0x06) + // PDU = Address (2 bytes) + Value (2 bytes) + std::vector pdu; + pdu.push_back(static_cast((address >> 8) & 0xFF)); + pdu.push_back(static_cast(address & 0xFF)); + pdu.push_back(static_cast((value >> 8) & 0xFF)); + pdu.push_back(static_cast(value & 0xFF)); + + // 3. 构建MBAP头 + write_cmd_buffer->resize(7 + pdu.size()); + uint16_t tid = m_transaction_id++; // 原子地增加事务ID + uint16_t length = 1 + pdu.size(); // UnitID + PDU + + (*write_cmd_buffer)[0] = static_cast((tid >> 8) & 0xFF); + (*write_cmd_buffer)[1] = static_cast(tid & 0xFF); + (*write_cmd_buffer)[2] = 0; (*write_cmd_buffer)[3] = 0; // Protocol ID + (*write_cmd_buffer)[4] = static_cast((length >> 8) & 0xFF); + (*write_cmd_buffer)[5] = static_cast(length & 0xFF); + (*write_cmd_buffer)[6] = m_config.slave_id; + std::copy(pdu.begin(), pdu.end(), write_cmd_buffer->begin() + 7); + + // 4. 异步发送写命令 + boost::asio::async_write(m_socket, boost::asio::buffer(*write_cmd_buffer), + [this, self, tid, write_cmd_buffer](const boost::system::error_code& ec, std::size_t) { + if (!m_is_running || ec) { + if (ec != boost::asio::error::operation_aborted) { + spdlog::error("[{}] Write command failed: {}", m_config.device_id, ec.message()); + on_error("write_command"); + } + return; + } + + // 5. 异步读取响应 + auto read_cmd_buffer = std::make_shared>(); // 写响应通常是12字节 + boost::asio::async_read(m_socket, boost::asio::buffer(*read_cmd_buffer, 12), + [this, self, tid, read_cmd_buffer](const boost::system::error_code& ec, std::size_t) { + if (!m_is_running || ec) { + if (ec != boost::asio::error::operation_aborted) { + spdlog::error("[{}] Read command response failed: {}", m_config.device_id, ec.message()); + on_error("read_command_response"); + } + return; + } + // 简单检查一下事务ID是否匹配 + uint16_t resp_tid = ((*read_cmd_buffer)[0] << 8) | (*read_cmd_buffer)[1]; + if (resp_tid == tid) { + spdlog::info("[{}] Successfully wrote and received confirmation.", m_config.device_id); + } else { + spdlog::warn("[{}] Write command TID mismatch. Sent {}, Rcvd {}.", m_config.device_id, tid, resp_tid); + } + }); + }); + }); +} + void ModbusMasterPoller::do_connect() { auto self = shared_from_this(); tcp::resolver resolver(m_io_context); diff --git a/src/modbus/modbus_master_poller.h b/src/modbus/modbus_master_poller.h index 31efe0c..870b4dc 100644 --- a/src/modbus/modbus_master_poller.h +++ b/src/modbus/modbus_master_poller.h @@ -8,6 +8,7 @@ #include #include #include +#include class ModbusMasterPoller : public std::enable_shared_from_this { public: @@ -22,6 +23,13 @@ public: const ModbusTcpDeviceConfig& get_config() const; bool is_running() const; + /** + * @brief (线程安全) 异步地向设备写入单个寄存器 + * @param address 寄存器地址 + * @param value 要写入的值 + */ + void write_single_register(uint16_t address, uint16_t value); + private: void do_connect(); void do_poll(); @@ -40,7 +48,7 @@ private: std::vector m_write_buffer; std::array m_read_buffer; - uint16_t m_transaction_id = 0; + std::atomic m_transaction_id{0}; std::atomic m_is_running{false}; }; diff --git a/src/modbus/modbus_rtu_poller_service.cc b/src/modbus/modbus_rtu_poller_service.cc index 0f673bc..7ffef49 100644 --- a/src/modbus/modbus_rtu_poller_service.cc +++ b/src/modbus/modbus_rtu_poller_service.cc @@ -40,20 +40,29 @@ bool ModbusRtuPollerService::is_running() const { return !m_stop_flag && m_thread.joinable(); } +void ModbusRtuPollerService::write_single_register(uint16_t address, uint16_t value) { + std::lock_guard lock(m_client_mutex); + + try { + spdlog::debug("[Modbus RTU] Device '{}': Writing value {} to address {}.", m_config.device_id, value, address); + m_client.writeSingleRegister(m_config.slave_id, address, value); + spdlog::info("[Modbus RTU] Device '{}': Successfully wrote value {} to address {}.", m_config.device_id, value, address); + } catch (const std::exception& e) { + spdlog::error("[Modbus RTU] Device '{}': Failed to write to address {}: {}", m_config.device_id, address, e.what()); + } +} + void ModbusRtuPollerService::run() { - // 检查是否有数据点被配置 if (m_config.data_points.empty()) { spdlog::warn("[Modbus RTU] Device '{}' has no data points configured. Thread will not run.", m_config.device_id); return; } - // 设置并打开串口 if (!m_client.setPortSettings(m_config.port_path, m_config.baud_rate)) { spdlog::error("[Modbus RTU] Failed to set up serial port '{}' for device '{}'. Thread exiting.", m_config.port_path, m_config.device_id); return; } - // 1. 为了提高效率,我们计算出需要一次性读取的连续寄存器范围 auto [min_it, max_it] = std::minmax_element(m_config.data_points.begin(), m_config.data_points.end(), [](const DataPointConfig& a, const DataPointConfig& b) { return a.address < b.address; @@ -61,55 +70,49 @@ void ModbusRtuPollerService::run() { uint16_t start_address = min_it->address; uint16_t last_address = max_it->address; - // 考虑多寄存器数据类型(如FLOAT32)会占用额外的寄存器 if (max_it->type == ModbusDataType::UINT32 || max_it->type == ModbusDataType::INT32 || max_it->type == ModbusDataType::FLOAT32) { - last_address += 1; // 32位数据占用2个16位寄存器 + last_address += 1; } uint16_t quantity = last_address - start_address + 1; spdlog::info("[Modbus RTU] Device '{}' will poll {} registers starting from address {}.", m_config.device_id, quantity, start_address); - // 线程主循环 while (!m_stop_flag) { - try { - // 2. 一次性读取所有需要的连续寄存器 - std::vector raw_registers = m_client.readHoldingRegisters(m_config.slave_id, start_address, quantity); - - // 3. 将返回的寄存器数组转换为 (地址 -> 值) 的映射,方便解析器按地址查找 - std::map registers_map; - for (uint16_t i = 0; i < raw_registers.size(); ++i) { - registers_map[start_address + i] = raw_registers[i]; + { // <--- 创建一个新的作用域来控制锁的生命周期 + std::lock_guard lock(m_client_mutex); + try { + std::vector raw_registers = m_client.readHoldingRegisters(m_config.slave_id, start_address, quantity); + + std::map registers_map; + for (uint16_t i = 0; i < raw_registers.size(); ++i) { + registers_map[start_address + i] = raw_registers[i]; + } + + nlohmann::json data_j = GenericModbusParser::parse(registers_map, m_config.data_points); + + UnifiedData report_data; + report_data.device_id = m_config.device_id; + report_data.timestamp_ms = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + report_data.data_json = data_j.dump(); + + if (m_report_callback) { + m_report_callback(report_data); + } + + } catch (const std::exception& e) { + spdlog::error("[Modbus RTU] Error during communication with device '{}': {}", m_config.device_id, e.what()); } + } // <--- 锁在这里被释放 - // 4. 使用通用的、配置驱动的解析器进行解析 - nlohmann::json data_j = GenericModbusParser::parse(registers_map, m_config.data_points); - - // 5. 封装成 UnifiedData 结构并上报 - UnifiedData report_data; - report_data.device_id = m_config.device_id; - report_data.timestamp_ms = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - report_data.data_json = data_j.dump(); - - if (m_report_callback) { - m_report_callback(report_data); - } - - } catch (const std::exception& e) { - spdlog::error("[Modbus RTU] Error during communication with device '{}': {}", m_config.device_id, e.what()); - } - - // 在本线程中等待,不影响主线程的事件循环 auto wake_up_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(m_config.poll_interval_ms); while (std::chrono::steady_clock::now() < wake_up_time) { if (m_stop_flag) { break; } - // 每次只“打盹”100毫秒 std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } - // 线程退出前关闭串口 m_client.closePort(); spdlog::info("[RTU Poller {}] Run loop finished. Thread exiting.", m_config.device_id); } \ No newline at end of file diff --git a/src/modbus/modbus_rtu_poller_service.h b/src/modbus/modbus_rtu_poller_service.h index 2eca36f..c5ebe49 100644 --- a/src/modbus/modbus_rtu_poller_service.h +++ b/src/modbus/modbus_rtu_poller_service.h @@ -8,6 +8,7 @@ #include #include #include +#include /** * @brief Modbus RTU 轮询服务类 @@ -46,7 +47,6 @@ public: */ void stop(); - // --- <<< 新增方法 >>> --- /** * @brief 获取设备的配置信息 (const-ref to avoid copy) */ @@ -56,7 +56,13 @@ public: * @brief 检查轮询服务是否正在运行 */ bool is_running() const; - // --- <<< END >>> --- + + /** + * @brief 向设备写入单个寄存器 + * @param address 寄存器地址 + * @param value 要写入的值 + */ + void write_single_register(uint16_t address, uint16_t value); private: /** * @brief 线程的主循环函数 @@ -71,6 +77,8 @@ private: std::thread m_thread; std::atomic m_stop_flag{false}; + + std::mutex m_client_mutex; }; #endif // MODBUS_RTU_POLLER_SERVICE_H \ No newline at end of file diff --git a/src/mqtt/handler/command_handler.cpp b/src/mqtt/handler/command_handler.cpp index 444ca2c..ebf0275 100644 --- a/src/mqtt/handler/command_handler.cpp +++ b/src/mqtt/handler/command_handler.cpp @@ -1,26 +1,51 @@ +// 文件名: command_handler.cpp #include "command_handler.h" #include "spdlog/spdlog.h" #include "vendor/nlohmann/json.hpp" using json = nlohmann::json; -CommandHandler::CommandHandler(MqttClient& client) : m_client(client) {} +// 构造函数实现更新 +CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager) + : m_client(client), m_device_manager(deviceManager) {} void CommandHandler::handle(mqtt::const_message_ptr msg) { - spdlog::info("DataHandler is processing message from topic: {}", msg->get_topic()); + const std::string topic = msg->get_topic(); + const std::string payload = msg->get_payload_str(); + + spdlog::info("CommandHandler received a command on topic '{}'", topic); + try { - // (这里是原来 MqttRouter 中处理数据上报的逻辑) - auto received_json = json::parse(msg->get_payload_str()); - if (received_json.contains("temperature")) { - double temp = received_json["temperature"]; - json forward_json; - forward_json["source_topic"] = msg->get_topic(); - forward_json["processed_temp"] = temp; - forward_json["timestamp"] = time(0); - const std::string output_topic = "proxy/processed/temperature"; - m_client.publish(output_topic, forward_json.dump()); + auto cmd_json = json::parse(payload); + + // 校验命令格式 + if (!cmd_json.contains("deviceId") || !cmd_json.contains("address") || !cmd_json.contains("value")) { + spdlog::warn("Command JSON from topic '{}' is missing required fields.", topic); + return; } - } catch (const json::parse_error& e) { - spdlog::error("JSON parse error in DataHandler: {}", e.what()); + + // 解析命令 + std::string device_id = cmd_json.at("deviceId").get(); + uint16_t address = cmd_json.at("address").get(); + uint16_t value = cmd_json.at("value").get(); + + // 通过 DeviceManager 发送控制指令 + bool success = m_device_manager.send_control_command(device_id, address, value); + + // (可选) 将执行结果反馈到 MQTT,形成闭环 + json response_json; + response_json["success"] = success; + response_json["original_command"] = cmd_json; + std::string response_topic = "proxy/command/result/" + device_id; + m_client.publish(response_topic, response_json.dump()); + + if (success) { + spdlog::info("Successfully dispatched command to device '{}'.", device_id); + } else { + spdlog::error("Failed to dispatch command to device '{}'.", device_id); + } + + } catch (const json::exception& e) { + spdlog::error("Failed to parse command JSON from topic '{}': {}", topic, e.what()); } } \ No newline at end of file diff --git a/src/mqtt/handler/command_handler.h b/src/mqtt/handler/command_handler.h index ee38b9b..6288ea7 100644 --- a/src/mqtt/handler/command_handler.h +++ b/src/mqtt/handler/command_handler.h @@ -1,10 +1,13 @@ +// 文件名: command_handler.h #pragma once -#include "../mqtt_client.h" // 注意 #include 路径的变化 +#include "../mqtt_client.h" +#include "deviceManager/device_manager.h" class CommandHandler { public: - explicit CommandHandler(MqttClient& client); + explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager); void handle(mqtt::const_message_ptr msg); private: MqttClient& m_client; + DeviceManager& m_device_manager; }; \ No newline at end of file diff --git a/src/mqtt/handler/data_handler.cpp b/src/mqtt/handler/data_handler.cpp index 96f1734..1c3a9e2 100644 --- a/src/mqtt/handler/data_handler.cpp +++ b/src/mqtt/handler/data_handler.cpp @@ -7,20 +7,24 @@ using json = nlohmann::json; DataHandler::DataHandler(MqttClient& client) : m_client(client) {} void DataHandler::handle(mqtt::const_message_ptr msg) { - spdlog::info("DataHandler is processing message from topic: {}", msg->get_topic()); + const auto& input_topic = msg->get_topic(); + const auto& payload_str = msg->get_payload_str(); + + spdlog::debug("DataHandler received data from topic: {}", input_topic); + try { - // (这里是原来 MqttRouter 中处理数据上报的逻辑) - auto received_json = json::parse(msg->get_payload_str()); - if (received_json.contains("temperature")) { - double temp = received_json["temperature"]; - json forward_json; - forward_json["source_topic"] = msg->get_topic(); - forward_json["processed_temp"] = temp; - forward_json["timestamp"] = time(0); - const std::string output_topic = "proxy/processed/temperature"; - m_client.publish(output_topic, forward_json.dump()); - } + auto payload_json = json::parse(payload_str); + + payload_json["_proxy_source_topic"] = input_topic; + payload_json["_proxy_processed_ts"] = time(nullptr); + + const std::string output_topic = "proxy/processed/data"; + + m_client.publish(output_topic, payload_json.dump()); + + spdlog::debug("Successfully forwarded data from '{}' to '{}'", input_topic, output_topic); + } catch (const json::parse_error& e) { - spdlog::error("JSON parse error in DataHandler: {}", e.what()); + spdlog::error("JSON parse error in DataHandler for topic '{}': {}", input_topic, e.what()); } } \ No newline at end of file diff --git a/src/mqtt/mqtt_router.cpp b/src/mqtt/mqtt_router.cpp index 728e3d8..7dde2cc 100644 --- a/src/mqtt/mqtt_router.cpp +++ b/src/mqtt/mqtt_router.cpp @@ -2,9 +2,9 @@ #include "spdlog/spdlog.h" #include "utils/mqtt_topic_matcher.h" -MqttRouter::MqttRouter(MqttClient& client) : m_client(client) { +MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager) : m_client(client) { m_data_handler = std::make_unique(m_client); - m_command_handler = std::make_unique(m_client); + m_command_handler = std::make_unique(m_client, deviceManager); m_client.set_message_callback([this](mqtt::const_message_ptr msg) { this->on_message_arrived(std::move(msg)); diff --git a/src/mqtt/mqtt_router.h b/src/mqtt/mqtt_router.h index 32428b6..1741ee8 100644 --- a/src/mqtt/mqtt_router.h +++ b/src/mqtt/mqtt_router.h @@ -7,7 +7,7 @@ class MqttRouter { public: // 构造函数现在接收 MqttClient 和所有的处理器 - MqttRouter(MqttClient& client); + MqttRouter(MqttClient& client, DeviceManager& deviceManager); void start(); private: