新增设备实时状态缓存模块
This commit is contained in:
parent
461f4a3f32
commit
f06e9b5c7f
|
|
@ -1,7 +1,7 @@
|
|||
{
|
||||
"modbus_rtu_devices": [
|
||||
{
|
||||
"enabled": false,
|
||||
"enabled": true,
|
||||
"device_id": "rtu_temp_sensor_lab",
|
||||
"port_path": "/dev/ttyS7",
|
||||
"baud_rate": 9600,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,38 @@
|
|||
#ifndef LIVE_DATA_CACHE_H
|
||||
#define LIVE_DATA_CACHE_H
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
|
||||
/**
|
||||
* @brief 一个线程安全的、用于存储所有设备最新上报数据的内存缓存。
|
||||
* 设计为单例或共享对象,供数据回调和API服务同时访问。
|
||||
*/
|
||||
class LiveDataCache {
|
||||
public:
|
||||
/**
|
||||
* @brief 更新或插入一个设备的最新数据。
|
||||
* @param device_id 设备的唯一ID。
|
||||
* @param json_data 设备上报的完整JSON数据字符串。
|
||||
*/
|
||||
void update_data(const std::string& device_id, const std::string& json_data) {
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
m_latest_data[device_id] = json_data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 获取所有设备最新数据的快照。
|
||||
* @return 一个从设备ID到其最新JSON数据的map。
|
||||
*/
|
||||
std::map<std::string, std::string> get_all_data() const {
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
return m_latest_data; // 返回一个副本以保证线程安全
|
||||
}
|
||||
|
||||
private:
|
||||
mutable std::mutex m_mutex;
|
||||
std::map<std::string, std::string> m_latest_data;
|
||||
};
|
||||
|
||||
#endif // LIVE_DATA_CACHE_H
|
||||
|
|
@ -9,6 +9,7 @@
|
|||
#include "dataCache/data_cache.h"
|
||||
#include "dataCache/cache_uploader.h"
|
||||
#include "web/web_server.h"
|
||||
#include "dataCache/live_data_cache.h"
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
|
|
@ -53,9 +54,11 @@ int main(int argc, char* argv[]) {
|
|||
}
|
||||
|
||||
try {
|
||||
DataCache data_cache;
|
||||
DataCache data_cache;
|
||||
LiveDataCache live_data_cache;
|
||||
MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client");
|
||||
auto report_to_mqtt = [&](const UnifiedData& data) {
|
||||
live_data_cache.update_data(data.device_id, data.data_json);
|
||||
if (mqtt_client.is_connected()) {
|
||||
// 网络正常,直接上报
|
||||
std::string topic = "devices/" + data.device_id + "/data";
|
||||
|
|
@ -97,10 +100,9 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
device_manager.load_and_start("../config/devices.json");
|
||||
|
||||
WebServer web_server(monitor, device_manager,8080);
|
||||
WebServer web_server(monitor, device_manager, live_data_cache, 8080);
|
||||
web_server.start();
|
||||
|
||||
// --- 2. <<< MODIFIED: 设置 Boost.Asio 风格的信号处理器 >>> ---
|
||||
boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM);
|
||||
signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
|
||||
spdlog::warn("Interrupt signal ({}) received. Shutting down.", signal_number);
|
||||
|
|
|
|||
|
|
@ -3,9 +3,9 @@
|
|||
#include "spdlog/spdlog.h"
|
||||
|
||||
// 构造函数现在需要调用基类的构造函数
|
||||
WebServer::WebServer(SystemMonitor::SystemMonitor& monitor, DeviceManager& deviceManager, uint16_t port)
|
||||
WebServer::WebServer(SystemMonitor::SystemMonitor& monitor, DeviceManager& deviceManager, LiveDataCache& liveDataCache,uint16_t port)
|
||||
// 注意基类已经改为 crow::Crow<crow::CORSHandler>()
|
||||
: crow::Crow<crow::CORSHandler>(), m_monitor(monitor),m_device_manager(deviceManager), m_port(port)
|
||||
: crow::Crow<crow::CORSHandler>(), m_monitor(monitor), m_device_manager(deviceManager), m_live_data_cache(liveDataCache), m_port(port)
|
||||
{
|
||||
// ================= [ 新增 CORS 配置 ] =================
|
||||
// 获取 CORS 中间件的引用
|
||||
|
|
@ -65,7 +65,6 @@ void WebServer::setup_routes() {
|
|||
return response;
|
||||
});
|
||||
|
||||
// ================= [ 新增设备列表 API ] =================
|
||||
CROW_ROUTE((*this), "/api/devices").methods("GET"_method)
|
||||
([this] {
|
||||
// 这一行不变,从 DeviceManager 获取最新的设备信息
|
||||
|
|
@ -92,5 +91,22 @@ void WebServer::setup_routes() {
|
|||
|
||||
return crow::json::wvalue(devices_json);
|
||||
});
|
||||
// ==========================================================
|
||||
|
||||
// ================= [ 新增实时数据 API ] =================
|
||||
CROW_ROUTE((*this), "/api/data/latest").methods("GET"_method)
|
||||
([this] {
|
||||
// 从缓存中获取所有设备的最新数据
|
||||
auto latest_data_map = m_live_data_cache.get_all_data();
|
||||
|
||||
crow::json::wvalue response;
|
||||
for (const auto& pair : latest_data_map) {
|
||||
// pair.first 是 device_id (string)
|
||||
// pair.second 是 json_data (string)
|
||||
// 我们需要将 json_data 字符串再次解析为 crow 的 json 对象
|
||||
response[pair.first] = crow::json::load(pair.second);
|
||||
}
|
||||
|
||||
return response;
|
||||
});
|
||||
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
#include "systemMonitor/system_monitor.h"
|
||||
#include "deviceManager/device_manager.h"
|
||||
#include "dataCache/live_data_cache.h"
|
||||
|
||||
#include <thread>
|
||||
|
||||
|
|
@ -16,7 +17,11 @@
|
|||
class WebServer : public crow::Crow<crow::CORSHandler> {
|
||||
|
||||
public:
|
||||
WebServer(SystemMonitor::SystemMonitor& monitor, DeviceManager& deviceManager, uint16_t port = 8080);
|
||||
WebServer(SystemMonitor::SystemMonitor& monitor,
|
||||
DeviceManager& deviceManager,
|
||||
LiveDataCache& liveDataCache,
|
||||
uint16_t port = 8080
|
||||
);
|
||||
~WebServer();
|
||||
|
||||
WebServer(const WebServer&) = delete;
|
||||
|
|
@ -30,6 +35,8 @@ private:
|
|||
|
||||
SystemMonitor::SystemMonitor& m_monitor;
|
||||
DeviceManager& m_device_manager;
|
||||
LiveDataCache& m_live_data_cache;
|
||||
|
||||
uint16_t m_port;
|
||||
|
||||
std::thread m_thread;
|
||||
|
|
|
|||
Loading…
Reference in New Issue