From 8147ac2f9c87c1dd23cb55524c6fc56c3d35df70 Mon Sep 17 00:00:00 2001 From: GuanYuankai Date: Tue, 14 Oct 2025 18:10:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=AD=E7=82=B9=E7=BB=AD=E4=BC=A0=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 5 ++ src/dataCache/cache_uploader.cc | 48 +++++++++++++ src/dataCache/cache_uploader.h | 27 ++++++++ src/dataCache/data_cache.cc | 115 ++++++++++++++++++++++++++++++++ src/dataCache/data_cache.h | 58 ++++++++++++++++ src/main.cpp | 43 +++++++++--- src/mqtt/mqtt_client.cpp | 30 ++++++--- src/mqtt/mqtt_client.h | 16 +++-- 8 files changed, 319 insertions(+), 23 deletions(-) create mode 100644 src/dataCache/cache_uploader.cc create mode 100644 src/dataCache/cache_uploader.h create mode 100644 src/dataCache/data_cache.cc create mode 100644 src/dataCache/data_cache.h diff --git a/CMakeLists.txt b/CMakeLists.txt index fe5de1b..5381081 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,10 @@ add_library(edge_proxy_lib STATIC # --- Modbus Master --- src/protocol/modbus/modbus_protocol.cc src/modbus/modbus_master_poller.cc + + # 数据缓存/断点续传 + src/dataCache/data_cache.cc + src/dataCache/cache_uploader.cc ) target_include_directories(edge_proxy_lib PUBLIC @@ -61,6 +65,7 @@ target_link_libraries(edge_proxy_lib PRIVATE Boost::system Boost::thread PahoMqttCpp::paho-mqttpp3 + sqlite3 pthread nlohmann_json ) diff --git a/src/dataCache/cache_uploader.cc b/src/dataCache/cache_uploader.cc new file mode 100644 index 0000000..6c3d90b --- /dev/null +++ b/src/dataCache/cache_uploader.cc @@ -0,0 +1,48 @@ +// 文件名: src/cache_uploader.cc +#include "cache_uploader.h" +#include "spdlog/spdlog.h" + +CacheUploader::CacheUploader(boost::asio::io_context& io_context, MqttClient& mqtt_client, DataCache& data_cache) + : m_io_context(io_context), + m_mqtt_client(mqtt_client), + m_data_cache(data_cache) {} + +void CacheUploader::start_upload() { + // 防止重复启动 + if (m_is_uploading) { + return; + } + // 将任务 post 到 io_context 中,以确保在主线程上执行 + boost::asio::post(m_io_context, [this]() { + if (!m_is_uploading) { + m_is_uploading = true; + spdlog::info("Starting cached data upload..."); + do_upload_batch(); + } + }); +} + +void CacheUploader::do_upload_batch() { + auto batch = m_data_cache.get_batch(50); // 一次处理50条 + + if (batch.empty()) { + spdlog::info("Data cache is empty. Upload finished."); + m_is_uploading = false; + return; + } + + std::vector successfully_sent_ids; + for (const auto& data : batch) { + std::string topic = "devices/" + data.device_id + "/data"; + m_mqtt_client.publish(topic, data.data_json, 1, false); + successfully_sent_ids.push_back(data.id); + } + + m_data_cache.delete_batch(successfully_sent_ids); + spdlog::info("Uploaded and deleted {} records from cache.", successfully_sent_ids.size()); + + // 调度下一次上传,这会给其他asio事件处理的机会 + boost::asio::post(m_io_context, [this]() { + do_upload_batch(); + }); +} \ No newline at end of file diff --git a/src/dataCache/cache_uploader.h b/src/dataCache/cache_uploader.h new file mode 100644 index 0000000..14b45ba --- /dev/null +++ b/src/dataCache/cache_uploader.h @@ -0,0 +1,27 @@ +// 文件名: src/cache_uploader.h +#ifndef CACHE_UPLOADER_H +#define CACHE_UPLOADER_H + +#include "data_cache.h" +#include "mqtt/mqtt_client.h" +#include +#include + +class CacheUploader { +public: + CacheUploader(boost::asio::io_context& io_context, MqttClient& mqtt_client, DataCache& data_cache); + + // 开始上传缓存数据 + void start_upload(); + +private: + // 异步上传一个批次的数据 + void do_upload_batch(); + + boost::asio::io_context& m_io_context; + MqttClient& m_mqtt_client; + DataCache& m_data_cache; + std::atomic m_is_uploading{false}; +}; + +#endif // CACHE_UPLOADER_H \ No newline at end of file diff --git a/src/dataCache/data_cache.cc b/src/dataCache/data_cache.cc new file mode 100644 index 0000000..2ab7986 --- /dev/null +++ b/src/dataCache/data_cache.cc @@ -0,0 +1,115 @@ +// 文件名: src/data_cache.cc +#include "data_cache.h" +#include "spdlog/spdlog.h" + +DataCache::DataCache() {} + +DataCache::~DataCache() { + if (m_db) { + sqlite3_close(m_db); + m_db = nullptr; + } +} + +bool DataCache::open(const std::string& db_path) { + if (sqlite3_open(db_path.c_str(), &m_db) != SQLITE_OK) { + spdlog::critical("Can't open database: {}", sqlite3_errmsg(m_db)); + return false; + } + + const char* sql = "CREATE TABLE IF NOT EXISTS data_cache (" + "id INTEGER PRIMARY KEY AUTOINCREMENT," + "timestamp_ms INTEGER NOT NULL," + "device_id TEXT NOT NULL," + "data_json TEXT NOT NULL);"; + + char* err_msg = nullptr; + if (sqlite3_exec(m_db, sql, 0, 0, &err_msg) != SQLITE_OK) { + spdlog::critical("Failed to create table: {}", err_msg); + sqlite3_free(err_msg); + return false; + } + + spdlog::info("Data cache database opened at '{}'", db_path); + return true; +} + +bool DataCache::add(const UnifiedData& data) { + std::lock_guard lock(m_mutex); // 保证线程安全 + + const char* sql = "INSERT INTO data_cache (timestamp_ms, device_id, data_json) VALUES (?, ?, ?);"; + sqlite3_stmt* stmt = nullptr; + + if (sqlite3_prepare_v2(m_db, sql, -1, &stmt, 0) != SQLITE_OK) { + spdlog::error("Failed to prepare statement: {}", sqlite3_errmsg(m_db)); + return false; + } + + sqlite3_bind_int64(stmt, 1, data.timestamp_ms); + sqlite3_bind_text(stmt, 2, data.device_id.c_str(), -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 3, data.data_json.c_str(), -1, SQLITE_STATIC); + + if (sqlite3_step(stmt) != SQLITE_DONE) { + spdlog::error("Failed to execute statement: {}", sqlite3_errmsg(m_db)); + sqlite3_finalize(stmt); + return false; + } + + sqlite3_finalize(stmt); + return true; +} + +std::vector DataCache::get_batch(int limit) { + std::vector results; + const char* sql = "SELECT id, timestamp_ms, device_id, data_json FROM data_cache ORDER BY id ASC LIMIT ?;"; + sqlite3_stmt* stmt = nullptr; + + if (sqlite3_prepare_v2(m_db, sql, -1, &stmt, 0) != SQLITE_OK) { + spdlog::error("Failed to prepare statement for get_batch: {}", sqlite3_errmsg(m_db)); + return results; + } + + sqlite3_bind_int(stmt, 1, limit); + + while (sqlite3_step(stmt) == SQLITE_ROW) { + CachedData data; + data.id = sqlite3_column_int64(stmt, 0); + data.timestamp_ms = sqlite3_column_int64(stmt, 1); + data.device_id = reinterpret_cast(sqlite3_column_text(stmt, 2)); + data.data_json = reinterpret_cast(sqlite3_column_text(stmt, 3)); + results.push_back(data); + } + + sqlite3_finalize(stmt); + return results; +} + +bool DataCache::delete_batch(const std::vector& ids) { + if (ids.empty()) { + return true; + } + + std::lock_guard lock(m_mutex); + + sqlite3_exec(m_db, "BEGIN TRANSACTION;", 0, 0, 0); + + const char* sql = "DELETE FROM data_cache WHERE id = ?;"; + sqlite3_stmt* stmt = nullptr; + if (sqlite3_prepare_v2(m_db, sql, -1, &stmt, 0) != SQLITE_OK) { + spdlog::error("Failed to prepare statement for delete_batch: {}", sqlite3_errmsg(m_db)); + sqlite3_exec(m_db, "ROLLBACK;", 0, 0, 0); + return false; + } + + for (int64_t id : ids) { + sqlite3_bind_int64(stmt, 1, id); + if (sqlite3_step(stmt) != SQLITE_DONE) { + spdlog::error("Failed to delete id {}: {}", id, sqlite3_errmsg(m_db)); + } + sqlite3_reset(stmt); + } + + sqlite3_finalize(stmt); + sqlite3_exec(m_db, "COMMIT;", 0, 0, 0); + return true; +} \ No newline at end of file diff --git a/src/dataCache/data_cache.h b/src/dataCache/data_cache.h new file mode 100644 index 0000000..279f947 --- /dev/null +++ b/src/dataCache/data_cache.h @@ -0,0 +1,58 @@ +// 文件名: src/data_cache.h +#ifndef DATA_CACHE_H +#define DATA_CACHE_H + +#include "protocol/iprotocol_adapter.h" // For UnifiedData +#include +#include +#include +#include + +// 为缓存数据添加一个唯一的ID +struct CachedData : public UnifiedData { + int64_t id; +}; + +class DataCache { +public: + DataCache(); + ~DataCache(); + + // 禁止拷贝和赋值 + DataCache(const DataCache&) = delete; + DataCache& operator=(const DataCache&) = delete; + + /** + * @brief 打开(或创建)一个数据库文件并初始化表 + * @param db_path 数据库文件的路径 + * @return 如果成功则返回 true + */ + bool open(const std::string& db_path); + + /** + * @brief 向缓存中添加一条数据 (线程安全) + * @param data 要缓存的 UnifiedData + * @return 如果成功则返回 true + */ + bool add(const UnifiedData& data); + + /** + * @brief 从缓存中获取一批最老的数据 + * @param limit 本次获取的最大数量 + * @return 包含缓存数据的向量 + */ + std::vector get_batch(int limit = 100); + + /** + * @brief 根据ID列表,从缓存中删除一批数据 + * @param ids 要删除的数据的ID列表 + * @return 如果成功则返回 true + */ + bool delete_batch(const std::vector& ids); + +private: + sqlite3* m_db = nullptr; + std::mutex m_mutex; // 用于保证对数据库写入的线程安全 +}; + +#endif // DATA_CACHE_H \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index e08de3b..213eac3 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -5,7 +5,9 @@ #include "mqtt/mqtt_router.h" #include "systemMonitor/system_monitor.h" #include "spdlog/spdlog.h" -#include "deviceManager/device_manager.h" // <<< MODIFIED: 包含新的设备管理器头文件 +#include "deviceManager/device_manager.h" +#include "dataCache/data_cache.h" +#include "dataCache/cache_uploader.h" #include #include @@ -32,6 +34,7 @@ void poll_system_metrics( SystemMonitor::SystemMonitor& monitor, MqttClient& mqtt_client ) { + // ... 此函数内部逻辑保持不变 ... auto cpu_util = monitor.getCpuUtilization(); auto mem_info = monitor.getMemoryInfo(); double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0; @@ -67,6 +70,22 @@ int main(int argc, char* argv[]) { TCPServer tcp_server(g_io_context, listen_ports, mqtt_client); SystemMonitor::SystemMonitor monitor; + // <<< MODIFIED: 初始化数据缓存和上传器 >>> + DataCache data_cache; + if (!data_cache.open("edge_data_cache.db")) { // 数据库文件将创建在程序运行目录 + spdlog::critical("Failed to initialize data cache. Exiting."); + return 1; + } + CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache); + + // <<< MODIFIED: 设置MQTT连接成功的回调,用于触发断点续传 >>> + mqtt_client.set_connected_handler([&](const std::string& cause){ + spdlog::info("MQTT client connected: {}", cause); + // 当连接成功时,启动缓存上传 + cache_uploader.start_upload(); + }); + + // 现在再连接MQTT mqtt_client.connect(); mqtt_router.start(); @@ -75,16 +94,22 @@ int main(int argc, char* argv[]) { boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15)); system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client))); - // --- 3. 创建统一的数据上报回调函数 --- + // --- 3. <<< MODIFIED: 创建包含缓存逻辑的统一数据上报回调函数 >>> auto report_to_mqtt = [&](const UnifiedData& data) { - std::string topic = "devices/" + data.device_id + "/data"; - // 使用 post 确保 MQTT 发布操作在 io_context 的主事件循环中执行,保证线程安全 - g_io_context.post([&, topic, payload = data.data_json]() { - mqtt_client.publish(topic, payload, 1, false); - }); + if (mqtt_client.is_connected()) { + // 网络正常,直接上报 + std::string topic = "devices/" + data.device_id + "/data"; + g_io_context.post([&, topic, payload = data.data_json]() { + mqtt_client.publish(topic, payload, 1, false); + }); + } else { + // 网络断开,写入缓存 + spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id); + data_cache.add(data); + } }; - // --- 4. 实例化设备管理器并从文件加载所有设备 --- + // --- 4. 实例化设备管理器并从文件加载所有设备 (使用新的回调) --- DeviceManager device_manager(g_io_context, report_to_mqtt); // 默认从程序运行目录下的 "devices.json" 文件加载 device_manager.load_and_start("../config/devices.json"); @@ -94,8 +119,6 @@ int main(int argc, char* argv[]) { g_io_context.run(); // --- 清理工作 --- - // io_context.run() 返回后,程序将退出 try 块。 - // device_manager 的析构函数会自动被调用,它会负责停止所有设备服务线程。 spdlog::info("Shutting down MQTT client..."); mqtt_client.disconnect(); diff --git a/src/mqtt/mqtt_client.cpp b/src/mqtt/mqtt_client.cpp index a9d9c15..c66da19 100644 --- a/src/mqtt/mqtt_client.cpp +++ b/src/mqtt/mqtt_client.cpp @@ -1,16 +1,21 @@ +// 文件名: mqtt_client.cpp #include "mqtt_client.h" #include "spdlog/spdlog.h" MqttClient::MqttClient(const std::string& server_uri, const std::string& client_id) : client_(server_uri, client_id) { - // 配置连接选项 - conn_opts_.set_clean_session(true); // 清理会话 - conn_opts_.set_automatic_reconnect(true); // 开启自动重连 + conn_opts_.set_clean_session(true); + conn_opts_.set_automatic_reconnect(true); - // 设置一些基本的回调,用于打印日志,方便调试 - client_.set_connected_handler([](const std::string& cause) { + // <<< MODIFIED: 修改连接成功的回调逻辑 >>> + // 这个回调现在会先打印日志,然后检查并调用外部设置的回调函数 + client_.set_connected_handler([this](const std::string& cause) { spdlog::info("MQTT client connected: {}", cause); + // 如果外部设置了回调函数,就调用它 + if (m_connected_handler) { + m_connected_handler(cause); + } }); client_.set_connection_lost_handler([](const std::string& cause) { @@ -23,12 +28,10 @@ MqttClient::MqttClient(const std::string& server_uri, const std::string& client_ void MqttClient::connect() { try { spdlog::info("Connecting to MQTT broker..."); - // connect() 返回一个 token,我们可以 .wait() 来同步等待操作完成 client_.connect(conn_opts_)->wait(); } catch (const mqtt::exception& exc) { spdlog::error("Failed to connect to MQTT broker: {}", exc.what()); - // 在实际项目中,这里可能需要更复杂的重试或异常处理逻辑 - throw; // 向上抛出异常,让主程序知道连接失败 + throw; } } @@ -48,7 +51,7 @@ void MqttClient::publish(const std::string& topic, const std::string& payload, i } try { auto msg = mqtt::make_message(topic, payload, qos, retained); - client_.publish(msg)->wait_for(std::chrono::seconds(2)); // 等待发布完成,设置超时 + client_.publish(msg)->wait_for(std::chrono::seconds(2)); spdlog::debug("Published message to topic '{}': {}", topic, payload); } catch (const mqtt::exception& exc) { spdlog::error("Failed to publish to topic '{}': {}", topic, exc.what()); @@ -70,4 +73,13 @@ void MqttClient::subscribe(const std::string& topic, int qos) { void MqttClient::set_message_callback(message_callback cb) { client_.set_message_callback(std::move(cb)); +} + +// <<< MODIFIED: 新增方法的实现 >>> +void MqttClient::set_connected_handler(connection_handler cb) { + m_connected_handler = std::move(cb); +} + +bool MqttClient::is_connected() const { + return client_.is_connected(); } \ No newline at end of file diff --git a/src/mqtt/mqtt_client.h b/src/mqtt/mqtt_client.h index 3249ad5..a6038aa 100644 --- a/src/mqtt/mqtt_client.h +++ b/src/mqtt/mqtt_client.h @@ -1,3 +1,4 @@ +// 文件名: mqtt_client.h #pragma once #include "mqtt/async_client.h" @@ -6,8 +7,10 @@ class MqttClient { public: - // 为消息回调函数定义一个清晰的类型别名 + // 为回调函数定义清晰的类型别名 using message_callback = std::function; + // <<< MODIFIED: 新增连接成功回调的类型别名 + using connection_handler = std::function; // 构造函数 MqttClient(const std::string& server_uri, const std::string& client_id); @@ -17,13 +20,18 @@ public: void disconnect(); void publish(const std::string& topic, const std::string& payload, int qos = 1, bool retained = false); void subscribe(const std::string& topic, int qos = 1); + // <<< MODIFIED: 新增查询连接状态的接口 + bool is_connected() const; - // 设置消息到达时的回调函数 + // 设置回调函数 void set_message_callback(message_callback cb); + // <<< MODIFIED: 新增设置连接成功回调的接口 + void set_connected_handler(connection_handler cb); private: - // Paho 异步客户端实例 mqtt::async_client client_; - // 连接选项 mqtt::connect_options conn_opts_; + + // <<< MODIFIED: 新增成员变量以存储外部设置的回调 + connection_handler m_connected_handler; }; \ No newline at end of file