修改modbus相关函数,增加反向控制功能

This commit is contained in:
GuanYuankai 2025-10-16 08:47:55 +00:00
parent 480bfdd94f
commit 803bfcc377
12 changed files with 249 additions and 106 deletions

View File

@ -108,18 +108,23 @@ void DeviceManager::load_and_start(const std::string& config_path) {
}
void DeviceManager::stop_all() {
// --- <<< 添加锁 >>> ---
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Stopping all device services...");
for (auto& service : m_rtu_services) {
service->stop();
}
// For Asio-based objects, stopping is usually handled by stopping the io_context.
// If they have explicit stop logic, call it here.
m_rtu_services.clear();
for (auto& poller : m_tcp_pollers) {
poller->stop();
}
m_tcp_pollers.clear();
spdlog::info("All device services stopped.");
}
std::vector<DeviceInfo> DeviceManager::get_all_device_info() const {
// 使用 lock_guard 确保在函数返回前互斥锁能被自动释放
std::lock_guard<std::mutex> lock(m_mutex);
@ -160,4 +165,29 @@ std::vector<DeviceInfo> DeviceManager::get_all_device_info() const {
}
return all_devices;
}
bool DeviceManager::send_control_command(const std::string& device_id, uint16_t address, uint16_t value) {
std::lock_guard<std::mutex> lock(m_mutex);
// 首先,在 TCP poller 列表中查找
for (const auto& poller : m_tcp_pollers) {
if (poller->get_config().device_id == device_id) {
spdlog::info("Found TCP device '{}'. Dispatching write command to address {}.", device_id, address);
poller->write_single_register(address, value);
return true;
}
}
// 然后,在 RTU service 列表中查找
for (const auto& service : m_rtu_services) {
if (service->get_config().device_id == device_id) {
spdlog::info("Found RTU device '{}'. Dispatching write command to address {}.", device_id, address);
service->write_single_register(address, value);
return true;
}
}
spdlog::warn("send_control_command failed: Device with ID '{}' not found.", device_id);
return false;
}

View File

@ -55,6 +55,15 @@ public:
*/
void stop_all();
/**
* @brief Modbus设备发送一个写单个寄存器的命令
* @param device_id ID
* @param address
* @param value
* @return truefalse
*/
bool send_control_command(const std::string& device_id, uint16_t address, uint16_t value);
private:
boost::asio::io_context& m_io_context;
ReportDataCallback m_report_callback;

View File

@ -19,14 +19,6 @@
// 用于 ASIO 服务的全局 io_context
boost::asio::io_context g_io_context;
/**
* @brief (SIGINT, SIGTERM).
*/
// void signalHandler(int signum) {
// spdlog::warn("Interrupt signal ({}) received. Shutting down.", signum);
// g_io_context.stop();
// }
/**
* @brief MQTT
*/
@ -60,17 +52,29 @@ int main(int argc, char* argv[]) {
return 1;
}
// signal(SIGINT, signalHandler);
// signal(SIGTERM, signalHandler);
try {
DataCache data_cache;
MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client");
MqttRouter mqtt_router(mqtt_client);
auto report_to_mqtt = [&](const UnifiedData& data) {
if (mqtt_client.is_connected()) {
// 网络正常,直接上报
std::string topic = "devices/" + data.device_id + "/data";
g_io_context.post([&, topic, payload = data.data_json]() {
mqtt_client.publish(topic, payload, 1, false);
});
} else {
// 网络断开,写入缓存
spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id);
data_cache.add(data);
}
};
DeviceManager device_manager(g_io_context, report_to_mqtt);
MqttRouter mqtt_router(mqtt_client, device_manager);
std::vector<uint16_t> listen_ports = { 8888 };
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
SystemMonitor::SystemMonitor monitor;
DataCache data_cache;
if (!data_cache.open("edge_data_cache.db")) {
spdlog::critical("Failed to initialize data cache. Exiting.");
return 1;
@ -90,23 +94,7 @@ int main(int argc, char* argv[]) {
boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15));
system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client)));
// --- 创建包含缓存逻辑的统一数据上报回调函数 >>>
auto report_to_mqtt = [&](const UnifiedData& data) {
if (mqtt_client.is_connected()) {
// 网络正常,直接上报
std::string topic = "devices/" + data.device_id + "/data";
g_io_context.post([&, topic, payload = data.data_json]() {
mqtt_client.publish(topic, payload, 1, false);
});
} else {
// 网络断开,写入缓存
spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id);
data_cache.add(data);
}
};
// 实例化设备管理器并从文件加载所有设备 (使用新的回调)
DeviceManager device_manager(g_io_context, report_to_mqtt);
device_manager.load_and_start("../config/devices.json");
WebServer web_server(monitor, device_manager,8080);

View File

@ -31,14 +31,12 @@ void ModbusMasterPoller::start() {
do_connect();
}
// --- <<< 新增 stop 方法实现 >>> ---
void ModbusMasterPoller::stop() {
if (!m_is_running.exchange(false)) {
// 如果之前已经是 false说明已经停止了直接返回
return;
}
// 使用 post 确保取消操作在 io_context 的事件循环中执行,保证线程安全
boost::asio::post(m_io_context, [this, self = shared_from_this()](){
spdlog::info("[Modbus TCP] Stopping poller for device '{}'...", m_config.device_id);
@ -54,7 +52,6 @@ void ModbusMasterPoller::stop() {
});
}
// --- <<< 新增 get_config 和 is_running 实现 >>> ---
const ModbusTcpDeviceConfig& ModbusMasterPoller::get_config() const {
return m_config;
}
@ -63,6 +60,74 @@ bool ModbusMasterPoller::is_running() const {
return m_is_running;
}
void ModbusMasterPoller::write_single_register(uint16_t address, uint16_t value) {
if (!m_is_running) {
spdlog::warn("[Modbus TCP] Device '{}': Poller is not running. Cannot write.", m_config.device_id);
return;
}
// 使用 post 将写操作投递到 io_context 线程中执行,以保证线程安全
boost::asio::post(m_io_context, [this, self = shared_from_this(), address, value]() {
spdlog::debug("[Modbus TCP] Device '{}': Writing value {} to address {}.", m_config.device_id, value, address);
// 1. 创建一个独立的写缓冲区
auto write_cmd_buffer = std::make_shared<std::vector<uint8_t>>();
// 2. 构建写命令PDU (功能码0x06)
// PDU = Address (2 bytes) + Value (2 bytes)
std::vector<uint8_t> pdu;
pdu.push_back(static_cast<uint8_t>((address >> 8) & 0xFF));
pdu.push_back(static_cast<uint8_t>(address & 0xFF));
pdu.push_back(static_cast<uint8_t>((value >> 8) & 0xFF));
pdu.push_back(static_cast<uint8_t>(value & 0xFF));
// 3. 构建MBAP头
write_cmd_buffer->resize(7 + pdu.size());
uint16_t tid = m_transaction_id++; // 原子地增加事务ID
uint16_t length = 1 + pdu.size(); // UnitID + PDU
(*write_cmd_buffer)[0] = static_cast<uint8_t>((tid >> 8) & 0xFF);
(*write_cmd_buffer)[1] = static_cast<uint8_t>(tid & 0xFF);
(*write_cmd_buffer)[2] = 0; (*write_cmd_buffer)[3] = 0; // Protocol ID
(*write_cmd_buffer)[4] = static_cast<uint8_t>((length >> 8) & 0xFF);
(*write_cmd_buffer)[5] = static_cast<uint8_t>(length & 0xFF);
(*write_cmd_buffer)[6] = m_config.slave_id;
std::copy(pdu.begin(), pdu.end(), write_cmd_buffer->begin() + 7);
// 4. 异步发送写命令
boost::asio::async_write(m_socket, boost::asio::buffer(*write_cmd_buffer),
[this, self, tid, write_cmd_buffer](const boost::system::error_code& ec, std::size_t) {
if (!m_is_running || ec) {
if (ec != boost::asio::error::operation_aborted) {
spdlog::error("[{}] Write command failed: {}", m_config.device_id, ec.message());
on_error("write_command");
}
return;
}
// 5. 异步读取响应
auto read_cmd_buffer = std::make_shared<std::array<uint8_t, 12>>(); // 写响应通常是12字节
boost::asio::async_read(m_socket, boost::asio::buffer(*read_cmd_buffer, 12),
[this, self, tid, read_cmd_buffer](const boost::system::error_code& ec, std::size_t) {
if (!m_is_running || ec) {
if (ec != boost::asio::error::operation_aborted) {
spdlog::error("[{}] Read command response failed: {}", m_config.device_id, ec.message());
on_error("read_command_response");
}
return;
}
// 简单检查一下事务ID是否匹配
uint16_t resp_tid = ((*read_cmd_buffer)[0] << 8) | (*read_cmd_buffer)[1];
if (resp_tid == tid) {
spdlog::info("[{}] Successfully wrote and received confirmation.", m_config.device_id);
} else {
spdlog::warn("[{}] Write command TID mismatch. Sent {}, Rcvd {}.", m_config.device_id, tid, resp_tid);
}
});
});
});
}
void ModbusMasterPoller::do_connect() {
auto self = shared_from_this();
tcp::resolver resolver(m_io_context);

View File

@ -8,6 +8,7 @@
#include <memory>
#include <string>
#include <vector>
#include <atomic>
class ModbusMasterPoller : public std::enable_shared_from_this<ModbusMasterPoller> {
public:
@ -22,6 +23,13 @@ public:
const ModbusTcpDeviceConfig& get_config() const;
bool is_running() const;
/**
* @brief (线)
* @param address
* @param value
*/
void write_single_register(uint16_t address, uint16_t value);
private:
void do_connect();
void do_poll();
@ -40,7 +48,7 @@ private:
std::vector<uint8_t> m_write_buffer;
std::array<uint8_t, 260> m_read_buffer;
uint16_t m_transaction_id = 0;
std::atomic<uint16_t> m_transaction_id{0};
std::atomic<bool> m_is_running{false};
};

View File

@ -40,20 +40,29 @@ bool ModbusRtuPollerService::is_running() const {
return !m_stop_flag && m_thread.joinable();
}
void ModbusRtuPollerService::write_single_register(uint16_t address, uint16_t value) {
std::lock_guard<std::mutex> lock(m_client_mutex);
try {
spdlog::debug("[Modbus RTU] Device '{}': Writing value {} to address {}.", m_config.device_id, value, address);
m_client.writeSingleRegister(m_config.slave_id, address, value);
spdlog::info("[Modbus RTU] Device '{}': Successfully wrote value {} to address {}.", m_config.device_id, value, address);
} catch (const std::exception& e) {
spdlog::error("[Modbus RTU] Device '{}': Failed to write to address {}: {}", m_config.device_id, address, e.what());
}
}
void ModbusRtuPollerService::run() {
// 检查是否有数据点被配置
if (m_config.data_points.empty()) {
spdlog::warn("[Modbus RTU] Device '{}' has no data points configured. Thread will not run.", m_config.device_id);
return;
}
// 设置并打开串口
if (!m_client.setPortSettings(m_config.port_path, m_config.baud_rate)) {
spdlog::error("[Modbus RTU] Failed to set up serial port '{}' for device '{}'. Thread exiting.", m_config.port_path, m_config.device_id);
return;
}
// 1. 为了提高效率,我们计算出需要一次性读取的连续寄存器范围
auto [min_it, max_it] = std::minmax_element(m_config.data_points.begin(), m_config.data_points.end(),
[](const DataPointConfig& a, const DataPointConfig& b) {
return a.address < b.address;
@ -61,55 +70,49 @@ void ModbusRtuPollerService::run() {
uint16_t start_address = min_it->address;
uint16_t last_address = max_it->address;
// 考虑多寄存器数据类型如FLOAT32会占用额外的寄存器
if (max_it->type == ModbusDataType::UINT32 || max_it->type == ModbusDataType::INT32 || max_it->type == ModbusDataType::FLOAT32) {
last_address += 1; // 32位数据占用2个16位寄存器
last_address += 1;
}
uint16_t quantity = last_address - start_address + 1;
spdlog::info("[Modbus RTU] Device '{}' will poll {} registers starting from address {}.", m_config.device_id, quantity, start_address);
// 线程主循环
while (!m_stop_flag) {
try {
// 2. 一次性读取所有需要的连续寄存器
std::vector<uint16_t> raw_registers = m_client.readHoldingRegisters(m_config.slave_id, start_address, quantity);
// 3. 将返回的寄存器数组转换为 (地址 -> 值) 的映射,方便解析器按地址查找
std::map<uint16_t, uint16_t> registers_map;
for (uint16_t i = 0; i < raw_registers.size(); ++i) {
registers_map[start_address + i] = raw_registers[i];
{ // <--- 创建一个新的作用域来控制锁的生命周期
std::lock_guard<std::mutex> lock(m_client_mutex);
try {
std::vector<uint16_t> raw_registers = m_client.readHoldingRegisters(m_config.slave_id, start_address, quantity);
std::map<uint16_t, uint16_t> registers_map;
for (uint16_t i = 0; i < raw_registers.size(); ++i) {
registers_map[start_address + i] = raw_registers[i];
}
nlohmann::json data_j = GenericModbusParser::parse(registers_map, m_config.data_points);
UnifiedData report_data;
report_data.device_id = m_config.device_id;
report_data.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
report_data.data_json = data_j.dump();
if (m_report_callback) {
m_report_callback(report_data);
}
} catch (const std::exception& e) {
spdlog::error("[Modbus RTU] Error during communication with device '{}': {}", m_config.device_id, e.what());
}
} // <--- 锁在这里被释放
// 4. 使用通用的、配置驱动的解析器进行解析
nlohmann::json data_j = GenericModbusParser::parse(registers_map, m_config.data_points);
// 5. 封装成 UnifiedData 结构并上报
UnifiedData report_data;
report_data.device_id = m_config.device_id;
report_data.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
report_data.data_json = data_j.dump();
if (m_report_callback) {
m_report_callback(report_data);
}
} catch (const std::exception& e) {
spdlog::error("[Modbus RTU] Error during communication with device '{}': {}", m_config.device_id, e.what());
}
// 在本线程中等待,不影响主线程的事件循环
auto wake_up_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(m_config.poll_interval_ms);
while (std::chrono::steady_clock::now() < wake_up_time) {
if (m_stop_flag) {
break;
}
// 每次只“打盹”100毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// 线程退出前关闭串口
m_client.closePort();
spdlog::info("[RTU Poller {}] Run loop finished. Thread exiting.", m_config.device_id);
}

View File

@ -8,6 +8,7 @@
#include <thread>
#include <atomic>
#include <string>
#include <mutex>
/**
* @brief Modbus RTU
@ -46,7 +47,6 @@ public:
*/
void stop();
// --- <<< 新增方法 >>> ---
/**
* @brief (const-ref to avoid copy)
*/
@ -56,7 +56,13 @@ public:
* @brief
*/
bool is_running() const;
// --- <<< END >>> ---
/**
* @brief
* @param address
* @param value
*/
void write_single_register(uint16_t address, uint16_t value);
private:
/**
* @brief 线
@ -71,6 +77,8 @@ private:
std::thread m_thread;
std::atomic<bool> m_stop_flag{false};
std::mutex m_client_mutex;
};
#endif // MODBUS_RTU_POLLER_SERVICE_H

View File

@ -1,26 +1,51 @@
// 文件名: command_handler.cpp
#include "command_handler.h"
#include "spdlog/spdlog.h"
#include "vendor/nlohmann/json.hpp"
using json = nlohmann::json;
CommandHandler::CommandHandler(MqttClient& client) : m_client(client) {}
// 构造函数实现更新
CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager)
: m_client(client), m_device_manager(deviceManager) {}
void CommandHandler::handle(mqtt::const_message_ptr msg) {
spdlog::info("DataHandler is processing message from topic: {}", msg->get_topic());
const std::string topic = msg->get_topic();
const std::string payload = msg->get_payload_str();
spdlog::info("CommandHandler received a command on topic '{}'", topic);
try {
// (这里是原来 MqttRouter 中处理数据上报的逻辑)
auto received_json = json::parse(msg->get_payload_str());
if (received_json.contains("temperature")) {
double temp = received_json["temperature"];
json forward_json;
forward_json["source_topic"] = msg->get_topic();
forward_json["processed_temp"] = temp;
forward_json["timestamp"] = time(0);
const std::string output_topic = "proxy/processed/temperature";
m_client.publish(output_topic, forward_json.dump());
auto cmd_json = json::parse(payload);
// 校验命令格式
if (!cmd_json.contains("deviceId") || !cmd_json.contains("address") || !cmd_json.contains("value")) {
spdlog::warn("Command JSON from topic '{}' is missing required fields.", topic);
return;
}
} catch (const json::parse_error& e) {
spdlog::error("JSON parse error in DataHandler: {}", e.what());
// 解析命令
std::string device_id = cmd_json.at("deviceId").get<std::string>();
uint16_t address = cmd_json.at("address").get<uint16_t>();
uint16_t value = cmd_json.at("value").get<uint16_t>();
// 通过 DeviceManager 发送控制指令
bool success = m_device_manager.send_control_command(device_id, address, value);
// (可选) 将执行结果反馈到 MQTT形成闭环
json response_json;
response_json["success"] = success;
response_json["original_command"] = cmd_json;
std::string response_topic = "proxy/command/result/" + device_id;
m_client.publish(response_topic, response_json.dump());
if (success) {
spdlog::info("Successfully dispatched command to device '{}'.", device_id);
} else {
spdlog::error("Failed to dispatch command to device '{}'.", device_id);
}
} catch (const json::exception& e) {
spdlog::error("Failed to parse command JSON from topic '{}': {}", topic, e.what());
}
}

View File

@ -1,10 +1,13 @@
// 文件名: command_handler.h
#pragma once
#include "../mqtt_client.h" // 注意 #include 路径的变化
#include "../mqtt_client.h"
#include "deviceManager/device_manager.h"
class CommandHandler {
public:
explicit CommandHandler(MqttClient& client);
explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager);
void handle(mqtt::const_message_ptr msg);
private:
MqttClient& m_client;
DeviceManager& m_device_manager;
};

View File

@ -7,20 +7,24 @@ using json = nlohmann::json;
DataHandler::DataHandler(MqttClient& client) : m_client(client) {}
void DataHandler::handle(mqtt::const_message_ptr msg) {
spdlog::info("DataHandler is processing message from topic: {}", msg->get_topic());
const auto& input_topic = msg->get_topic();
const auto& payload_str = msg->get_payload_str();
spdlog::debug("DataHandler received data from topic: {}", input_topic);
try {
// (这里是原来 MqttRouter 中处理数据上报的逻辑)
auto received_json = json::parse(msg->get_payload_str());
if (received_json.contains("temperature")) {
double temp = received_json["temperature"];
json forward_json;
forward_json["source_topic"] = msg->get_topic();
forward_json["processed_temp"] = temp;
forward_json["timestamp"] = time(0);
const std::string output_topic = "proxy/processed/temperature";
m_client.publish(output_topic, forward_json.dump());
}
auto payload_json = json::parse(payload_str);
payload_json["_proxy_source_topic"] = input_topic;
payload_json["_proxy_processed_ts"] = time(nullptr);
const std::string output_topic = "proxy/processed/data";
m_client.publish(output_topic, payload_json.dump());
spdlog::debug("Successfully forwarded data from '{}' to '{}'", input_topic, output_topic);
} catch (const json::parse_error& e) {
spdlog::error("JSON parse error in DataHandler: {}", e.what());
spdlog::error("JSON parse error in DataHandler for topic '{}': {}", input_topic, e.what());
}
}

View File

@ -2,9 +2,9 @@
#include "spdlog/spdlog.h"
#include "utils/mqtt_topic_matcher.h"
MqttRouter::MqttRouter(MqttClient& client) : m_client(client) {
MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager) : m_client(client) {
m_data_handler = std::make_unique<DataHandler>(m_client);
m_command_handler = std::make_unique<CommandHandler>(m_client);
m_command_handler = std::make_unique<CommandHandler>(m_client, deviceManager);
m_client.set_message_callback([this](mqtt::const_message_ptr msg) {
this->on_message_arrived(std::move(msg));

View File

@ -7,7 +7,7 @@
class MqttRouter {
public:
// 构造函数现在接收 MqttClient 和所有的处理器
MqttRouter(MqttClient& client);
MqttRouter(MqttClient& client, DeviceManager& deviceManager);
void start();
private: