断点续传功能开发

This commit is contained in:
GuanYuankai 2025-10-14 18:10:15 +08:00
parent 5d7a396221
commit 8147ac2f9c
8 changed files with 319 additions and 23 deletions

View File

@ -50,6 +50,10 @@ add_library(edge_proxy_lib STATIC
# --- Modbus Master ---
src/protocol/modbus/modbus_protocol.cc
src/modbus/modbus_master_poller.cc
# /
src/dataCache/data_cache.cc
src/dataCache/cache_uploader.cc
)
target_include_directories(edge_proxy_lib PUBLIC
@ -61,6 +65,7 @@ target_link_libraries(edge_proxy_lib PRIVATE
Boost::system
Boost::thread
PahoMqttCpp::paho-mqttpp3
sqlite3
pthread
nlohmann_json
)

View File

@ -0,0 +1,48 @@
// 文件名: src/cache_uploader.cc
#include "cache_uploader.h"
#include "spdlog/spdlog.h"
CacheUploader::CacheUploader(boost::asio::io_context& io_context, MqttClient& mqtt_client, DataCache& data_cache)
: m_io_context(io_context),
m_mqtt_client(mqtt_client),
m_data_cache(data_cache) {}
void CacheUploader::start_upload() {
// 防止重复启动
if (m_is_uploading) {
return;
}
// 将任务 post 到 io_context 中,以确保在主线程上执行
boost::asio::post(m_io_context, [this]() {
if (!m_is_uploading) {
m_is_uploading = true;
spdlog::info("Starting cached data upload...");
do_upload_batch();
}
});
}
void CacheUploader::do_upload_batch() {
auto batch = m_data_cache.get_batch(50); // 一次处理50条
if (batch.empty()) {
spdlog::info("Data cache is empty. Upload finished.");
m_is_uploading = false;
return;
}
std::vector<int64_t> successfully_sent_ids;
for (const auto& data : batch) {
std::string topic = "devices/" + data.device_id + "/data";
m_mqtt_client.publish(topic, data.data_json, 1, false);
successfully_sent_ids.push_back(data.id);
}
m_data_cache.delete_batch(successfully_sent_ids);
spdlog::info("Uploaded and deleted {} records from cache.", successfully_sent_ids.size());
// 调度下一次上传这会给其他asio事件处理的机会
boost::asio::post(m_io_context, [this]() {
do_upload_batch();
});
}

View File

@ -0,0 +1,27 @@
// 文件名: src/cache_uploader.h
#ifndef CACHE_UPLOADER_H
#define CACHE_UPLOADER_H
#include "data_cache.h"
#include "mqtt/mqtt_client.h"
#include <boost/asio.hpp>
#include <atomic>
class CacheUploader {
public:
CacheUploader(boost::asio::io_context& io_context, MqttClient& mqtt_client, DataCache& data_cache);
// 开始上传缓存数据
void start_upload();
private:
// 异步上传一个批次的数据
void do_upload_batch();
boost::asio::io_context& m_io_context;
MqttClient& m_mqtt_client;
DataCache& m_data_cache;
std::atomic<bool> m_is_uploading{false};
};
#endif // CACHE_UPLOADER_H

115
src/dataCache/data_cache.cc Normal file
View File

@ -0,0 +1,115 @@
// 文件名: src/data_cache.cc
#include "data_cache.h"
#include "spdlog/spdlog.h"
DataCache::DataCache() {}
DataCache::~DataCache() {
if (m_db) {
sqlite3_close(m_db);
m_db = nullptr;
}
}
bool DataCache::open(const std::string& db_path) {
if (sqlite3_open(db_path.c_str(), &m_db) != SQLITE_OK) {
spdlog::critical("Can't open database: {}", sqlite3_errmsg(m_db));
return false;
}
const char* sql = "CREATE TABLE IF NOT EXISTS data_cache ("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"timestamp_ms INTEGER NOT NULL,"
"device_id TEXT NOT NULL,"
"data_json TEXT NOT NULL);";
char* err_msg = nullptr;
if (sqlite3_exec(m_db, sql, 0, 0, &err_msg) != SQLITE_OK) {
spdlog::critical("Failed to create table: {}", err_msg);
sqlite3_free(err_msg);
return false;
}
spdlog::info("Data cache database opened at '{}'", db_path);
return true;
}
bool DataCache::add(const UnifiedData& data) {
std::lock_guard<std::mutex> lock(m_mutex); // 保证线程安全
const char* sql = "INSERT INTO data_cache (timestamp_ms, device_id, data_json) VALUES (?, ?, ?);";
sqlite3_stmt* stmt = nullptr;
if (sqlite3_prepare_v2(m_db, sql, -1, &stmt, 0) != SQLITE_OK) {
spdlog::error("Failed to prepare statement: {}", sqlite3_errmsg(m_db));
return false;
}
sqlite3_bind_int64(stmt, 1, data.timestamp_ms);
sqlite3_bind_text(stmt, 2, data.device_id.c_str(), -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 3, data.data_json.c_str(), -1, SQLITE_STATIC);
if (sqlite3_step(stmt) != SQLITE_DONE) {
spdlog::error("Failed to execute statement: {}", sqlite3_errmsg(m_db));
sqlite3_finalize(stmt);
return false;
}
sqlite3_finalize(stmt);
return true;
}
std::vector<CachedData> DataCache::get_batch(int limit) {
std::vector<CachedData> results;
const char* sql = "SELECT id, timestamp_ms, device_id, data_json FROM data_cache ORDER BY id ASC LIMIT ?;";
sqlite3_stmt* stmt = nullptr;
if (sqlite3_prepare_v2(m_db, sql, -1, &stmt, 0) != SQLITE_OK) {
spdlog::error("Failed to prepare statement for get_batch: {}", sqlite3_errmsg(m_db));
return results;
}
sqlite3_bind_int(stmt, 1, limit);
while (sqlite3_step(stmt) == SQLITE_ROW) {
CachedData data;
data.id = sqlite3_column_int64(stmt, 0);
data.timestamp_ms = sqlite3_column_int64(stmt, 1);
data.device_id = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 2));
data.data_json = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 3));
results.push_back(data);
}
sqlite3_finalize(stmt);
return results;
}
bool DataCache::delete_batch(const std::vector<int64_t>& ids) {
if (ids.empty()) {
return true;
}
std::lock_guard<std::mutex> lock(m_mutex);
sqlite3_exec(m_db, "BEGIN TRANSACTION;", 0, 0, 0);
const char* sql = "DELETE FROM data_cache WHERE id = ?;";
sqlite3_stmt* stmt = nullptr;
if (sqlite3_prepare_v2(m_db, sql, -1, &stmt, 0) != SQLITE_OK) {
spdlog::error("Failed to prepare statement for delete_batch: {}", sqlite3_errmsg(m_db));
sqlite3_exec(m_db, "ROLLBACK;", 0, 0, 0);
return false;
}
for (int64_t id : ids) {
sqlite3_bind_int64(stmt, 1, id);
if (sqlite3_step(stmt) != SQLITE_DONE) {
spdlog::error("Failed to delete id {}: {}", id, sqlite3_errmsg(m_db));
}
sqlite3_reset(stmt);
}
sqlite3_finalize(stmt);
sqlite3_exec(m_db, "COMMIT;", 0, 0, 0);
return true;
}

View File

@ -0,0 +1,58 @@
// 文件名: src/data_cache.h
#ifndef DATA_CACHE_H
#define DATA_CACHE_H
#include "protocol/iprotocol_adapter.h" // For UnifiedData
#include <string>
#include <vector>
#include <mutex>
#include <sqlite3.h>
// 为缓存数据添加一个唯一的ID
struct CachedData : public UnifiedData {
int64_t id;
};
class DataCache {
public:
DataCache();
~DataCache();
// 禁止拷贝和赋值
DataCache(const DataCache&) = delete;
DataCache& operator=(const DataCache&) = delete;
/**
* @brief
* @param db_path
* @return true
*/
bool open(const std::string& db_path);
/**
* @brief (线)
* @param data UnifiedData
* @return true
*/
bool add(const UnifiedData& data);
/**
* @brief
* @param limit
* @return
*/
std::vector<CachedData> get_batch(int limit = 100);
/**
* @brief ID列表
* @param ids ID列表
* @return true
*/
bool delete_batch(const std::vector<int64_t>& ids);
private:
sqlite3* m_db = nullptr;
std::mutex m_mutex; // 用于保证对数据库写入的线程安全
};
#endif // DATA_CACHE_H

View File

@ -5,7 +5,9 @@
#include "mqtt/mqtt_router.h"
#include "systemMonitor/system_monitor.h"
#include "spdlog/spdlog.h"
#include "deviceManager/device_manager.h" // <<< MODIFIED: 包含新的设备管理器头文件
#include "deviceManager/device_manager.h"
#include "dataCache/data_cache.h"
#include "dataCache/cache_uploader.h"
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
@ -32,6 +34,7 @@ void poll_system_metrics(
SystemMonitor::SystemMonitor& monitor,
MqttClient& mqtt_client
) {
// ... 此函数内部逻辑保持不变 ...
auto cpu_util = monitor.getCpuUtilization();
auto mem_info = monitor.getMemoryInfo();
double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0;
@ -67,6 +70,22 @@ int main(int argc, char* argv[]) {
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
SystemMonitor::SystemMonitor monitor;
// <<< MODIFIED: 初始化数据缓存和上传器 >>>
DataCache data_cache;
if (!data_cache.open("edge_data_cache.db")) { // 数据库文件将创建在程序运行目录
spdlog::critical("Failed to initialize data cache. Exiting.");
return 1;
}
CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache);
// <<< MODIFIED: 设置MQTT连接成功的回调用于触发断点续传 >>>
mqtt_client.set_connected_handler([&](const std::string& cause){
spdlog::info("MQTT client connected: {}", cause);
// 当连接成功时,启动缓存上传
cache_uploader.start_upload();
});
// 现在再连接MQTT
mqtt_client.connect();
mqtt_router.start();
@ -75,16 +94,22 @@ int main(int argc, char* argv[]) {
boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15));
system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client)));
// --- 3. 创建统一的数据上报回调函数 ---
// --- 3. <<< MODIFIED: 创建包含缓存逻辑的统一数据上报回调函数 >>>
auto report_to_mqtt = [&](const UnifiedData& data) {
std::string topic = "devices/" + data.device_id + "/data";
// 使用 post 确保 MQTT 发布操作在 io_context 的主事件循环中执行,保证线程安全
g_io_context.post([&, topic, payload = data.data_json]() {
mqtt_client.publish(topic, payload, 1, false);
});
if (mqtt_client.is_connected()) {
// 网络正常,直接上报
std::string topic = "devices/" + data.device_id + "/data";
g_io_context.post([&, topic, payload = data.data_json]() {
mqtt_client.publish(topic, payload, 1, false);
});
} else {
// 网络断开,写入缓存
spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id);
data_cache.add(data);
}
};
// --- 4. 实例化设备管理器并从文件加载所有设备 ---
// --- 4. 实例化设备管理器并从文件加载所有设备 (使用新的回调) ---
DeviceManager device_manager(g_io_context, report_to_mqtt);
// 默认从程序运行目录下的 "devices.json" 文件加载
device_manager.load_and_start("../config/devices.json");
@ -94,8 +119,6 @@ int main(int argc, char* argv[]) {
g_io_context.run();
// --- 清理工作 ---
// io_context.run() 返回后,程序将退出 try 块。
// device_manager 的析构函数会自动被调用,它会负责停止所有设备服务线程。
spdlog::info("Shutting down MQTT client...");
mqtt_client.disconnect();

View File

@ -1,16 +1,21 @@
// 文件名: mqtt_client.cpp
#include "mqtt_client.h"
#include "spdlog/spdlog.h"
MqttClient::MqttClient(const std::string& server_uri, const std::string& client_id)
: client_(server_uri, client_id) {
// 配置连接选项
conn_opts_.set_clean_session(true); // 清理会话
conn_opts_.set_automatic_reconnect(true); // 开启自动重连
conn_opts_.set_clean_session(true);
conn_opts_.set_automatic_reconnect(true);
// 设置一些基本的回调,用于打印日志,方便调试
client_.set_connected_handler([](const std::string& cause) {
// <<< MODIFIED: 修改连接成功的回调逻辑 >>>
// 这个回调现在会先打印日志,然后检查并调用外部设置的回调函数
client_.set_connected_handler([this](const std::string& cause) {
spdlog::info("MQTT client connected: {}", cause);
// 如果外部设置了回调函数,就调用它
if (m_connected_handler) {
m_connected_handler(cause);
}
});
client_.set_connection_lost_handler([](const std::string& cause) {
@ -23,12 +28,10 @@ MqttClient::MqttClient(const std::string& server_uri, const std::string& client_
void MqttClient::connect() {
try {
spdlog::info("Connecting to MQTT broker...");
// connect() 返回一个 token我们可以 .wait() 来同步等待操作完成
client_.connect(conn_opts_)->wait();
} catch (const mqtt::exception& exc) {
spdlog::error("Failed to connect to MQTT broker: {}", exc.what());
// 在实际项目中,这里可能需要更复杂的重试或异常处理逻辑
throw; // 向上抛出异常,让主程序知道连接失败
throw;
}
}
@ -48,7 +51,7 @@ void MqttClient::publish(const std::string& topic, const std::string& payload, i
}
try {
auto msg = mqtt::make_message(topic, payload, qos, retained);
client_.publish(msg)->wait_for(std::chrono::seconds(2)); // 等待发布完成,设置超时
client_.publish(msg)->wait_for(std::chrono::seconds(2));
spdlog::debug("Published message to topic '{}': {}", topic, payload);
} catch (const mqtt::exception& exc) {
spdlog::error("Failed to publish to topic '{}': {}", topic, exc.what());
@ -70,4 +73,13 @@ void MqttClient::subscribe(const std::string& topic, int qos) {
void MqttClient::set_message_callback(message_callback cb) {
client_.set_message_callback(std::move(cb));
}
// <<< MODIFIED: 新增方法的实现 >>>
void MqttClient::set_connected_handler(connection_handler cb) {
m_connected_handler = std::move(cb);
}
bool MqttClient::is_connected() const {
return client_.is_connected();
}

View File

@ -1,3 +1,4 @@
// 文件名: mqtt_client.h
#pragma once
#include "mqtt/async_client.h"
@ -6,8 +7,10 @@
class MqttClient {
public:
// 为消息回调函数定义一个清晰的类型别名
// 为回调函数定义清晰的类型别名
using message_callback = std::function<void(mqtt::const_message_ptr)>;
// <<< MODIFIED: 新增连接成功回调的类型别名
using connection_handler = std::function<void(const std::string&)>;
// 构造函数
MqttClient(const std::string& server_uri, const std::string& client_id);
@ -17,13 +20,18 @@ public:
void disconnect();
void publish(const std::string& topic, const std::string& payload, int qos = 1, bool retained = false);
void subscribe(const std::string& topic, int qos = 1);
// <<< MODIFIED: 新增查询连接状态的接口
bool is_connected() const;
// 设置消息到达时的回调函数
// 设置回调函数
void set_message_callback(message_callback cb);
// <<< MODIFIED: 新增设置连接成功回调的接口
void set_connected_handler(connection_handler cb);
private:
// Paho 异步客户端实例
mqtt::async_client client_;
// 连接选项
mqtt::connect_options conn_opts_;
// <<< MODIFIED: 新增成员变量以存储外部设置的回调
connection_handler m_connected_handler;
};