#include "network/tcp_server.h"
#include "mqtt/mqtt_client.h"
#include "mqtt/mqtt_router.h"
#include "systemMonitor/system_monitor.h"
#include "spdlog/spdlog.h"
#include "modbus/modbus_master_poller.h"
#include "modbus/modbus_rtu_poller_service.h"

#include <boost/asio.hpp>

#include <chrono>
#include <csignal>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Global io_context used by the ASIO-based services in this file.
boost::asio::io_context g_io_context;

/**
 * @brief Handles a termination signal (SIGINT, SIGTERM).
 *
 * Logs the signal and stops the global io_context. Stopping it makes the
 * g_io_context.run() call in main() return, which lets the program exit
 * cleanly.
 *
 * main() invokes this from a boost::asio::signal_set completion handler, so it
 * runs as an ordinary handler inside g_io_context.run() rather than in
 * asynchronous signal context. That is why logging from here is acceptable.
 *
 * @param signum Number of the signal that was received.
 */
void signalHandler(int signum) {
    spdlog::warn("Interrupt signal ({}) received. Shutting down.", signum);
    g_io_context.stop();
}

/**
 * @brief Samples system metrics, publishes them over MQTT and re-arms the timer.
 *
 * Each invocation reads CPU utilization and memory information from
 * @p monitor, publishes them as a small JSON document on the
 * "proxy/system_status" topic, moves the timer expiry 15 seconds past its
 * previous expiry, and schedules itself again on @p timer.
 *
 * All three arguments are held by reference across the asynchronous wait, so
 * the caller must keep them alive for as long as the timer is pending.
 *
 * @param timer       Timer that drives the periodic sampling.
 * @param monitor     Source of the CPU and memory readings.
 * @param mqtt_client Client used to publish the status payload.
 */
void poll_system_metrics(
    boost::asio::steady_timer& timer,
    SystemMonitor::SystemMonitor& monitor,
    MqttClient& mqtt_client
) {
    // a. Collect the current readings.
    auto cpu_util = monitor.getCpuUtilization();
    auto mem_info = monitor.getMemoryInfo();
    double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0;

    // b. (Example) Publish the readings over MQTT.
    std::string topic = "proxy/system_status";
    std::string payload = "{\"cpu_usage\":" + std::to_string(cpu_util.totalUsagePercentage) +
                          ",\"mem_total_gb\":" + std::to_string(mem_total_gb) + "}";
    mqtt_client.publish(topic, payload);
    spdlog::debug("System metrics published.");

    // c. Re-arm relative to the previous expiry (not "now") so the period does
    //    not drift by the time spent in this handler.
    timer.expires_at(timer.expiry() + std::chrono::seconds(15));

    // d. Wait asynchronously; on expiry this function is called again, which
    //    forms the polling loop. The bound call takes no placeholders, so the
    //    error_code passed by the timer is discarded.
    timer.async_wait(std::bind(poll_system_metrics,
                               std::ref(timer),
                               std::ref(monitor),
                               std::ref(mqtt_client)));
}

/**
 * @brief Program entry point: wires up logging, MQTT, the TCP server, system
 *        metrics polling and the Modbus RTU poller, then runs the event loop.
 *
 * @param argc Unused.
 * @param argv Unused.
 * @return 0 after a normal shutdown, 1 if logging setup fails or an exception
 *         escapes service setup or the event loop.
 */
int main(int argc, char* argv[]) {
    try {
        // Debug level makes troubleshooting easier.
        spdlog::set_level(spdlog::level::debug);
        spdlog::info("Edge Proxy starting up...");
    } catch (const spdlog::spdlog_ex& ex) {
        std::cerr << "Log initialization failed: " << ex.what() << std::endl;
        return 1;
    }

    try {
        // Route SIGINT/SIGTERM through the io_context instead of a raw
        // signal() handler: the completion handler below runs as a normal
        // handler inside g_io_context.run(), so signalHandler() may log and
        // stop the io_context without the restrictions that apply to code
        // running in asynchronous signal context.
        boost::asio::signal_set shutdown_signals(g_io_context, SIGINT, SIGTERM);
        shutdown_signals.async_wait(
            [](const boost::system::error_code& ec, int signum) {
                if (!ec) {
                    signalHandler(signum);
                }
            });

        // NOTE(review): tcp_port is not referenced below; listen_ports repeats
        // the literal and lets the element type be deduced. Confirm the
        // element type TCPServer expects.
        constexpr uint16_t tcp_port = 8888;

        MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client");
        MqttRouter mqtt_router(mqtt_client);

        std::vector listen_ports = { 8888 };
        TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);

        SystemMonitor::SystemMonitor monitor;

        mqtt_client.connect();
        mqtt_router.start();

        // Result intentionally discarded; presumably this primes the monitor's
        // baseline sample before the first timed reading - TODO confirm.
        monitor.getCpuUtilization();

        // First metrics sample fires 15 seconds from now; poll_system_metrics
        // re-arms the timer itself afterwards.
        boost::asio::steady_timer timer(g_io_context, std::chrono::seconds(15));
        timer.async_wait(std::bind(poll_system_metrics,
                                   std::ref(timer),
                                   std::ref(monitor),
                                   std::ref(mqtt_client)));

        // Callback handed to the pollers: forwards one device reading to the
        // "devices/<device_id>/data" topic. Only mqtt_client is captured, and
        // it is declared before (so destroyed after) the service that holds
        // this callback.
        auto report_to_mqtt = [&mqtt_client](const UnifiedData& data) {
            std::string topic = "devices/" + data.device_id + "/data";
            // Trailing arguments are presumably QoS 1 and retained=false -
            // verify against MqttClient::publish.
            mqtt_client.publish(topic, data.data_json, 1, false);
        };

        // Configure and start the Modbus (TCP) poller
        // ModbusDeviceConfig temp_sensor_config = {
        //     .device_id = "temp_sensor_workshop1",
        //     .ip_address = "192.168.1.120", // IP address of your Modbus device
        //     .port = 502,                   // Standard Modbus TCP port
        //     .slave_id = 1,                 // Slave ID of the device
        //     .start_address = 0,            // Address of the first register to read
        //     .quantity = 8,                 // Number of registers to read from the start address
        //     .poll_interval_ms = 2000       // Poll once every 2000 milliseconds
        // };
        // Create the poller instance (std::make_shared keeps it alive)
        // auto poller = std::make_shared(g_io_context, temp_sensor_config, report_to_mqtt);
        // poller->start();

        // ==========================================================================
        // Start the Modbus RTU polling service
        ModbusRtuDeviceConfig temp_sensor_config = {
            .device_id = "rtu_temp_sensor_lab",
            .port_path = "/dev/ttyS7",
            .baud_rate = 9600,
            .slave_id = 0x01,
            .poll_interval_ms = 5000,
            .data_points = {
                {"temperature", 0x0000, ModbusDataType::INT16, 0.1},  // address 0, signed 16-bit, scaled by 0.1
                {"humidity", 0x0001, ModbusDataType::UINT16, 0.1}     // address 1, unsigned 16-bit, scaled by 0.1
            }
        };
        ModbusRtuPollerService temp_sensor_service(temp_sensor_config, report_to_mqtt);
        temp_sensor_service.start();

        // Example 2: configure a water meter
        // ModbusRtuDeviceConfig water_meter_config = {
        //     .device_id = "water_meter_main_gate",
        //     .port_path = "/dev/ttyS7", // assumed to be on the same RS485 bus
        //     .baud_rate = 9600,
        //     .slave_id = 0x05,          // a different slave ID
        //     .poll_interval_ms = 10000,
        //     .data_points = {
        //         {"total_flow", 0x0100, ModbusDataType::FLOAT32, 1.0}, // address 256, 32-bit float
        //         {"flow_rate", 0x0102, ModbusDataType::FLOAT32, 1.0}   // address 258, 32-bit float
        //     }
        // };
        // ModbusRtuPollerService water_meter_service(water_meter_config, report_to_mqtt);
        // water_meter_service.start();

        spdlog::info("All services are running. Press Ctrl+C to exit.");

        // Blocks until signalHandler() stops the io_context.
        g_io_context.run();

        spdlog::info("Shutting down MQTT client...");
        mqtt_client.disconnect();
    } catch (const std::exception& e) {
        spdlog::critical("An unhandled exception occurred: {}", e.what());
        return 1;
    }

    spdlog::info("Server has been shut down gracefully. Exiting.");
    return 0;
}