diff --git a/src/main.cpp b/src/main.cpp index ea7ec35..615f5f2 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -10,7 +10,7 @@ #include "web/web_server.h" #include "dataCache/live_data_cache.h" #include "dataStorage/data_storage.h" -#include "config/config_manager.h" +#include "config/config_manager.h" #include #include @@ -49,11 +49,12 @@ int main(int argc, char* argv[]) { if (!ConfigManager::getInstance().load(config_path)) { std::cerr << "Failed to load configuration from " << config_path << ". Running with defaults, but this may cause issues." << std::endl; - } + auto& config = ConfigManager::getInstance(); + try { - spdlog::set_level(spdlog::level::from_str(ConfigManager::getInstance().getLogLevel())); + spdlog::set_level(spdlog::level::from_str(config.getLogLevel())); spdlog::info("Edge Proxy starting up..."); } catch (const spdlog::spdlog_ex& ex) { std::cerr << "Log initialization failed: " << ex.what() << std::endl; @@ -61,7 +62,7 @@ int main(int argc, char* argv[]) { } spdlog::info("Initializing Data Storage..."); - if (!DataStorage::getInstance().initialize(ConfigManager::getInstance().getDataStorageDbPath())) { + if (!DataStorage::getInstance().initialize(config.getDataStorageDbPath())) { spdlog::critical("Failed to initialize DataStorage. Exiting."); return 1; } @@ -69,8 +70,8 @@ int main(int argc, char* argv[]) { try { DataCache data_cache; LiveDataCache live_data_cache; - MqttClient mqtt_client(ConfigManager::getInstance().getMqttBroker(), - ConfigManager::getInstance().getMqttClientID() + MqttClient mqtt_client(config.getMqttBroker(), + config.getMqttClientID() ); auto report_to_mqtt = [&](const UnifiedData& data) { if (DataStorage::getInstance().storeProcessedData(data)) { @@ -94,13 +95,15 @@ int main(int argc, char* argv[]) { }; DeviceManager device_manager(g_io_context, report_to_mqtt); - MqttRouter mqtt_router(mqtt_client, device_manager, ConfigManager::getInstance().getDevicesConfigPath()); - std::vector listen_ports = ConfigManager::getInstance().getTcpServerPorts(); + MqttRouter mqtt_router(mqtt_client, device_manager); + + std::vector listen_ports = config.getTcpServerPorts(); TCPServer tcp_server(g_io_context, listen_ports, mqtt_client); SystemMonitor::SystemMonitor monitor; - if (!data_cache.open("edge_data_cache.db")) { - spdlog::critical("Failed to initialize data cache. Exiting."); + // [修改] 使用 ConfigManager 中的路径,而不是硬编码 + if (!data_cache.open(config.getDataCacheDbPath())) { + spdlog::critical("Failed to initialize data cache at '{}'. Exiting.", config.getDataCacheDbPath()); return 1; } CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache); @@ -111,7 +114,7 @@ int main(int argc, char* argv[]) { }); mqtt_client.connect(); - mqtt_router.start(); + mqtt_router.start(); // MqttRouter 内部会动态订阅 Topic monitor.getCpuUtilization(); @@ -119,9 +122,9 @@ int main(int argc, char* argv[]) { system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client))); - device_manager.load_and_start(ConfigManager::getInstance().getDevicesConfigPath()); + device_manager.load_and_start(config.getDevicesConfigPath()); - WebServer web_server(monitor, device_manager, live_data_cache, ConfigManager::getInstance().getWebServerPort()); + WebServer web_server(monitor, device_manager, live_data_cache, config.getWebServerPort()); web_server.start(); boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM); diff --git a/src/mqtt/handler/command_handler.cpp b/src/mqtt/handler/command_handler.cpp index cd06897..6e284a5 100644 --- a/src/mqtt/handler/command_handler.cpp +++ b/src/mqtt/handler/command_handler.cpp @@ -3,77 +3,71 @@ #include "spdlog/spdlog.h" #include "vendor/nlohmann/json.hpp" #include "utils/mqtt_topic_matcher.h" +#include "config/config_manager.h" // [新增] 引入 ConfigManager #include using json = nlohmann::json; -CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager, std::string config_path) +// [修改] 构造函数签名变更 +CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager) : m_client(client), m_device_manager(deviceManager), - m_config_file_path(std::move(config_path)) + m_devices_config_path(ConfigManager::getInstance().getDevicesConfigPath()), + m_device_id(ConfigManager::getInstance().getDeviceID()), + m_ack_topic("proxy/control/" + m_device_id + "/device/ack"), + m_command_filter("commands/" + m_device_id + "/#"), + m_proxy_control_filter("proxy/control/" + m_device_id + "/device/update") { - spdlog::info("CommandHandler initialized, will persist config to: {}", m_config_file_path); + spdlog::info("CommandHandler initialized, will persist device config to: {}", m_devices_config_path); + spdlog::debug("CommandHandler watching command topic: {}", m_command_filter); + spdlog::debug("CommandHandler watching control topic: {}", m_proxy_control_filter); } void CommandHandler::handle(mqtt::const_message_ptr msg) { const std::string topic = msg->get_topic(); const std::string payload = msg->get_payload_str(); - - const std::string command_filter = "commands/my-edge-proxy-device-01/#"; - const std::string proxy_control_filter = "proxy/control/my-edge-proxy-device-01/config/update"; - const std::string ack_topic = "proxy/control/my-edge-proxy-device-01/config/ack"; - - if (MqttUtils::topic_matches(proxy_control_filter, topic)) { + if (MqttUtils::topic_matches(m_proxy_control_filter, topic)) { spdlog::info("CommandHandler received proxy config update on topic '{}'", topic); if (payload.empty()) { spdlog::warn("Config update payload is empty. Ignoring."); - m_client.publish(ack_topic, "{\"status\":\"error\",\"reason\":\"payload empty\"}"); + m_client.publish(m_ack_topic, "{\"status\":\"error\",\"reason\":\"payload empty\"}"); return; } try { - // 步骤 1: 尝试将新配置写入磁盘文件 - std::ofstream config_file(m_config_file_path); + std::ofstream config_file(m_devices_config_path); if (!config_file.is_open()) { - spdlog::error("Failed to open config file for writing: {}", m_config_file_path); - m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"failed to write file\"}"); + spdlog::error("Failed to open config file for writing: {}", m_devices_config_path); + m_client.publish(m_ack_topic, "{\"status\":\"error\", \"reason\":\"failed to write file\"}"); return; } - // 验证JSON合法性 try { [[maybe_unused]] auto parsed_json = json::parse(payload); } catch (const json::exception& e) { spdlog::error("Failed to parse new config JSON: {}", e.what()); - m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"invalid json\"}"); + m_client.publish(m_ack_topic, "{\"status\":\"error\", \"reason\":\"invalid json\"}"); return; } config_file << payload; config_file.close(); - spdlog::info("Successfully persisted new config to '{}'.", m_config_file_path); - - // 步骤 2: 只有在写入成功后,才应用到运行时 - m_device_manager.post_apply_device_configuration(payload); + spdlog::info("Successfully persisted new config to '{}'.", m_devices_config_path); - m_client.publish(ack_topic, "{\"status\":\"success\"}"); + m_device_manager.post_apply_device_configuration(payload); + m_client.publish(m_ack_topic, "{\"status\":\"success\"}"); } catch (const std::exception& e) { spdlog::error("Exception while persisting config: {}", e.what()); - m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"exception\"}"); + m_client.publish(m_ack_topic, "{\"status\":\"error\", \"reason\":\"exception\"}"); } - - // 调用 DeviceManager 的新方法 - // 这个方法必须是线程安全的,它会将实际的更新操作 post 到 g_io_context - m_device_manager.post_apply_device_configuration(payload); - m_client.publish("proxy/control/my-edge-proxy-device-01/config/ack", "{\"status\":\"received\"}"); } - else if (MqttUtils::topic_matches(command_filter, topic)) { + else if (MqttUtils::topic_matches(m_command_filter, topic)) { spdlog::info("CommandHandler received device command on topic '{}'", topic); try { @@ -109,5 +103,4 @@ void CommandHandler::handle(mqtt::const_message_ptr msg) { else { spdlog::warn("CommandHandler received message on unhandled topic: {}", topic); } - } \ No newline at end of file diff --git a/src/mqtt/handler/command_handler.h b/src/mqtt/handler/command_handler.h index aef5810..5910a81 100644 --- a/src/mqtt/handler/command_handler.h +++ b/src/mqtt/handler/command_handler.h @@ -2,13 +2,22 @@ #pragma once #include "../mqtt_client.h" #include "deviceManager/device_manager.h" +#include // [新增] 包含 string class CommandHandler { public: - explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager, std::string config_path); + // [修改] 移除了 config_path 参数 + explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager); void handle(mqtt::const_message_ptr msg); + private: MqttClient& m_client; DeviceManager& m_device_manager; - std::string m_config_file_path; + + std::string m_devices_config_path; + std::string m_device_id; + + std::string m_ack_topic; + std::string m_command_filter; + std::string m_proxy_control_filter; }; \ No newline at end of file diff --git a/src/mqtt/mqtt_router.cpp b/src/mqtt/mqtt_router.cpp index 3bed9f2..9a5754c 100644 --- a/src/mqtt/mqtt_router.cpp +++ b/src/mqtt/mqtt_router.cpp @@ -1,15 +1,19 @@ #include "mqtt_router.h" #include "spdlog/spdlog.h" #include "utils/mqtt_topic_matcher.h" +#include "config/config_manager.h" // [新增] 引入 ConfigManager -MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::string config_path) +// [修改] 构造函数签名变更 +MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager) : m_client(client), - m_config_file_path(std::move(config_path)) // +++ 新增:初始化路径 + // [修改] m_config_file_path 已移除 + // [修改] 从 ConfigManager 初始化 m_device_id + m_device_id(ConfigManager::getInstance().getDeviceID()) { m_data_handler = std::make_unique(m_client); - - // +++ 修改:将 config_path 传递给 CommandHandler +++ - m_command_handler = std::make_unique(m_client, deviceManager, m_config_file_path); + + // [修改] CommandHandler 的构造函数也不再需要 config_path + m_command_handler = std::make_unique(m_client, deviceManager); m_client.set_message_callback([this](mqtt::const_message_ptr msg) { this->on_message_arrived(std::move(msg)); @@ -18,18 +22,25 @@ MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::st } void MqttRouter::start() { + // [修改] 动态构建订阅主题,移除硬编码 + std::string command_topic = "commands/" + m_device_id + "/#"; + std::string control_topic = "proxy/control/" + m_device_id + "/device/update"; + m_client.subscribe("devices/+/data"); - m_client.subscribe("commands/my-edge-proxy-device-01/#"); - m_client.subscribe("proxy/control/my-edge-proxy-device-01/config/update"); - spdlog::info("MqttRouter has subscribed to all topics."); + m_client.subscribe(command_topic); + m_client.subscribe(control_topic); + + spdlog::info("MqttRouter subscribed to 'devices/+/data'"); + spdlog::info("MqttRouter subscribed to '{}'", command_topic); + spdlog::info("MqttRouter subscribed to '{}'", control_topic); } void MqttRouter::on_message_arrived(mqtt::const_message_ptr msg) { const auto& topic = msg->get_topic(); const std::string data_filter = "devices/+/data"; - const std::string command_filter = "commands/my-edge-proxy-device-01/#"; - const std::string proxy_control_filter = "proxy/control/my-edge-proxy-device-01/config/update"; + const std::string command_filter = "commands/" + m_device_id + "/#"; + const std::string proxy_control_filter = "proxy/control/" + m_device_id + "/device/update"; if (MqttUtils::topic_matches(data_filter, topic)) { m_data_handler->handle(msg); diff --git a/src/mqtt/mqtt_router.h b/src/mqtt/mqtt_router.h index 1f4ba06..fe10b21 100644 --- a/src/mqtt/mqtt_router.h +++ b/src/mqtt/mqtt_router.h @@ -3,10 +3,11 @@ #include "handler/data_handler.h" // 引入处理器 #include "handler/command_handler.h" // 引入处理器 #include +#include class MqttRouter { public: - MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::string config_path); + MqttRouter(MqttClient& client, DeviceManager& deviceManager); void start(); private: @@ -17,5 +18,7 @@ private: std::unique_ptr m_data_handler; std::unique_ptr m_command_handler; - std::string m_config_file_path; + // [修改] 缓存从 ConfigManager 获取的 device_id,用于动态构建 Topic + std::string m_device_id; + // [修改] m_config_file_path 已移除,CommandHandler 将自行管理 }; \ No newline at end of file