进一步解耦config

This commit is contained in:
GuanYuankai 2025-10-23 09:29:11 +00:00
parent 374e8b586d
commit 4814560be6
5 changed files with 75 additions and 56 deletions

View File

@ -10,7 +10,7 @@
#include "web/web_server.h"
#include "dataCache/live_data_cache.h"
#include "dataStorage/data_storage.h"
#include "config/config_manager.h"
#include "config/config_manager.h"
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
@ -49,11 +49,12 @@ int main(int argc, char* argv[]) {
if (!ConfigManager::getInstance().load(config_path)) {
std::cerr << "Failed to load configuration from " << config_path
<< ". Running with defaults, but this may cause issues." << std::endl;
}
auto& config = ConfigManager::getInstance();
try {
spdlog::set_level(spdlog::level::from_str(ConfigManager::getInstance().getLogLevel()));
spdlog::set_level(spdlog::level::from_str(config.getLogLevel()));
spdlog::info("Edge Proxy starting up...");
} catch (const spdlog::spdlog_ex& ex) {
std::cerr << "Log initialization failed: " << ex.what() << std::endl;
@ -61,7 +62,7 @@ int main(int argc, char* argv[]) {
}
spdlog::info("Initializing Data Storage...");
if (!DataStorage::getInstance().initialize(ConfigManager::getInstance().getDataStorageDbPath())) {
if (!DataStorage::getInstance().initialize(config.getDataStorageDbPath())) {
spdlog::critical("Failed to initialize DataStorage. Exiting.");
return 1;
}
@ -69,8 +70,8 @@ int main(int argc, char* argv[]) {
try {
DataCache data_cache;
LiveDataCache live_data_cache;
MqttClient mqtt_client(ConfigManager::getInstance().getMqttBroker(),
ConfigManager::getInstance().getMqttClientID()
MqttClient mqtt_client(config.getMqttBroker(),
config.getMqttClientID()
);
auto report_to_mqtt = [&](const UnifiedData& data) {
if (DataStorage::getInstance().storeProcessedData(data)) {
@ -94,13 +95,15 @@ int main(int argc, char* argv[]) {
};
DeviceManager device_manager(g_io_context, report_to_mqtt);
MqttRouter mqtt_router(mqtt_client, device_manager, ConfigManager::getInstance().getDevicesConfigPath());
std::vector<uint16_t> listen_ports = ConfigManager::getInstance().getTcpServerPorts();
MqttRouter mqtt_router(mqtt_client, device_manager);
std::vector<uint16_t> listen_ports = config.getTcpServerPorts();
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
SystemMonitor::SystemMonitor monitor;
if (!data_cache.open("edge_data_cache.db")) {
spdlog::critical("Failed to initialize data cache. Exiting.");
// [修改] 使用 ConfigManager 中的路径,而不是硬编码
if (!data_cache.open(config.getDataCacheDbPath())) {
spdlog::critical("Failed to initialize data cache at '{}'. Exiting.", config.getDataCacheDbPath());
return 1;
}
CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache);
@ -111,7 +114,7 @@ int main(int argc, char* argv[]) {
});
mqtt_client.connect();
mqtt_router.start();
mqtt_router.start(); // MqttRouter 内部会动态订阅 Topic
monitor.getCpuUtilization();
@ -119,9 +122,9 @@ int main(int argc, char* argv[]) {
system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client)));
device_manager.load_and_start(ConfigManager::getInstance().getDevicesConfigPath());
device_manager.load_and_start(config.getDevicesConfigPath());
WebServer web_server(monitor, device_manager, live_data_cache, ConfigManager::getInstance().getWebServerPort());
WebServer web_server(monitor, device_manager, live_data_cache, config.getWebServerPort());
web_server.start();
boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM);

View File

@ -3,77 +3,71 @@
#include "spdlog/spdlog.h"
#include "vendor/nlohmann/json.hpp"
#include "utils/mqtt_topic_matcher.h"
#include "config/config_manager.h" // [新增] 引入 ConfigManager
#include <fstream>
using json = nlohmann::json;
CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager, std::string config_path)
// [修改] 构造函数签名变更
CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager)
: m_client(client),
m_device_manager(deviceManager),
m_config_file_path(std::move(config_path))
m_devices_config_path(ConfigManager::getInstance().getDevicesConfigPath()),
m_device_id(ConfigManager::getInstance().getDeviceID()),
m_ack_topic("proxy/control/" + m_device_id + "/device/ack"),
m_command_filter("commands/" + m_device_id + "/#"),
m_proxy_control_filter("proxy/control/" + m_device_id + "/device/update")
{
spdlog::info("CommandHandler initialized, will persist config to: {}", m_config_file_path);
spdlog::info("CommandHandler initialized, will persist device config to: {}", m_devices_config_path);
spdlog::debug("CommandHandler watching command topic: {}", m_command_filter);
spdlog::debug("CommandHandler watching control topic: {}", m_proxy_control_filter);
}
void CommandHandler::handle(mqtt::const_message_ptr msg) {
const std::string topic = msg->get_topic();
const std::string payload = msg->get_payload_str();
const std::string command_filter = "commands/my-edge-proxy-device-01/#";
const std::string proxy_control_filter = "proxy/control/my-edge-proxy-device-01/config/update";
const std::string ack_topic = "proxy/control/my-edge-proxy-device-01/config/ack";
if (MqttUtils::topic_matches(proxy_control_filter, topic)) {
if (MqttUtils::topic_matches(m_proxy_control_filter, topic)) {
spdlog::info("CommandHandler received proxy config update on topic '{}'", topic);
if (payload.empty()) {
spdlog::warn("Config update payload is empty. Ignoring.");
m_client.publish(ack_topic, "{\"status\":\"error\",\"reason\":\"payload empty\"}");
m_client.publish(m_ack_topic, "{\"status\":\"error\",\"reason\":\"payload empty\"}");
return;
}
try {
// 步骤 1: 尝试将新配置写入磁盘文件
std::ofstream config_file(m_config_file_path);
std::ofstream config_file(m_devices_config_path);
if (!config_file.is_open()) {
spdlog::error("Failed to open config file for writing: {}", m_config_file_path);
m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"failed to write file\"}");
spdlog::error("Failed to open config file for writing: {}", m_devices_config_path);
m_client.publish(m_ack_topic, "{\"status\":\"error\", \"reason\":\"failed to write file\"}");
return;
}
// 验证JSON合法性
try {
[[maybe_unused]] auto parsed_json = json::parse(payload);
} catch (const json::exception& e) {
spdlog::error("Failed to parse new config JSON: {}", e.what());
m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"invalid json\"}");
m_client.publish(m_ack_topic, "{\"status\":\"error\", \"reason\":\"invalid json\"}");
return;
}
config_file << payload;
config_file.close();
spdlog::info("Successfully persisted new config to '{}'.", m_config_file_path);
// 步骤 2: 只有在写入成功后,才应用到运行时
m_device_manager.post_apply_device_configuration(payload);
spdlog::info("Successfully persisted new config to '{}'.", m_devices_config_path);
m_client.publish(ack_topic, "{\"status\":\"success\"}");
m_device_manager.post_apply_device_configuration(payload);
m_client.publish(m_ack_topic, "{\"status\":\"success\"}");
} catch (const std::exception& e) {
spdlog::error("Exception while persisting config: {}", e.what());
m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"exception\"}");
m_client.publish(m_ack_topic, "{\"status\":\"error\", \"reason\":\"exception\"}");
}
// 调用 DeviceManager 的新方法
// 这个方法必须是线程安全的,它会将实际的更新操作 post 到 g_io_context
m_device_manager.post_apply_device_configuration(payload);
m_client.publish("proxy/control/my-edge-proxy-device-01/config/ack", "{\"status\":\"received\"}");
}
else if (MqttUtils::topic_matches(command_filter, topic)) {
else if (MqttUtils::topic_matches(m_command_filter, topic)) {
spdlog::info("CommandHandler received device command on topic '{}'", topic);
try {
@ -109,5 +103,4 @@ void CommandHandler::handle(mqtt::const_message_ptr msg) {
else {
spdlog::warn("CommandHandler received message on unhandled topic: {}", topic);
}
}

View File

@ -2,13 +2,22 @@
#pragma once
#include "../mqtt_client.h"
#include "deviceManager/device_manager.h"
#include <string> // [新增] 包含 string
class CommandHandler {
public:
explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager, std::string config_path);
// [修改] 移除了 config_path 参数
explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager);
void handle(mqtt::const_message_ptr msg);
private:
MqttClient& m_client;
DeviceManager& m_device_manager;
std::string m_config_file_path;
std::string m_devices_config_path;
std::string m_device_id;
std::string m_ack_topic;
std::string m_command_filter;
std::string m_proxy_control_filter;
};

View File

@ -1,15 +1,19 @@
#include "mqtt_router.h"
#include "spdlog/spdlog.h"
#include "utils/mqtt_topic_matcher.h"
#include "config/config_manager.h" // [新增] 引入 ConfigManager
MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::string config_path)
// [修改] 构造函数签名变更
MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager)
: m_client(client),
m_config_file_path(std::move(config_path)) // +++ 新增:初始化路径
// [修改] m_config_file_path 已移除
// [修改] 从 ConfigManager 初始化 m_device_id
m_device_id(ConfigManager::getInstance().getDeviceID())
{
m_data_handler = std::make_unique<DataHandler>(m_client);
// +++ 修改:将 config_path 传递给 CommandHandler +++
m_command_handler = std::make_unique<CommandHandler>(m_client, deviceManager, m_config_file_path);
// [修改] CommandHandler 的构造函数也不再需要 config_path
m_command_handler = std::make_unique<CommandHandler>(m_client, deviceManager);
m_client.set_message_callback([this](mqtt::const_message_ptr msg) {
this->on_message_arrived(std::move(msg));
@ -18,18 +22,25 @@ MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::st
}
void MqttRouter::start() {
// [修改] 动态构建订阅主题,移除硬编码
std::string command_topic = "commands/" + m_device_id + "/#";
std::string control_topic = "proxy/control/" + m_device_id + "/device/update";
m_client.subscribe("devices/+/data");
m_client.subscribe("commands/my-edge-proxy-device-01/#");
m_client.subscribe("proxy/control/my-edge-proxy-device-01/config/update");
spdlog::info("MqttRouter has subscribed to all topics.");
m_client.subscribe(command_topic);
m_client.subscribe(control_topic);
spdlog::info("MqttRouter subscribed to 'devices/+/data'");
spdlog::info("MqttRouter subscribed to '{}'", command_topic);
spdlog::info("MqttRouter subscribed to '{}'", control_topic);
}
void MqttRouter::on_message_arrived(mqtt::const_message_ptr msg) {
const auto& topic = msg->get_topic();
const std::string data_filter = "devices/+/data";
const std::string command_filter = "commands/my-edge-proxy-device-01/#";
const std::string proxy_control_filter = "proxy/control/my-edge-proxy-device-01/config/update";
const std::string command_filter = "commands/" + m_device_id + "/#";
const std::string proxy_control_filter = "proxy/control/" + m_device_id + "/device/update";
if (MqttUtils::topic_matches(data_filter, topic)) {
m_data_handler->handle(msg);

View File

@ -3,10 +3,11 @@
#include "handler/data_handler.h" // 引入处理器
#include "handler/command_handler.h" // 引入处理器
#include <memory>
#include <string>
class MqttRouter {
public:
MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::string config_path);
MqttRouter(MqttClient& client, DeviceManager& deviceManager);
void start();
private:
@ -17,5 +18,7 @@ private:
std::unique_ptr<DataHandler> m_data_handler;
std::unique_ptr<CommandHandler> m_command_handler;
std::string m_config_file_path;
// [修改] 缓存从 ConfigManager 获取的 device_id用于动态构建 Topic
std::string m_device_id;
// [修改] m_config_file_path 已移除CommandHandler 将自行管理
};