// main.cpp (修改后) #include #include #include #include #include #include #include "alarm/alarm_service.h" #include "config/config_manager.h" #include "dataCache/cache_uploader.h" #include "dataCache/data_cache.h" #include "dataCache/live_data_cache.h" #include "dataStorage/data_storage.h" #include "deviceManager/device_manager.h" #include "mqtt/mqtt_client.h" #include "mqtt/mqtt_router.h" #include "network/tcp_server.h" #include "nlohmann/json.hpp" #include "spdlog/spdlog.h" #include "systemMonitor/system_monitor.h" #include "tts/piper_tts_interface.h" #include "videoService/video_service_manager.h" #include "web/web_server.h" // [新增] 包含 postprocess.h 以便调用 deinitPostProcess #include "rknn/postprocess.h" boost::asio::io_context g_io_context; /** * @brief 周期性轮询系统状态并发布到 MQTT */ void poll_system_metrics(boost::asio::steady_timer &timer, SystemMonitor::SystemMonitor &monitor, MqttClient &mqtt_client, AlarmService &alarm_service) { // ... (此函数保持不变) ... if (g_io_context.stopped()) return; auto cpu_util = monitor.getCpuUtilization(); auto mem_info = monitor.getMemoryInfo(); double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0; double mem_usage_percentage = (mem_info.total_kb > 0) ? (100.0 * (mem_info.total_kb - mem_info.available_kb) / mem_info.total_kb) : 0.0; auto thermalInfoString = monitor.getChipTemperature(); std::string topic = "proxy/system_status"; std::string payload; try { nlohmann::json payload_json; payload_json["cpu_usage"] = cpu_util.totalUsagePercentage; payload_json["mem_total_gb"] = mem_total_gb; payload_json["mem_usage_percentage"] = mem_usage_percentage; payload_json["thermal_info"] = nlohmann::json::parse(thermalInfoString); payload = payload_json.dump(); } catch (const nlohmann::json::parse_error &e) { spdlog::error("Failed to parse thermalInfo JSON: {}. Sending partial data.", e.what()); nlohmann::json fallback_json; fallback_json["cpu_usage"] = cpu_util.totalUsagePercentage; fallback_json["mem_total_gb"] = mem_total_gb; fallback_json["mem_usage_percentage"] = mem_usage_percentage; fallback_json["thermal_info_error"] = "parsing_failed"; fallback_json["raw_thermal_info"] = thermalInfoString; payload = fallback_json.dump(); } alarm_service.process_system_data(payload); mqtt_client.publish(topic, payload); spdlog::debug("System metrics published."); timer.expires_at(timer.expiry() + std::chrono::seconds(15)); timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), std::ref(mqtt_client), std::ref(alarm_service))); } int main(int argc, char *argv[]) { const std::string config_path = "/app/config/config.json"; if (!ConfigManager::getInstance().load(config_path)) { std::cerr << "Failed to load configuration from " << config_path << ". Running with defaults, but this may cause issues." << std::endl; } auto &config = ConfigManager::getInstance(); try { spdlog::set_level(spdlog::level::from_str(config.getLogLevel())); spdlog::info("Edge Proxy starting up..."); } catch (const spdlog::spdlog_ex &ex) { std::cerr << "Log initialization failed: " << ex.what() << std::endl; return 1; } spdlog::info("Initializing Data Storage..."); auto &data_storage = DataStorage::getInstance(); if (!data_storage.initialize(config.getDataStorageDbPath())) { spdlog::critical("Failed to initialize DataStorage. Exiting."); return 1; } try { spdlog::info("Initializing Video Service..."); VideoServiceManager video_manager; // ... (DataCache, MqttClient, TTSService, AlarmService ... 初始化保持不变) // ... DataCache data_cache; LiveDataCache live_data_cache; MqttClient mqtt_client(config.getMqttBroker(), config.getMqttClientID()); PiperTTSInterface tts_service(config.getPiperExecutablePath(), config.getPiperModelPath()); AlarmService alarm_service(g_io_context, tts_service, mqtt_client); if (!alarm_service.load_rules(config.getAlarmRulesPath())) { spdlog::error("Failed to load alarm rules. Alarms may be disabled."); } // ... (report_to_mqtt, DeviceManager, MqttRouter ... 初始化保持不变) ... auto report_to_mqtt = [&](const UnifiedData &data) { if (data_storage.storeProcessedData(data)) { spdlog::debug("Successfully stored PROCESSED data for device '{}'", data.device_id); } else { spdlog::error("Failed to store PROCESSED data for device '{}'", data.device_id); } live_data_cache.update_data(data.device_id, data.data_json); alarm_service.process_device_data(data.device_id, data.data_json); if (mqtt_client.is_connected()) { std::string topic = "devices/" + data.device_id + "/data"; g_io_context.post([&, topic, payload = data.data_json]() { mqtt_client.publish(topic, payload, 1, false); }); } else { spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id); data_cache.add(data); } }; DeviceManager device_manager(g_io_context, report_to_mqtt); MqttRouter mqtt_router(mqtt_client, device_manager); std::vector listen_ports = config.getTcpServerPorts(); TCPServer tcp_server(g_io_context, listen_ports, mqtt_client); SystemMonitor::SystemMonitor monitor; if (!data_cache.open(config.getDataCacheDbPath())) { spdlog::critical("Failed to initialize data cache at '{}'. Exiting.", config.getDataCacheDbPath()); return 1; } CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache); mqtt_client.set_connected_handler([&](const std::string &cause) { spdlog::info("MQTT client connected: {}", cause); cache_uploader.start_upload(); }); mqtt_client.connect(); mqtt_router.start(); monitor.getCpuUtilization(); boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15)); system_monitor_timer.async_wait(std::bind( poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client), std::ref(alarm_service))); device_manager.load_and_start(config.getDevicesConfigPath()); WebServer web_server(monitor, device_manager, live_data_cache, alarm_service, config.getWebServerPort()); web_server.set_shutdown_handler([&]() { spdlog::warn("Received shutdown command from Web API. Shutting down."); g_io_context.stop(); // <-- 这将触发 signals.async_wait 下方的所有清理逻辑 }); web_server.start(); // ----------------------------------------------------------------- // [修改] 关键修改点 1: 视频服务启动逻辑 // ----------------------------------------------------------------- // (旧代码已删除) // [新代码] // 1. 从主 config 获取 video_config.json 的 *路径* std::string video_config_path = config.getVideoConfigPath(); // 2. 让 video_manager 自己加载该配置文件 if (video_manager.load_config(video_config_path)) { // 3. 启动服务 (load_and_start 内部会检查 "enabled" 标志) video_manager.load_and_start(); } else { spdlog::error("Failed to load video configuration from {}. Video " "services will not start.", video_config_path); } // ----------------------------------------------------------------- boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM); signals.async_wait( [&](const boost::system::error_code &error, int signal_number) { spdlog::warn("Interrupt signal ({}) received. Shutting down.", signal_number); // a. 停止所有数据采集 spdlog::info("[Shutdown] A. Stopping device manager services..."); device_manager.stop_all(); // b. 停止Web服务器 spdlog::info("[Shutdown] B. Stopping web server..."); web_server.stop(); // c. 停止告警服务 (释放TTS线程) spdlog::info("[Shutdown] C. Stopping alarm service..."); alarm_service.stop(); // d. 断开MQTT spdlog::info("[Shutdown] D. Disconnecting from MQTT broker..."); mqtt_client.disconnect(); // e. 停止视频服务 spdlog::info("[Shutdown] E. Stopping video Service loop..."); video_manager.stop_all(); spdlog::info("[Shutdown] F. De-initializing postprocess library..."); deinitPostProcess(); // f. 最后,安全地停止io_context spdlog::info("[Shutdown] G. Stopping main event loop..."); g_io_context.stop(); }); spdlog::info("All services are running. Press Ctrl+C to exit."); g_io_context.run(); } catch (const std::exception &e) { spdlog::critical("An unhandled exception occurred: {}", e.what()); return 1; } spdlog::info("Server has been shut down gracefully. Exiting."); return 0; }