Compare commits
6 Commits
fd293b3ce0
...
444ad2fd53
| Author | SHA1 | Date |
|---|---|---|
|
|
444ad2fd53 | |
|
|
5ea24af1a0 | |
|
|
f06e9b5c7f | |
|
|
461f4a3f32 | |
|
|
803bfcc377 | |
|
|
480bfdd94f |
|
|
@ -1,13 +0,0 @@
|
|||
{
|
||||
"name": "Edge Proxy Dev (gyk)",
|
||||
"dockerComposeFile": [
|
||||
"../../docker-compose.yml"
|
||||
],
|
||||
"service": "edge-proxy-dev", // 指向共享的服务
|
||||
"workspaceFolder": "/app",
|
||||
"remoteUser": "gyk", // 只在进入时切换用户
|
||||
"customizations": {
|
||||
"vscode": { "extensions": ["ms-vscode.cpptools", "ms-vscode.cmake-tools"] }
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,12 +0,0 @@
|
|||
{
|
||||
"name": "Edge Proxy Dev (zql)",
|
||||
"dockerComposeFile": [
|
||||
"../../docker-compose.yml"
|
||||
],
|
||||
"service": "edge-proxy-dev", // 指向共享的服务
|
||||
"workspaceFolder": "/app",
|
||||
"remoteUser": "zql", // 只在进入时切换用户
|
||||
"customizations": {
|
||||
"vscode": { "extensions": ["ms-vscode.cpptools", "ms-vscode.cmake-tools"] }
|
||||
}
|
||||
}
|
||||
|
|
@ -73,6 +73,8 @@
|
|||
"variant": "cpp",
|
||||
"forward_list": "cpp",
|
||||
"ranges": "cpp",
|
||||
"valarray": "cpp"
|
||||
"valarray": "cpp",
|
||||
"charconv": "cpp",
|
||||
"unordered_set": "cpp"
|
||||
}
|
||||
}
|
||||
|
|
@ -49,7 +49,7 @@ add_library(edge_proxy_lib STATIC
|
|||
|
||||
#modbus
|
||||
src/modbus/modbus_rtu_client.cc
|
||||
src/modbus/modbus_rtu_poller_service.cc
|
||||
src/modbus/modbus_rtu_bus_service.cc
|
||||
src/modbus/generic_modbus_parser.cc
|
||||
# --- Modbus Master ---
|
||||
src/protocol/modbus/modbus_protocol.cc
|
||||
|
|
|
|||
|
|
@ -13,14 +13,15 @@
|
|||
]
|
||||
},
|
||||
{
|
||||
"enabled": false,
|
||||
"device_id": "water_meter_main_gate",
|
||||
"enabled": true,
|
||||
"device_id": "rotary encoder",
|
||||
"port_path": "/dev/ttyS7",
|
||||
"baud_rate": 9600,
|
||||
"slave_id": 5,
|
||||
"poll_interval_ms": 10000,
|
||||
"slave_id": 111,
|
||||
"poll_interval_ms": 5000,
|
||||
"data_points": [
|
||||
{"name": "total_flow", "address": 256, "type": "FLOAT32", "scale": 1.0}
|
||||
{"name": "count", "address": 1, "type": "INT16", "scale": 1.0},
|
||||
{"name": "total_count", "address": 2, "type": "INT16", "scale": 1.0}
|
||||
]
|
||||
},
|
||||
{
|
||||
|
|
@ -49,5 +50,10 @@
|
|||
{"name": "valve_status","address": 104, "type": "UINT16", "scale": 1.0}
|
||||
]
|
||||
}
|
||||
]
|
||||
],
|
||||
"modbus_rtu_bus_configs": {
|
||||
"/dev/ttyS7": {
|
||||
"inter_device_delay_ms": 150
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
#ifndef LIVE_DATA_CACHE_H
|
||||
#define LIVE_DATA_CACHE_H
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
|
||||
/**
|
||||
* @brief 一个线程安全的、用于存储所有设备最新上报数据的内存缓存。
|
||||
* 设计为单例或共享对象,供数据回调和API服务同时访问。
|
||||
*/
|
||||
class LiveDataCache {
|
||||
public:
|
||||
/**
|
||||
* @brief 更新或插入一个设备的最新数据。
|
||||
* @param device_id 设备的唯一ID。
|
||||
* @param json_data 设备上报的完整JSON数据字符串。
|
||||
*/
|
||||
void update_data(const std::string& device_id, const std::string& json_data) {
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
m_latest_data[device_id] = json_data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 获取所有设备最新数据的快照。
|
||||
* @return 一个从设备ID到其最新JSON数据的map。
|
||||
*/
|
||||
std::map<std::string, std::string> get_all_data() const {
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
return m_latest_data; // 返回一个副本以保证线程安全
|
||||
}
|
||||
|
||||
private:
|
||||
mutable std::mutex m_mutex;
|
||||
std::map<std::string, std::string> m_latest_data;
|
||||
};
|
||||
|
||||
#endif // LIVE_DATA_CACHE_H
|
||||
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
using json = nlohmann::json;
|
||||
|
||||
// 辅助函数:将JSON中的字符串类型映射到C++枚举
|
||||
|
||||
static ModbusDataType string_to_modbus_data_type(const std::string& type_str) {
|
||||
static const std::map<std::string, ModbusDataType> type_map = {
|
||||
{"UINT16", ModbusDataType::UINT16},
|
||||
|
|
@ -44,8 +44,10 @@ void DeviceManager::load_and_start(const std::string& config_path) {
|
|||
try {
|
||||
json config_json = json::parse(config_file);
|
||||
|
||||
// --- 加载 Modbus RTU 设备 ---
|
||||
// --- MODIFIED: 加载 Modbus RTU 设备的逻辑 ---
|
||||
if (config_json.contains("modbus_rtu_devices")) {
|
||||
// 1. 先将所有启用的设备按 port_path (总线) 分组
|
||||
std::map<std::string, std::vector<ModbusRtuDeviceConfig>> rtu_device_groups;
|
||||
for (const auto& dev_json : config_json["modbus_rtu_devices"]) {
|
||||
if (!dev_json.value("enabled", false)) continue;
|
||||
|
||||
|
|
@ -61,18 +63,27 @@ void DeviceManager::load_and_start(const std::string& config_path) {
|
|||
dp_json.at("name").get<std::string>(),
|
||||
(uint16_t)dp_json.at("address").get<int>(),
|
||||
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
|
||||
dp_json.value("scale", 1.0) // .value for optional fields
|
||||
dp_json.value("scale", 1.0)
|
||||
});
|
||||
}
|
||||
|
||||
auto service = std::make_unique<ModbusRtuPollerService>(config, m_report_callback);
|
||||
// 将解析后的配置加入对应的分组
|
||||
rtu_device_groups[config.port_path].push_back(config);
|
||||
}
|
||||
|
||||
// 2. 为每个分组 (即每个物理串口) 创建一个 BusService
|
||||
for (const auto& pair : rtu_device_groups) {
|
||||
const std::string& port_path = pair.first;
|
||||
const std::vector<ModbusRtuDeviceConfig>& devices_on_bus = pair.second;
|
||||
|
||||
auto service = std::make_unique<ModbusRtuBusService>(port_path, devices_on_bus, m_report_callback);
|
||||
service->start();
|
||||
m_rtu_services.push_back(std::move(service));
|
||||
spdlog::info("Started Modbus RTU service for device '{}'.", config.device_id);
|
||||
m_rtu_bus_services.push_back(std::move(service));
|
||||
spdlog::info("Started Modbus RTU Bus service for port '{}' with {} device(s).", port_path, devices_on_bus.size());
|
||||
}
|
||||
}
|
||||
|
||||
// --- 加载 Modbus TCP 设备 ---
|
||||
// --- 加载 Modbus TCP 设备 (逻辑保持不变) ---
|
||||
if (config_json.contains("modbus_tcp_devices")) {
|
||||
for (const auto& dev_json : config_json["modbus_tcp_devices"]) {
|
||||
if (!dev_json.value("enabled", false)) continue;
|
||||
|
|
@ -84,7 +95,6 @@ void DeviceManager::load_and_start(const std::string& config_path) {
|
|||
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
|
||||
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
|
||||
|
||||
// 解析 data_points
|
||||
for (const auto& dp_json : dev_json.at("data_points")) {
|
||||
config.data_points.push_back({
|
||||
dp_json.at("name").get<std::string>(),
|
||||
|
|
@ -108,56 +118,83 @@ void DeviceManager::load_and_start(const std::string& config_path) {
|
|||
}
|
||||
|
||||
void DeviceManager::stop_all() {
|
||||
// --- <<< 添加锁 >>> ---
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
spdlog::info("Stopping all device services...");
|
||||
for (auto& service : m_rtu_services) {
|
||||
|
||||
// 停止所有 RTU 总线服务
|
||||
for (auto& service : m_rtu_bus_services) {
|
||||
service->stop();
|
||||
}
|
||||
// For Asio-based objects, stopping is usually handled by stopping the io_context.
|
||||
// If they have explicit stop logic, call it here.
|
||||
m_rtu_bus_services.clear();
|
||||
|
||||
// 停止所有 TCP 轮询服务
|
||||
for (auto& poller : m_tcp_pollers) {
|
||||
poller->stop();
|
||||
}
|
||||
m_tcp_pollers.clear();
|
||||
|
||||
spdlog::info("All device services stopped.");
|
||||
}
|
||||
|
||||
std::vector<DeviceInfo> DeviceManager::get_all_device_info() const {
|
||||
// 使用 lock_guard 确保在函数返回前互斥锁能被自动释放
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
|
||||
std::vector<DeviceInfo> all_devices;
|
||||
all_devices.reserve(m_rtu_services.size() + m_tcp_pollers.size());
|
||||
all_devices.reserve(m_tcp_pollers.size()); // 初始容量可以先按TCP设备数算
|
||||
|
||||
// 遍历 RTU 设备
|
||||
for (const auto& service : m_rtu_services) {
|
||||
// !! 前提: ModbusRtuPollerService 必须有 get_config() 方法
|
||||
const auto& config = service->get_config();
|
||||
|
||||
DeviceInfo info;
|
||||
info.id = config.device_id;
|
||||
info.type = "ModbusRTU";
|
||||
info.is_running = service->is_running(); // 假设 service 有 is_running() 方法
|
||||
info.connection_details["Port Path"] = config.port_path;
|
||||
info.connection_details["Baud Rate"] = std::to_string(config.baud_rate);
|
||||
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
|
||||
|
||||
all_devices.push_back(info);
|
||||
// 遍历所有 RTU 总线服务
|
||||
for (const auto& service : m_rtu_bus_services) {
|
||||
const auto& configs_on_bus = service->get_all_device_configs();
|
||||
for (const auto& config : configs_on_bus) {
|
||||
DeviceInfo info;
|
||||
info.id = config.device_id;
|
||||
info.type = "ModbusRTU";
|
||||
info.is_running = service->is_running();
|
||||
info.connection_details["Port Path"] = config.port_path;
|
||||
info.connection_details["Baud Rate"] = std::to_string(config.baud_rate);
|
||||
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
|
||||
all_devices.push_back(info);
|
||||
}
|
||||
}
|
||||
|
||||
// 遍历 TCP 设备
|
||||
// 遍历 TCP 设备 (逻辑不变)
|
||||
for (const auto& poller : m_tcp_pollers) {
|
||||
// !! 前提: ModbusMasterPoller 必须有 get_config() 方法
|
||||
const auto& config = poller->get_config();
|
||||
|
||||
DeviceInfo info;
|
||||
info.id = config.device_id;
|
||||
info.type = "ModbusTCP";
|
||||
info.is_running = poller->is_running(); // 假设 poller 有 is_running() 方法
|
||||
info.is_running = poller->is_running();
|
||||
info.connection_details["IP Address"] = config.ip_address;
|
||||
info.connection_details["Port"] = std::to_string(config.port);
|
||||
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
|
||||
|
||||
all_devices.push_back(info);
|
||||
}
|
||||
|
||||
return all_devices;
|
||||
}
|
||||
|
||||
bool DeviceManager::send_control_command(const std::string& device_id, uint16_t address, uint16_t value) {
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
|
||||
// 首先,在 TCP poller 列表中查找
|
||||
for (const auto& poller : m_tcp_pollers) {
|
||||
if (poller->get_config().device_id == device_id) {
|
||||
spdlog::info("Found TCP device '{}'. Dispatching write command to address {}.", device_id, address);
|
||||
poller->write_single_register(address, value);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// 然后,在 RTU bus service 列表中查找哪个服务管理着该设备
|
||||
for (const auto& service : m_rtu_bus_services) {
|
||||
if (service->manages_device(device_id)) {
|
||||
spdlog::info("Found RTU device '{}' on a bus. Dispatching write command.", device_id);
|
||||
service->write_single_register(device_id, address, value);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
spdlog::warn("send_control_command failed: Device with ID '{}' not found in any service.", device_id);
|
||||
return false;
|
||||
}
|
||||
|
|
@ -3,7 +3,7 @@
|
|||
#define DEVICE_MANAGER_H
|
||||
|
||||
#include "protocol/iprotocol_adapter.h"
|
||||
#include "modbus/modbus_rtu_poller_service.h"
|
||||
#include "modbus/modbus_rtu_bus_service.h"
|
||||
#include "modbus/modbus_master_poller.h"
|
||||
#include <boost/asio.hpp>
|
||||
#include <vector>
|
||||
|
|
@ -55,16 +55,23 @@ public:
|
|||
*/
|
||||
void stop_all();
|
||||
|
||||
/**
|
||||
* @brief 向指定的Modbus设备发送一个写单个寄存器的命令
|
||||
* @param device_id 目标设备的唯一ID
|
||||
* @param address 要写入的寄存器地址
|
||||
* @param value 要写入的值
|
||||
* @return 如果找到设备并成功分派命令,则返回true;否则返回false
|
||||
*/
|
||||
bool send_control_command(const std::string& device_id, uint16_t address, uint16_t value);
|
||||
|
||||
private:
|
||||
boost::asio::io_context& m_io_context;
|
||||
ReportDataCallback m_report_callback;
|
||||
|
||||
// 用于存储正在运行的服务实例,以管理其生命周期
|
||||
std::vector<std::unique_ptr<ModbusRtuPollerService>> m_rtu_services;
|
||||
std::vector<std::unique_ptr<ModbusRtuBusService>> m_rtu_bus_services;
|
||||
std::vector<std::shared_ptr<ModbusMasterPoller>> m_tcp_pollers;
|
||||
|
||||
// --- <<< 新增成员 >>> ---
|
||||
// mutable 关键字允许在 const 成员函数中修改它 (例如在 get_all_device_info 中加锁)
|
||||
mutable std::mutex m_mutex;
|
||||
};
|
||||
|
||||
|
|
|
|||
52
src/main.cpp
52
src/main.cpp
|
|
@ -9,6 +9,7 @@
|
|||
#include "dataCache/data_cache.h"
|
||||
#include "dataCache/cache_uploader.h"
|
||||
#include "web/web_server.h"
|
||||
#include "dataCache/live_data_cache.h"
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
|
|
@ -19,14 +20,6 @@
|
|||
// 用于 ASIO 服务的全局 io_context
|
||||
boost::asio::io_context g_io_context;
|
||||
|
||||
/**
|
||||
* @brief 处理终止信号 (SIGINT, SIGTERM).
|
||||
*/
|
||||
// void signalHandler(int signum) {
|
||||
// spdlog::warn("Interrupt signal ({}) received. Shutting down.", signum);
|
||||
// g_io_context.stop();
|
||||
// }
|
||||
|
||||
/**
|
||||
* @brief 周期性轮询系统状态并发布到 MQTT
|
||||
*/
|
||||
|
|
@ -60,17 +53,31 @@ int main(int argc, char* argv[]) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
// signal(SIGINT, signalHandler);
|
||||
// signal(SIGTERM, signalHandler);
|
||||
|
||||
try {
|
||||
DataCache data_cache;
|
||||
LiveDataCache live_data_cache;
|
||||
MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client");
|
||||
MqttRouter mqtt_router(mqtt_client);
|
||||
auto report_to_mqtt = [&](const UnifiedData& data) {
|
||||
live_data_cache.update_data(data.device_id, data.data_json);
|
||||
if (mqtt_client.is_connected()) {
|
||||
// 网络正常,直接上报
|
||||
std::string topic = "devices/" + data.device_id + "/data";
|
||||
g_io_context.post([&, topic, payload = data.data_json]() {
|
||||
mqtt_client.publish(topic, payload, 1, false);
|
||||
});
|
||||
} else {
|
||||
// 网络断开,写入缓存
|
||||
spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id);
|
||||
data_cache.add(data);
|
||||
}
|
||||
};
|
||||
|
||||
DeviceManager device_manager(g_io_context, report_to_mqtt);
|
||||
MqttRouter mqtt_router(mqtt_client, device_manager);
|
||||
std::vector<uint16_t> listen_ports = { 8888 };
|
||||
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
|
||||
SystemMonitor::SystemMonitor monitor;
|
||||
|
||||
DataCache data_cache;
|
||||
if (!data_cache.open("edge_data_cache.db")) {
|
||||
spdlog::critical("Failed to initialize data cache. Exiting.");
|
||||
return 1;
|
||||
|
|
@ -90,29 +97,12 @@ int main(int argc, char* argv[]) {
|
|||
boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15));
|
||||
system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client)));
|
||||
|
||||
// --- 创建包含缓存逻辑的统一数据上报回调函数 >>>
|
||||
auto report_to_mqtt = [&](const UnifiedData& data) {
|
||||
if (mqtt_client.is_connected()) {
|
||||
// 网络正常,直接上报
|
||||
std::string topic = "devices/" + data.device_id + "/data";
|
||||
g_io_context.post([&, topic, payload = data.data_json]() {
|
||||
mqtt_client.publish(topic, payload, 1, false);
|
||||
});
|
||||
} else {
|
||||
// 网络断开,写入缓存
|
||||
spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id);
|
||||
data_cache.add(data);
|
||||
}
|
||||
};
|
||||
|
||||
// 实例化设备管理器并从文件加载所有设备 (使用新的回调)
|
||||
DeviceManager device_manager(g_io_context, report_to_mqtt);
|
||||
device_manager.load_and_start("../config/devices.json");
|
||||
|
||||
WebServer web_server(monitor, device_manager,8080);
|
||||
WebServer web_server(monitor, device_manager, live_data_cache, 8080);
|
||||
web_server.start();
|
||||
|
||||
// --- 2. <<< MODIFIED: 设置 Boost.Asio 风格的信号处理器 >>> ---
|
||||
boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM);
|
||||
signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
|
||||
spdlog::warn("Interrupt signal ({}) received. Shutting down.", signal_number);
|
||||
|
|
|
|||
|
|
@ -31,14 +31,12 @@ void ModbusMasterPoller::start() {
|
|||
do_connect();
|
||||
}
|
||||
|
||||
// --- <<< 新增 stop 方法实现 >>> ---
|
||||
|
||||
void ModbusMasterPoller::stop() {
|
||||
if (!m_is_running.exchange(false)) {
|
||||
// 如果之前已经是 false,说明已经停止了,直接返回
|
||||
return;
|
||||
}
|
||||
|
||||
// 使用 post 确保取消操作在 io_context 的事件循环中执行,保证线程安全
|
||||
|
||||
boost::asio::post(m_io_context, [this, self = shared_from_this()](){
|
||||
spdlog::info("[Modbus TCP] Stopping poller for device '{}'...", m_config.device_id);
|
||||
|
||||
|
|
@ -54,7 +52,6 @@ void ModbusMasterPoller::stop() {
|
|||
});
|
||||
}
|
||||
|
||||
// --- <<< 新增 get_config 和 is_running 实现 >>> ---
|
||||
const ModbusTcpDeviceConfig& ModbusMasterPoller::get_config() const {
|
||||
return m_config;
|
||||
}
|
||||
|
|
@ -63,6 +60,74 @@ bool ModbusMasterPoller::is_running() const {
|
|||
return m_is_running;
|
||||
}
|
||||
|
||||
void ModbusMasterPoller::write_single_register(uint16_t address, uint16_t value) {
|
||||
if (!m_is_running) {
|
||||
spdlog::warn("[Modbus TCP] Device '{}': Poller is not running. Cannot write.", m_config.device_id);
|
||||
return;
|
||||
}
|
||||
|
||||
// 使用 post 将写操作投递到 io_context 线程中执行,以保证线程安全
|
||||
boost::asio::post(m_io_context, [this, self = shared_from_this(), address, value]() {
|
||||
spdlog::debug("[Modbus TCP] Device '{}': Writing value {} to address {}.", m_config.device_id, value, address);
|
||||
|
||||
// 1. 创建一个独立的写缓冲区
|
||||
auto write_cmd_buffer = std::make_shared<std::vector<uint8_t>>();
|
||||
|
||||
// 2. 构建写命令PDU (功能码0x06)
|
||||
// PDU = Address (2 bytes) + Value (2 bytes)
|
||||
std::vector<uint8_t> pdu;
|
||||
pdu.push_back(static_cast<uint8_t>((address >> 8) & 0xFF));
|
||||
pdu.push_back(static_cast<uint8_t>(address & 0xFF));
|
||||
pdu.push_back(static_cast<uint8_t>((value >> 8) & 0xFF));
|
||||
pdu.push_back(static_cast<uint8_t>(value & 0xFF));
|
||||
|
||||
// 3. 构建MBAP头
|
||||
write_cmd_buffer->resize(7 + pdu.size());
|
||||
uint16_t tid = m_transaction_id++; // 原子地增加事务ID
|
||||
uint16_t length = 1 + pdu.size(); // UnitID + PDU
|
||||
|
||||
(*write_cmd_buffer)[0] = static_cast<uint8_t>((tid >> 8) & 0xFF);
|
||||
(*write_cmd_buffer)[1] = static_cast<uint8_t>(tid & 0xFF);
|
||||
(*write_cmd_buffer)[2] = 0; (*write_cmd_buffer)[3] = 0; // Protocol ID
|
||||
(*write_cmd_buffer)[4] = static_cast<uint8_t>((length >> 8) & 0xFF);
|
||||
(*write_cmd_buffer)[5] = static_cast<uint8_t>(length & 0xFF);
|
||||
(*write_cmd_buffer)[6] = m_config.slave_id;
|
||||
std::copy(pdu.begin(), pdu.end(), write_cmd_buffer->begin() + 7);
|
||||
|
||||
// 4. 异步发送写命令
|
||||
boost::asio::async_write(m_socket, boost::asio::buffer(*write_cmd_buffer),
|
||||
[this, self, tid, write_cmd_buffer](const boost::system::error_code& ec, std::size_t) {
|
||||
if (!m_is_running || ec) {
|
||||
if (ec != boost::asio::error::operation_aborted) {
|
||||
spdlog::error("[{}] Write command failed: {}", m_config.device_id, ec.message());
|
||||
on_error("write_command");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 异步读取响应
|
||||
auto read_cmd_buffer = std::make_shared<std::array<uint8_t, 12>>(); // 写响应通常是12字节
|
||||
boost::asio::async_read(m_socket, boost::asio::buffer(*read_cmd_buffer, 12),
|
||||
[this, self, tid, read_cmd_buffer](const boost::system::error_code& ec, std::size_t) {
|
||||
if (!m_is_running || ec) {
|
||||
if (ec != boost::asio::error::operation_aborted) {
|
||||
spdlog::error("[{}] Read command response failed: {}", m_config.device_id, ec.message());
|
||||
on_error("read_command_response");
|
||||
}
|
||||
return;
|
||||
}
|
||||
// 简单检查一下事务ID是否匹配
|
||||
uint16_t resp_tid = ((*read_cmd_buffer)[0] << 8) | (*read_cmd_buffer)[1];
|
||||
if (resp_tid == tid) {
|
||||
spdlog::info("[{}] Successfully wrote and received confirmation.", m_config.device_id);
|
||||
} else {
|
||||
spdlog::warn("[{}] Write command TID mismatch. Sent {}, Rcvd {}.", m_config.device_id, tid, resp_tid);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void ModbusMasterPoller::do_connect() {
|
||||
auto self = shared_from_this();
|
||||
tcp::resolver resolver(m_io_context);
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <atomic>
|
||||
|
||||
class ModbusMasterPoller : public std::enable_shared_from_this<ModbusMasterPoller> {
|
||||
public:
|
||||
|
|
@ -22,6 +23,13 @@ public:
|
|||
const ModbusTcpDeviceConfig& get_config() const;
|
||||
bool is_running() const;
|
||||
|
||||
/**
|
||||
* @brief (线程安全) 异步地向设备写入单个寄存器
|
||||
* @param address 寄存器地址
|
||||
* @param value 要写入的值
|
||||
*/
|
||||
void write_single_register(uint16_t address, uint16_t value);
|
||||
|
||||
private:
|
||||
void do_connect();
|
||||
void do_poll();
|
||||
|
|
@ -40,7 +48,7 @@ private:
|
|||
|
||||
std::vector<uint8_t> m_write_buffer;
|
||||
std::array<uint8_t, 260> m_read_buffer;
|
||||
uint16_t m_transaction_id = 0;
|
||||
std::atomic<uint16_t> m_transaction_id{0};
|
||||
|
||||
std::atomic<bool> m_is_running{false};
|
||||
};
|
||||
|
|
|
|||
|
|
@ -0,0 +1,150 @@
|
|||
// 文件名: src/modbus/modbus_rtu_bus_service.cc (注意文件名已更改)
|
||||
#include "modbus_rtu_bus_service.h"
|
||||
#include "generic_modbus_parser.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <chrono>
|
||||
#include <algorithm>
|
||||
|
||||
// --- MODIFIED: 构造函数接收一个设备列表 ---
|
||||
ModbusRtuBusService::ModbusRtuBusService(std::string port_path,
|
||||
std::vector<ModbusRtuDeviceConfig> devices,
|
||||
ReportDataCallback report_cb)
|
||||
: m_port_path(std::move(port_path)),
|
||||
m_devices(std::move(devices)),
|
||||
m_report_callback(std::move(report_cb)) {}
|
||||
|
||||
ModbusRtuBusService::~ModbusRtuBusService() {
|
||||
stop();
|
||||
}
|
||||
|
||||
// --- start, stop, is_running 逻辑基本不变 ---
|
||||
void ModbusRtuBusService::start() {
|
||||
if (m_thread.joinable()) { return; }
|
||||
m_stop_flag = false;
|
||||
m_thread = std::thread(&ModbusRtuBusService::run, this);
|
||||
spdlog::info("[Modbus RTU Bus] Service for port '{}' started.", m_port_path);
|
||||
}
|
||||
|
||||
void ModbusRtuBusService::stop() {
|
||||
if (m_stop_flag.exchange(true)) { return; }
|
||||
if (m_thread.joinable()) {
|
||||
m_thread.join();
|
||||
spdlog::info("[Modbus RTU Bus] Service for port '{}' has been stopped.", m_port_path);
|
||||
}
|
||||
}
|
||||
bool ModbusRtuBusService::is_running() const {
|
||||
return !m_stop_flag && m_thread.joinable();
|
||||
}
|
||||
|
||||
|
||||
// --- NEW: 新增辅助方法 ---
|
||||
bool ModbusRtuBusService::manages_device(const std::string& device_id) const {
|
||||
for (const auto& device_config : m_devices) {
|
||||
if (device_config.device_id == device_id) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// ---写方法现在需要 device_id 来查找 slave_id ---
|
||||
void ModbusRtuBusService::write_single_register(const std::string& device_id, uint16_t address, uint16_t value) {
|
||||
uint8_t slave_id = 0;
|
||||
bool device_found = false;
|
||||
// 首先根据 device_id 找到对应的 slave_id
|
||||
for (const auto& config : m_devices) {
|
||||
if (config.device_id == device_id) {
|
||||
slave_id = config.slave_id;
|
||||
device_found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!device_found) {
|
||||
spdlog::warn("[Modbus RTU Bus] Write failed: Device '{}' not managed by service on port '{}'.", device_id, m_port_path);
|
||||
return;
|
||||
}
|
||||
|
||||
// 锁定串口资源,执行写操作
|
||||
std::lock_guard<std::mutex> lock(m_client_mutex);
|
||||
try {
|
||||
spdlog::debug("[Modbus RTU Bus] Port '{}': Writing to device '{}' (Slave ID {})...", m_port_path, device_id, slave_id);
|
||||
m_client.writeSingleRegister(slave_id, address, value);
|
||||
spdlog::info("[Modbus RTU Bus] Port '{}': Successfully wrote to device '{}'.", m_port_path, device_id);
|
||||
} catch (const std::exception& e) {
|
||||
spdlog::error("[Modbus RTU Bus] Port '{}': Failed to write to device '{}': {}", m_port_path, device_id, e.what());
|
||||
}
|
||||
}
|
||||
|
||||
const std::vector<ModbusRtuDeviceConfig>& ModbusRtuBusService::get_all_device_configs() const {
|
||||
return m_devices;
|
||||
}
|
||||
|
||||
// --- 以串行轮询多个设备 ---
|
||||
void ModbusRtuBusService::run() {
|
||||
if (m_devices.empty()) {
|
||||
spdlog::warn("[Modbus RTU Bus] No devices configured for port '{}'. Thread will not run.", m_port_path);
|
||||
return;
|
||||
}
|
||||
|
||||
// 只需要设置一次串口参数
|
||||
// 我们假设同一总线上的设备波特率等参数一致,这是 Modbus RTU 的基本要求
|
||||
if (!m_client.setPortSettings(m_port_path, m_devices[0].baud_rate)) {
|
||||
spdlog::error("[Modbus RTU Bus] Failed to set up serial port '{}'. Thread exiting.", m_port_path);
|
||||
return;
|
||||
}
|
||||
|
||||
// 主循环
|
||||
while (!m_stop_flag) {
|
||||
// 依次轮询此总线上的每一个设备
|
||||
for (const auto& config : m_devices) {
|
||||
if (m_stop_flag) break; // 每次轮询前都检查停止标志
|
||||
|
||||
{ // 创建作用域以控制锁
|
||||
std::lock_guard<std::mutex> lock(m_client_mutex);
|
||||
try {
|
||||
// --- 针对当前设备计算轮询范围 ---
|
||||
if (config.data_points.empty()) continue;
|
||||
auto [min_it, max_it] = std::minmax_element(config.data_points.begin(), config.data_points.end(),
|
||||
[](const DataPointConfig& a, const DataPointConfig& b) { return a.address < b.address; });
|
||||
uint16_t start_address = min_it->address;
|
||||
uint16_t last_address = max_it->address;
|
||||
if (max_it->type >= ModbusDataType::UINT32) { last_address += 1; }
|
||||
uint16_t quantity = last_address - start_address + 1;
|
||||
|
||||
std::vector<uint16_t> raw_registers = m_client.readHoldingRegisters(config.slave_id, start_address, quantity);
|
||||
|
||||
std::map<uint16_t, uint16_t> registers_map;
|
||||
for (uint16_t i = 0; i < raw_registers.size(); ++i) {
|
||||
registers_map[start_address + i] = raw_registers[i];
|
||||
}
|
||||
nlohmann::json data_j = GenericModbusParser::parse(registers_map, config.data_points);
|
||||
UnifiedData report_data;
|
||||
report_data.device_id = config.device_id;
|
||||
report_data.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
report_data.data_json = data_j.dump();
|
||||
if (m_report_callback) {
|
||||
m_report_callback(report_data);
|
||||
}
|
||||
} catch (const std::exception& e) {
|
||||
spdlog::error("[Modbus RTU Bus] Port '{}', Device '{}': Communication error: {}", m_port_path, config.device_id, e.what());
|
||||
}
|
||||
} // 锁释放
|
||||
|
||||
// 在轮询下一个设备前,短暂延时,给总线和设备响应时间
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(150));
|
||||
}
|
||||
|
||||
// 完成一轮所有设备的轮询后,等待一个大的周期
|
||||
if (!m_stop_flag) {
|
||||
// 这里的等待时间可以根据需求调整,例如使用第一个设备的 poll_interval_ms
|
||||
auto wake_up_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(m_devices[0].poll_interval_ms);
|
||||
while (std::chrono::steady_clock::now() < wake_up_time && !m_stop_flag) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
m_client.closePort();
|
||||
spdlog::info("[RTU Bus Service {}] Run loop finished. Thread exiting.", m_port_path);
|
||||
}
|
||||
|
|
@ -0,0 +1,65 @@
|
|||
// 文件名: src/modbus/modbus_rtu_bus_service.h (注意文件名已更改)
|
||||
#ifndef MODBUS_RTU_BUS_SERVICE_H
|
||||
#define MODBUS_RTU_BUS_SERVICE_H
|
||||
|
||||
#include "protocol/iprotocol_adapter.h"
|
||||
#include "modbus_rtu_client.h"
|
||||
#include "modbus_common.h"
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
#include <vector> // <--- 新增
|
||||
#include <mutex>
|
||||
|
||||
/**
|
||||
* @brief Modbus RTU 总线服务类
|
||||
* * 负责在一个独立的后台线程中,管理一个物理串口,并依次轮询该串口上的所有设备。
|
||||
*/
|
||||
class ModbusRtuBusService {
|
||||
public:
|
||||
/**
|
||||
* @brief 构造函数
|
||||
* @param port_path 该总线所使用的串口路径, e.g., "/dev/ttyS0"
|
||||
* @param devices 该总线上所有设备的配置列表
|
||||
* @param report_cb 用于上报数据的回调函数
|
||||
*/
|
||||
ModbusRtuBusService(std::string port_path,
|
||||
std::vector<ModbusRtuDeviceConfig> devices,
|
||||
ReportDataCallback report_cb);
|
||||
~ModbusRtuBusService();
|
||||
|
||||
ModbusRtuBusService(const ModbusRtuBusService&) = delete;
|
||||
ModbusRtuBusService& operator=(const ModbusRtuBusService&) = delete;
|
||||
|
||||
void start();
|
||||
void stop();
|
||||
bool is_running() const;
|
||||
|
||||
/**
|
||||
* @brief 检查该总线服务是否管理着指定的设备ID
|
||||
*/
|
||||
bool manages_device(const std::string& device_id) const;
|
||||
const std::vector<ModbusRtuDeviceConfig>& get_all_device_configs() const;
|
||||
|
||||
/**
|
||||
* @brief (线程安全) 向总线上的某个设备写入单个寄存器
|
||||
* @param device_id 目标设备的ID
|
||||
* @param address 寄存器地址
|
||||
* @param value 要写入的值
|
||||
*/
|
||||
void write_single_register(const std::string& device_id, uint16_t address, uint16_t value);
|
||||
|
||||
private:
|
||||
void run();
|
||||
|
||||
std::string m_port_path;
|
||||
std::vector<ModbusRtuDeviceConfig> m_devices;
|
||||
ReportDataCallback m_report_callback;
|
||||
ModbusRTUClient m_client;
|
||||
|
||||
std::thread m_thread;
|
||||
std::atomic<bool> m_stop_flag{false};
|
||||
std::mutex m_client_mutex; // 保护 m_client 的互斥锁
|
||||
};
|
||||
|
||||
#endif // MODBUS_RTU_BUS_SERVICE_H
|
||||
|
|
@ -1,115 +0,0 @@
|
|||
// 文件名: src/modbus/modbus_rtu_poller_service.cc
|
||||
#include "modbus_rtu_poller_service.h"
|
||||
#include "generic_modbus_parser.h" // 使用通用解析器
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <chrono>
|
||||
#include <algorithm> // for std::minmax_element
|
||||
|
||||
ModbusRtuPollerService::ModbusRtuPollerService(ModbusRtuDeviceConfig config, ReportDataCallback report_cb)
|
||||
: m_config(std::move(config)),
|
||||
m_report_callback(std::move(report_cb)) {}
|
||||
|
||||
ModbusRtuPollerService::~ModbusRtuPollerService() {
|
||||
stop();
|
||||
}
|
||||
|
||||
void ModbusRtuPollerService::start() {
|
||||
if (m_thread.joinable()) {
|
||||
spdlog::warn("[Modbus RTU Service] Poller for device '{}' is already running.", m_config.device_id);
|
||||
return;
|
||||
}
|
||||
m_stop_flag = false;
|
||||
// 启动一个新线程,并将 run() 方法作为入口点
|
||||
// 'this' 指针被传递,以便新线程可以调用类的成员函数
|
||||
m_thread = std::thread(&ModbusRtuPollerService::run, this);
|
||||
spdlog::info("[Modbus RTU Service] Poller for device '{}' started in a background thread.", m_config.device_id);
|
||||
}
|
||||
|
||||
void ModbusRtuPollerService::stop() {
|
||||
m_stop_flag = true;
|
||||
if (m_thread.joinable()) {
|
||||
m_thread.join(); // 等待线程安全退出
|
||||
spdlog::info("[Modbus RTU Service] Poller for device '{}' has been stopped.", m_config.device_id);
|
||||
}
|
||||
}
|
||||
|
||||
const ModbusRtuDeviceConfig& ModbusRtuPollerService::get_config() const {
|
||||
return m_config;
|
||||
}
|
||||
bool ModbusRtuPollerService::is_running() const {
|
||||
return !m_stop_flag && m_thread.joinable();
|
||||
}
|
||||
|
||||
void ModbusRtuPollerService::run() {
|
||||
// 检查是否有数据点被配置
|
||||
if (m_config.data_points.empty()) {
|
||||
spdlog::warn("[Modbus RTU] Device '{}' has no data points configured. Thread will not run.", m_config.device_id);
|
||||
return;
|
||||
}
|
||||
|
||||
// 设置并打开串口
|
||||
if (!m_client.setPortSettings(m_config.port_path, m_config.baud_rate)) {
|
||||
spdlog::error("[Modbus RTU] Failed to set up serial port '{}' for device '{}'. Thread exiting.", m_config.port_path, m_config.device_id);
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 为了提高效率,我们计算出需要一次性读取的连续寄存器范围
|
||||
auto [min_it, max_it] = std::minmax_element(m_config.data_points.begin(), m_config.data_points.end(),
|
||||
[](const DataPointConfig& a, const DataPointConfig& b) {
|
||||
return a.address < b.address;
|
||||
});
|
||||
uint16_t start_address = min_it->address;
|
||||
uint16_t last_address = max_it->address;
|
||||
|
||||
// 考虑多寄存器数据类型(如FLOAT32)会占用额外的寄存器
|
||||
if (max_it->type == ModbusDataType::UINT32 || max_it->type == ModbusDataType::INT32 || max_it->type == ModbusDataType::FLOAT32) {
|
||||
last_address += 1; // 32位数据占用2个16位寄存器
|
||||
}
|
||||
uint16_t quantity = last_address - start_address + 1;
|
||||
|
||||
spdlog::info("[Modbus RTU] Device '{}' will poll {} registers starting from address {}.", m_config.device_id, quantity, start_address);
|
||||
|
||||
// 线程主循环
|
||||
while (!m_stop_flag) {
|
||||
try {
|
||||
// 2. 一次性读取所有需要的连续寄存器
|
||||
std::vector<uint16_t> raw_registers = m_client.readHoldingRegisters(m_config.slave_id, start_address, quantity);
|
||||
|
||||
// 3. 将返回的寄存器数组转换为 (地址 -> 值) 的映射,方便解析器按地址查找
|
||||
std::map<uint16_t, uint16_t> registers_map;
|
||||
for (uint16_t i = 0; i < raw_registers.size(); ++i) {
|
||||
registers_map[start_address + i] = raw_registers[i];
|
||||
}
|
||||
|
||||
// 4. 使用通用的、配置驱动的解析器进行解析
|
||||
nlohmann::json data_j = GenericModbusParser::parse(registers_map, m_config.data_points);
|
||||
|
||||
// 5. 封装成 UnifiedData 结构并上报
|
||||
UnifiedData report_data;
|
||||
report_data.device_id = m_config.device_id;
|
||||
report_data.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
report_data.data_json = data_j.dump();
|
||||
|
||||
if (m_report_callback) {
|
||||
m_report_callback(report_data);
|
||||
}
|
||||
|
||||
} catch (const std::exception& e) {
|
||||
spdlog::error("[Modbus RTU] Error during communication with device '{}': {}", m_config.device_id, e.what());
|
||||
}
|
||||
|
||||
// 在本线程中等待,不影响主线程的事件循环
|
||||
auto wake_up_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(m_config.poll_interval_ms);
|
||||
while (std::chrono::steady_clock::now() < wake_up_time) {
|
||||
if (m_stop_flag) {
|
||||
break;
|
||||
}
|
||||
// 每次只“打盹”100毫秒
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
|
||||
// 线程退出前关闭串口
|
||||
m_client.closePort();
|
||||
spdlog::info("[RTU Poller {}] Run loop finished. Thread exiting.", m_config.device_id);
|
||||
}
|
||||
|
|
@ -1,76 +0,0 @@
|
|||
// 文件名: src/modbus/modbus_rtu_poller_service.h
|
||||
#ifndef MODBUS_RTU_POLLER_SERVICE_H
|
||||
#define MODBUS_RTU_POLLER_SERVICE_H
|
||||
|
||||
#include "protocol/iprotocol_adapter.h"
|
||||
#include "modbus_rtu_client.h"
|
||||
#include "modbus_common.h" // 包含 ModbusRtuDeviceConfig 和 DataPointConfig 定义
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
|
||||
/**
|
||||
* @brief Modbus RTU 轮询服务类
|
||||
* * 负责在一个独立的后台线程中,根据配置周期性地轮询一个Modbus RTU设备。
|
||||
* 这种设计将同步阻塞的串口IO操作与主程序的异步事件循环隔离开来,
|
||||
* 避免了主线程被阻塞。
|
||||
*/
|
||||
class ModbusRtuPollerService {
|
||||
public:
|
||||
/**
|
||||
* @brief 构造函数
|
||||
* @param config 描述了要连接的设备、串口参数以及需要采集的数据点列表
|
||||
* @param report_cb 用于将解析后的数据上报给核心业务层的回调函数
|
||||
*/
|
||||
ModbusRtuPollerService(ModbusRtuDeviceConfig config, ReportDataCallback report_cb);
|
||||
|
||||
/**
|
||||
* @brief 析构函数
|
||||
* * 确保在对象销毁时,后台线程能够被安全地停止和清理。
|
||||
*/
|
||||
~ModbusRtuPollerService();
|
||||
|
||||
// 禁止拷贝和赋值,因为该类管理着一个线程资源
|
||||
ModbusRtuPollerService(const ModbusRtuPollerService&) = delete;
|
||||
ModbusRtuPollerService& operator=(const ModbusRtuPollerService&) = delete;
|
||||
|
||||
/**
|
||||
* @brief 启动轮询服务
|
||||
* * 创建并启动一个新的后台线程来执行 run() 方法中的轮询逻辑。
|
||||
*/
|
||||
void start();
|
||||
|
||||
/**
|
||||
* @brief 停止轮询服务
|
||||
* * 设置停止标志位,并等待后台线程安全地执行完毕并退出。
|
||||
*/
|
||||
void stop();
|
||||
|
||||
// --- <<< 新增方法 >>> ---
|
||||
/**
|
||||
* @brief 获取设备的配置信息 (const-ref to avoid copy)
|
||||
*/
|
||||
const ModbusRtuDeviceConfig& get_config() const;
|
||||
|
||||
/**
|
||||
* @brief 检查轮询服务是否正在运行
|
||||
*/
|
||||
bool is_running() const;
|
||||
// --- <<< END >>> ---
|
||||
private:
|
||||
/**
|
||||
* @brief 线程的主循环函数
|
||||
* * 该函数是后台线程的入口点。它包含了连接串口、周期性轮询、
|
||||
* 解析数据和上报数据的完整逻辑。
|
||||
*/
|
||||
void run();
|
||||
|
||||
ModbusRtuDeviceConfig m_config;
|
||||
ReportDataCallback m_report_callback;
|
||||
ModbusRTUClient m_client;
|
||||
|
||||
std::thread m_thread;
|
||||
std::atomic<bool> m_stop_flag{false};
|
||||
};
|
||||
|
||||
#endif // MODBUS_RTU_POLLER_SERVICE_H
|
||||
|
|
@ -1,26 +1,51 @@
|
|||
// 文件名: command_handler.cpp
|
||||
#include "command_handler.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "vendor/nlohmann/json.hpp"
|
||||
|
||||
using json = nlohmann::json;
|
||||
|
||||
CommandHandler::CommandHandler(MqttClient& client) : m_client(client) {}
|
||||
// 构造函数实现更新
|
||||
CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager)
|
||||
: m_client(client), m_device_manager(deviceManager) {}
|
||||
|
||||
void CommandHandler::handle(mqtt::const_message_ptr msg) {
|
||||
spdlog::info("DataHandler is processing message from topic: {}", msg->get_topic());
|
||||
const std::string topic = msg->get_topic();
|
||||
const std::string payload = msg->get_payload_str();
|
||||
|
||||
spdlog::info("CommandHandler received a command on topic '{}'", topic);
|
||||
|
||||
try {
|
||||
// (这里是原来 MqttRouter 中处理数据上报的逻辑)
|
||||
auto received_json = json::parse(msg->get_payload_str());
|
||||
if (received_json.contains("temperature")) {
|
||||
double temp = received_json["temperature"];
|
||||
json forward_json;
|
||||
forward_json["source_topic"] = msg->get_topic();
|
||||
forward_json["processed_temp"] = temp;
|
||||
forward_json["timestamp"] = time(0);
|
||||
const std::string output_topic = "proxy/processed/temperature";
|
||||
m_client.publish(output_topic, forward_json.dump());
|
||||
auto cmd_json = json::parse(payload);
|
||||
|
||||
// 校验命令格式
|
||||
if (!cmd_json.contains("deviceId") || !cmd_json.contains("address") || !cmd_json.contains("value")) {
|
||||
spdlog::warn("Command JSON from topic '{}' is missing required fields.", topic);
|
||||
return;
|
||||
}
|
||||
} catch (const json::parse_error& e) {
|
||||
spdlog::error("JSON parse error in DataHandler: {}", e.what());
|
||||
|
||||
// 解析命令
|
||||
std::string device_id = cmd_json.at("deviceId").get<std::string>();
|
||||
uint16_t address = cmd_json.at("address").get<uint16_t>();
|
||||
uint16_t value = cmd_json.at("value").get<uint16_t>();
|
||||
|
||||
// 通过 DeviceManager 发送控制指令
|
||||
bool success = m_device_manager.send_control_command(device_id, address, value);
|
||||
|
||||
// (可选) 将执行结果反馈到 MQTT,形成闭环
|
||||
json response_json;
|
||||
response_json["success"] = success;
|
||||
response_json["original_command"] = cmd_json;
|
||||
std::string response_topic = "proxy/command/result/" + device_id;
|
||||
m_client.publish(response_topic, response_json.dump());
|
||||
|
||||
if (success) {
|
||||
spdlog::info("Successfully dispatched command to device '{}'.", device_id);
|
||||
} else {
|
||||
spdlog::error("Failed to dispatch command to device '{}'.", device_id);
|
||||
}
|
||||
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::error("Failed to parse command JSON from topic '{}': {}", topic, e.what());
|
||||
}
|
||||
}
|
||||
|
|
@ -1,10 +1,13 @@
|
|||
// 文件名: command_handler.h
|
||||
#pragma once
|
||||
#include "../mqtt_client.h" // 注意 #include 路径的变化
|
||||
#include "../mqtt_client.h"
|
||||
#include "deviceManager/device_manager.h"
|
||||
|
||||
class CommandHandler {
|
||||
public:
|
||||
explicit CommandHandler(MqttClient& client);
|
||||
explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager);
|
||||
void handle(mqtt::const_message_ptr msg);
|
||||
private:
|
||||
MqttClient& m_client;
|
||||
DeviceManager& m_device_manager;
|
||||
};
|
||||
|
|
@ -7,20 +7,24 @@ using json = nlohmann::json;
|
|||
DataHandler::DataHandler(MqttClient& client) : m_client(client) {}
|
||||
|
||||
void DataHandler::handle(mqtt::const_message_ptr msg) {
|
||||
spdlog::info("DataHandler is processing message from topic: {}", msg->get_topic());
|
||||
const auto& input_topic = msg->get_topic();
|
||||
const auto& payload_str = msg->get_payload_str();
|
||||
|
||||
spdlog::debug("DataHandler received data from topic: {}", input_topic);
|
||||
|
||||
try {
|
||||
// (这里是原来 MqttRouter 中处理数据上报的逻辑)
|
||||
auto received_json = json::parse(msg->get_payload_str());
|
||||
if (received_json.contains("temperature")) {
|
||||
double temp = received_json["temperature"];
|
||||
json forward_json;
|
||||
forward_json["source_topic"] = msg->get_topic();
|
||||
forward_json["processed_temp"] = temp;
|
||||
forward_json["timestamp"] = time(0);
|
||||
const std::string output_topic = "proxy/processed/temperature";
|
||||
m_client.publish(output_topic, forward_json.dump());
|
||||
}
|
||||
auto payload_json = json::parse(payload_str);
|
||||
|
||||
payload_json["_proxy_source_topic"] = input_topic;
|
||||
payload_json["_proxy_processed_ts"] = time(nullptr);
|
||||
|
||||
const std::string output_topic = "proxy/processed/data";
|
||||
|
||||
m_client.publish(output_topic, payload_json.dump());
|
||||
|
||||
spdlog::debug("Successfully forwarded data from '{}' to '{}'", input_topic, output_topic);
|
||||
|
||||
} catch (const json::parse_error& e) {
|
||||
spdlog::error("JSON parse error in DataHandler: {}", e.what());
|
||||
spdlog::error("JSON parse error in DataHandler for topic '{}': {}", input_topic, e.what());
|
||||
}
|
||||
}
|
||||
|
|
@ -2,9 +2,9 @@
|
|||
#include "spdlog/spdlog.h"
|
||||
#include "utils/mqtt_topic_matcher.h"
|
||||
|
||||
MqttRouter::MqttRouter(MqttClient& client) : m_client(client) {
|
||||
MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager) : m_client(client) {
|
||||
m_data_handler = std::make_unique<DataHandler>(m_client);
|
||||
m_command_handler = std::make_unique<CommandHandler>(m_client);
|
||||
m_command_handler = std::make_unique<CommandHandler>(m_client, deviceManager);
|
||||
|
||||
m_client.set_message_callback([this](mqtt::const_message_ptr msg) {
|
||||
this->on_message_arrived(std::move(msg));
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
class MqttRouter {
|
||||
public:
|
||||
// 构造函数现在接收 MqttClient 和所有的处理器
|
||||
MqttRouter(MqttClient& client);
|
||||
MqttRouter(MqttClient& client, DeviceManager& deviceManager);
|
||||
void start();
|
||||
|
||||
private:
|
||||
|
|
|
|||
96
src/test.cc
96
src/test.cc
|
|
@ -1,13 +1,91 @@
|
|||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <stdexcept>
|
||||
#include <iomanip>
|
||||
// #include "dataStorage/data_storage.h" // 包含你的头文件
|
||||
// 假设这些头文件已经包含在你的主程序中
|
||||
#include "modbus/modbus_common.h" // 包含 ModbusRtuDeviceConfig, DataPointConfig, ModbusDataType
|
||||
#include "modbus/modbus_rtu_poller_service.h" // 包含 ModbusRtuPollerService 类
|
||||
|
||||
// 模拟一个从设备接收到的原始数据
|
||||
// void mockDataReceiving() {
|
||||
// DataStorage& storage = DataStorage::getInstance();
|
||||
|
||||
// // 1. 初始化数据库(只需要做一次)
|
||||
|
||||
// if (!storage.initialize("/app/db/my_app_data.db")) {
|
||||
// std::cerr << "Failed to initialize database. Exiting." << std::endl;
|
||||
// return;
|
||||
// }
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
printf("HELLO");
|
||||
return 0;
|
||||
}
|
||||
// // 2. 模拟收到一批原始数据
|
||||
// for (int i = 0; i < 10; ++i) {
|
||||
// std::string raw_payload = "RAW_SENSOR_DATA_" + std::to_string(i) + "_VALUE=123";
|
||||
// std::string device_id = "sensor-alpha-001";
|
||||
// std::string protocol = "custom_protocol_v1";
|
||||
|
||||
// // 2.1 存储原始数据
|
||||
// if (!storage.storeRawData(device_id, raw_payload, protocol)) {
|
||||
// std::cerr << "Failed to store raw data for " << device_id << std::endl;
|
||||
// }
|
||||
// }
|
||||
|
||||
// // 3. 模拟协议解析器工作,并将数据转换为 UnifiedData 格式
|
||||
// for (int i = 0; i < 5; ++i) {
|
||||
// UnifiedData data;
|
||||
// data.device_id = "sensor-beta-002";
|
||||
// data.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
// // 构造一个复杂的 JSON 字符串
|
||||
// data.data_json = "{"
|
||||
// "\"temperature\": " + std::to_string(25.5 + i * 0.1) + ","
|
||||
// "\"humidity\": " + std::to_string(60.2 - i * 0.5) + ","
|
||||
// "\"status\": \"active\","
|
||||
// "\"error_code\": 0"
|
||||
// "}";
|
||||
|
||||
// // 3.2 存储处理后的数据
|
||||
// if (!storage.storeProcessedData(data)) {
|
||||
// std::cerr << "Failed to store processed data for " << data.device_id << std::endl;
|
||||
// }
|
||||
// }
|
||||
|
||||
// std::cout << "Data storage simulation finished." << std::endl;
|
||||
// }
|
||||
|
||||
// int main() {
|
||||
// mockDataReceiving();
|
||||
// return 0;
|
||||
// }
|
||||
// 假设这些头文件已经包含在你的主程序中
|
||||
// #include "modbus/modbus_common.h" // 包含 ModbusRtuDeviceConfig, DataPointConfig, ModbusDataType
|
||||
// #include "modbus_rtu_poller_service.h" // 包含 ModbusRtuPollerService 类
|
||||
// #include "data_storage.h" // 上一个问题中的数据存储类,用来接收最终数据
|
||||
// #include <iostream>
|
||||
// 这是你的主程序或某个初始化函数中的代码
|
||||
int main() {
|
||||
// 1. 创建具体的数据点配置列表
|
||||
std::vector<DataPointConfig> data_points;
|
||||
|
||||
// 配置第一个数据点:温度
|
||||
DataPointConfig temp_config;
|
||||
temp_config.name = "temperature_celsius";
|
||||
temp_config.address = 0x01;
|
||||
temp_config.type = ModbusDataType::FLOAT32;
|
||||
temp_config.scale = 1.0; // 假设原始值需要乘以0.1才能得到真实温度
|
||||
data_points.push_back(temp_config);
|
||||
// 配置第二个数据点:状态
|
||||
DataPointConfig status_config;
|
||||
status_config.name = "device_status";
|
||||
status_config.address = 0x03;
|
||||
status_config.type = ModbusDataType::FLOAT32;
|
||||
status_config.scale = 1.0; // 不需要缩放
|
||||
data_points.push_back(status_config);
|
||||
// 2. 创建完整的设备配置
|
||||
ModbusRtuDeviceConfig device_config;
|
||||
device_config.device_id = "boiler-room-sensor-01"; // 给设备一个唯一的ID,很重要!
|
||||
device_config.port_path = "/dev/ttyS7"; // Linux下的串口设备名
|
||||
device_config.baud_rate = 9600; // 波特率
|
||||
device_config.slave_id = 0x02; // Modbus从站地址
|
||||
device_config.poll_interval_ms = 2000; // 每2秒轮询一次
|
||||
device_config.data_points = data_points; // 将我们刚刚配置的数据点列表放进去
|
||||
// ... 接下来进入管理层 ...
|
||||
}
|
||||
|
|
@ -3,9 +3,9 @@
|
|||
#include "spdlog/spdlog.h"
|
||||
|
||||
// 构造函数现在需要调用基类的构造函数
|
||||
WebServer::WebServer(SystemMonitor::SystemMonitor& monitor, DeviceManager& deviceManager, uint16_t port)
|
||||
WebServer::WebServer(SystemMonitor::SystemMonitor& monitor, DeviceManager& deviceManager, LiveDataCache& liveDataCache,uint16_t port)
|
||||
// 注意基类已经改为 crow::Crow<crow::CORSHandler>()
|
||||
: crow::Crow<crow::CORSHandler>(), m_monitor(monitor),m_device_manager(deviceManager), m_port(port)
|
||||
: crow::Crow<crow::CORSHandler>(), m_monitor(monitor), m_device_manager(deviceManager), m_live_data_cache(liveDataCache), m_port(port)
|
||||
{
|
||||
// ================= [ 新增 CORS 配置 ] =================
|
||||
// 获取 CORS 中间件的引用
|
||||
|
|
@ -65,7 +65,6 @@ void WebServer::setup_routes() {
|
|||
return response;
|
||||
});
|
||||
|
||||
// ================= [ 新增设备列表 API ] =================
|
||||
CROW_ROUTE((*this), "/api/devices").methods("GET"_method)
|
||||
([this] {
|
||||
// 这一行不变,从 DeviceManager 获取最新的设备信息
|
||||
|
|
@ -92,5 +91,22 @@ void WebServer::setup_routes() {
|
|||
|
||||
return crow::json::wvalue(devices_json);
|
||||
});
|
||||
// ==========================================================
|
||||
|
||||
// ================= [ 新增实时数据 API ] =================
|
||||
CROW_ROUTE((*this), "/api/data/latest").methods("GET"_method)
|
||||
([this] {
|
||||
// 从缓存中获取所有设备的最新数据
|
||||
auto latest_data_map = m_live_data_cache.get_all_data();
|
||||
|
||||
crow::json::wvalue response;
|
||||
for (const auto& pair : latest_data_map) {
|
||||
// pair.first 是 device_id (string)
|
||||
// pair.second 是 json_data (string)
|
||||
// 我们需要将 json_data 字符串再次解析为 crow 的 json 对象
|
||||
response[pair.first] = crow::json::load(pair.second);
|
||||
}
|
||||
|
||||
return response;
|
||||
});
|
||||
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
#include "systemMonitor/system_monitor.h"
|
||||
#include "deviceManager/device_manager.h"
|
||||
#include "dataCache/live_data_cache.h"
|
||||
|
||||
#include <thread>
|
||||
|
||||
|
|
@ -16,7 +17,11 @@
|
|||
class WebServer : public crow::Crow<crow::CORSHandler> {
|
||||
|
||||
public:
|
||||
WebServer(SystemMonitor::SystemMonitor& monitor, DeviceManager& deviceManager, uint16_t port = 8080);
|
||||
WebServer(SystemMonitor::SystemMonitor& monitor,
|
||||
DeviceManager& deviceManager,
|
||||
LiveDataCache& liveDataCache,
|
||||
uint16_t port = 8080
|
||||
);
|
||||
~WebServer();
|
||||
|
||||
WebServer(const WebServer&) = delete;
|
||||
|
|
@ -30,6 +35,8 @@ private:
|
|||
|
||||
SystemMonitor::SystemMonitor& m_monitor;
|
||||
DeviceManager& m_device_manager;
|
||||
LiveDataCache& m_live_data_cache;
|
||||
|
||||
uint16_t m_port;
|
||||
|
||||
std::thread m_thread;
|
||||
|
|
|
|||
Binary file not shown.
Loading…
Reference in New Issue