bonus-edge-proxy/src/main.cpp

159 lines
6.2 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// main.cpp
#include "network/tcp_server.h"
#include "mqtt/mqtt_client.h"
#include "mqtt/mqtt_router.h"
#include "systemMonitor/system_monitor.h"
#include "spdlog/spdlog.h"
#include "deviceManager/device_manager.h"
#include "dataCache/data_cache.h"
#include "dataCache/cache_uploader.h"
#include "web/web_server.h"
#include "dataCache/live_data_cache.h"
#include "dataStorage/data_storage.h"
#include "config/config_manager.h"
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <csignal>
#include <iostream>
#include <functional>
boost::asio::io_context g_io_context;
/**
* @brief 周期性轮询系统状态并发布到 MQTT
*/
void poll_system_metrics(
boost::asio::steady_timer& timer,
SystemMonitor::SystemMonitor& monitor,
MqttClient& mqtt_client
) {
if (g_io_context.stopped()) return;
auto cpu_util = monitor.getCpuUtilization();
auto mem_info = monitor.getMemoryInfo();
double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0;
std::string topic = "proxy/system_status";
std::string payload = "{\"cpu_usage\":" + std::to_string(cpu_util.totalUsagePercentage) +
",\"mem_total_gb\":" + std::to_string(mem_total_gb) + "}";
mqtt_client.publish(topic, payload);
spdlog::debug("System metrics published.");
timer.expires_at(timer.expiry() + std::chrono::seconds(15));
timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), std::ref(mqtt_client)));
}
int main(int argc, char* argv[]) {
const std::string config_path = "/app/config/config.json";
if (!ConfigManager::getInstance().load(config_path)) {
std::cerr << "Failed to load configuration from " << config_path
<< ". Running with defaults, but this may cause issues." << std::endl;
}
try {
spdlog::set_level(spdlog::level::from_str(ConfigManager::getInstance().getLogLevel()));
spdlog::info("Edge Proxy starting up...");
} catch (const spdlog::spdlog_ex& ex) {
std::cerr << "Log initialization failed: " << ex.what() << std::endl;
return 1;
}
spdlog::info("Initializing Data Storage...");
if (!DataStorage::getInstance().initialize(ConfigManager::getInstance().getDataStorageDbPath())) {
spdlog::critical("Failed to initialize DataStorage. Exiting.");
return 1;
}
try {
DataCache data_cache;
LiveDataCache live_data_cache;
MqttClient mqtt_client(ConfigManager::getInstance().getMqttBroker(),
ConfigManager::getInstance().getMqttClientID()
);
auto report_to_mqtt = [&](const UnifiedData& data) {
if (DataStorage::getInstance().storeProcessedData(data)) {
spdlog::debug("Successfully stored PROCESSED data for device '{}'", data.device_id);
} else {
spdlog::error("Failed to store PROCESSED data for device '{}'", data.device_id);
}
live_data_cache.update_data(data.device_id, data.data_json);
if (mqtt_client.is_connected()) {
// 网络正常,直接上报
std::string topic = "devices/" + data.device_id + "/data";
g_io_context.post([&, topic, payload = data.data_json]() {
mqtt_client.publish(topic, payload, 1, false);
});
} else {
// 网络断开,写入缓存
spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id);
data_cache.add(data);
}
};
DeviceManager device_manager(g_io_context, report_to_mqtt);
MqttRouter mqtt_router(mqtt_client, device_manager, ConfigManager::getInstance().getDevicesConfigPath());
std::vector<uint16_t> listen_ports = ConfigManager::getInstance().getTcpServerPorts();
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
SystemMonitor::SystemMonitor monitor;
if (!data_cache.open("edge_data_cache.db")) {
spdlog::critical("Failed to initialize data cache. Exiting.");
return 1;
}
CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache);
mqtt_client.set_connected_handler([&](const std::string& cause){
spdlog::info("MQTT client connected: {}", cause);
cache_uploader.start_upload();
});
mqtt_client.connect();
mqtt_router.start();
monitor.getCpuUtilization();
boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15));
system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client)));
device_manager.load_and_start(ConfigManager::getInstance().getDevicesConfigPath());
WebServer web_server(monitor, device_manager, live_data_cache, ConfigManager::getInstance().getWebServerPort());
web_server.start();
boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM);
signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
spdlog::warn("Interrupt signal ({}) received. Shutting down.", signal_number);
// a. 首先停止所有数据采集线程
spdlog::info("[Shutdown] A. Stopping device manager services...");
device_manager.stop_all();
// b. 停止Web服务器线程
spdlog::info("[Shutdown] B. Stopping web server...");
web_server.stop();
// c. 断开MQTT连接 (这将释放它对io_context的'劫持')
spdlog::info("[Shutdown] C. Disconnecting from MQTT broker...");
mqtt_client.disconnect();
// d. 最后安全地停止io_context
spdlog::info("[Shutdown] D. Stopping main event loop...");
g_io_context.stop();
});
spdlog::info("All services are running. Press Ctrl+C to exit.");
g_io_context.run();
} catch (const std::exception& e) {
spdlog::critical("An unhandled exception occurred: {}", e.what());
return 1;
}
spdlog::info("Server has been shut down gracefully. Exiting.");
return 0;
}