完成modbus-rtu服务的重构
This commit is contained in:
parent
6d6899549b
commit
e06e32fad2
|
|
@ -41,8 +41,9 @@ add_library(edge_proxy_lib STATIC
|
|||
src/systemMonitor/system_monitor.cc
|
||||
|
||||
#modbus
|
||||
src/modbus/modbus_temperature_humidity.cc
|
||||
src/modbus/modbus_rtu_client.cc
|
||||
src/modbus/modbus_rtu_poller_service.cc
|
||||
src/modbus/generic_modbus_parser.cc
|
||||
# --- Modbus Master ---
|
||||
src/protocol/modbus/modbus_protocol.cc
|
||||
src/modbus/modbus_master_poller.cc
|
||||
|
|
|
|||
58
src/main.cpp
58
src/main.cpp
|
|
@ -4,6 +4,8 @@
|
|||
#include "systemMonitor/system_monitor.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "modbus/modbus_master_poller.h"
|
||||
#include "modbus/modbus_rtu_poller_service.h"
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <csignal>
|
||||
|
|
@ -88,20 +90,52 @@ int main(int argc, char* argv[]) {
|
|||
};
|
||||
|
||||
// 配置并启动Modbus轮询器
|
||||
ModbusDeviceConfig temp_sensor_config = {
|
||||
.device_id = "temp_sensor_workshop1",
|
||||
.ip_address = "192.168.1.120", // 您的Modbus设备的IP地址
|
||||
.port = 502, // Modbus TCP标准端口
|
||||
.slave_id = 1, // 设备的从站ID
|
||||
.start_address = 0, // 要读取的第一个寄存器的地址
|
||||
.quantity = 8, // 从起始地址开始,总共读取多少个寄存器
|
||||
.poll_interval_ms = 2000 // 每2000毫秒轮询一次
|
||||
};
|
||||
// =========================================================================
|
||||
// ModbusDeviceConfig temp_sensor_config = {
|
||||
// .device_id = "temp_sensor_workshop1",
|
||||
// .ip_address = "192.168.1.120", // 您的Modbus设备的IP地址
|
||||
// .port = 502, // Modbus TCP标准端口
|
||||
// .slave_id = 1, // 设备的从站ID
|
||||
// .start_address = 0, // 要读取的第一个寄存器的地址
|
||||
// .quantity = 8, // 从起始地址开始,总共读取多少个寄存器
|
||||
// .poll_interval_ms = 2000 // 每2000毫秒轮询一次
|
||||
// };
|
||||
|
||||
// 创建轮询器实例 (使用std::make_shared确保生命周期)
|
||||
auto poller = std::make_shared<ModbusMasterPoller>(g_io_context, temp_sensor_config, report_to_mqtt);
|
||||
poller->start();
|
||||
// auto poller = std::make_shared<ModbusMasterPoller>(g_io_context, temp_sensor_config, report_to_mqtt);
|
||||
// poller->start();
|
||||
|
||||
|
||||
// ==========================================================================
|
||||
// modbus rtu轮询服务启动
|
||||
ModbusRtuDeviceConfig temp_sensor_config = {
|
||||
.device_id = "rtu_temp_sensor_lab",
|
||||
.port_path = "/dev/ttyS7",
|
||||
.baud_rate = 9600,
|
||||
.slave_id = 0x01,
|
||||
.poll_interval_ms = 5000,
|
||||
.data_points = {
|
||||
{"temperature", 0x0000, ModbusDataType::INT16, 0.1}, // 地址0, 16位有符号, 结果乘以0.1
|
||||
{"humidity", 0x0001, ModbusDataType::UINT16, 0.1} // 地址1, 16位无符号, 结果乘以0.1
|
||||
}
|
||||
};
|
||||
ModbusRtuPollerService temp_sensor_service(temp_sensor_config, report_to_mqtt);
|
||||
temp_sensor_service.start();
|
||||
|
||||
// 示例2:配置一个水表
|
||||
// ModbusRtuDeviceConfig water_meter_config = {
|
||||
// .device_id = "water_meter_main_gate",
|
||||
// .port_path = "/dev/ttyS7", // 假设在同一条RS485总线上
|
||||
// .baud_rate = 9600,
|
||||
// .slave_id = 0x05, // 不同的从站ID
|
||||
// .poll_interval_ms = 10000,
|
||||
// .data_points = {
|
||||
// {"total_flow", 0x0100, ModbusDataType::FLOAT32, 1.0}, // 地址256, 32位浮点数
|
||||
// {"flow_rate", 0x0102, ModbusDataType::FLOAT32, 1.0} // 地址258, 32位浮点数
|
||||
// }
|
||||
// };
|
||||
// ModbusRtuPollerService water_meter_service(water_meter_config, report_to_mqtt);
|
||||
// water_meter_service.start();
|
||||
|
||||
|
||||
spdlog::info("All services are running. Press Ctrl+C to exit.");
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
// 文件名: src/modbus/generic_modbus_parser.cc
|
||||
#include "generic_modbus_parser.h"
|
||||
#include <stdexcept>
|
||||
#include <cstring> // For std::memcpy
|
||||
|
||||
nlohmann::json GenericModbusParser::parse(const std::map<uint16_t, uint16_t>& registers_map,
|
||||
const std::vector<DataPointConfig>& data_points) {
|
||||
nlohmann::json result_json;
|
||||
|
||||
for (const auto& dp_config : data_points) {
|
||||
try {
|
||||
switch (dp_config.type) {
|
||||
case ModbusDataType::UINT16: {
|
||||
uint16_t raw_val = registers_map.at(dp_config.address);
|
||||
result_json[dp_config.name] = static_cast<double>(raw_val) * dp_config.scale;
|
||||
break;
|
||||
}
|
||||
case ModbusDataType::INT16: {
|
||||
int16_t raw_val = static_cast<int16_t>(registers_map.at(dp_config.address));
|
||||
result_json[dp_config.name] = static_cast<double>(raw_val) * dp_config.scale;
|
||||
break;
|
||||
}
|
||||
case ModbusDataType::UINT32: {
|
||||
uint16_t high = registers_map.at(dp_config.address);
|
||||
uint16_t low = registers_map.at(dp_config.address + 1);
|
||||
uint32_t raw_val = (static_cast<uint32_t>(high) << 16) | low;
|
||||
result_json[dp_config.name] = static_cast<double>(raw_val) * dp_config.scale;
|
||||
break;
|
||||
}
|
||||
case ModbusDataType::INT32: {
|
||||
uint16_t high = registers_map.at(dp_config.address);
|
||||
uint16_t low = registers_map.at(dp_config.address + 1);
|
||||
uint32_t temp_val = (static_cast<uint32_t>(high) << 16) | low;
|
||||
int32_t raw_val = static_cast<int32_t>(temp_val);
|
||||
result_json[dp_config.name] = static_cast<double>(raw_val) * dp_config.scale;
|
||||
break;
|
||||
}
|
||||
case ModbusDataType::FLOAT32: {
|
||||
uint16_t high = registers_map.at(dp_config.address);
|
||||
uint16_t low = registers_map.at(dp_config.address + 1);
|
||||
uint32_t temp_val = (static_cast<uint32_t>(high) << 16) | low;
|
||||
float raw_val;
|
||||
std::memcpy(&raw_val, &temp_val, sizeof(float));
|
||||
result_json[dp_config.name] = static_cast<double>(raw_val) * dp_config.scale;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const std::out_of_range& e) {
|
||||
// 如果在map中找不到地址,说明轮询失败或配置错误
|
||||
result_json[dp_config.name] = nullptr; // 使用null表示该值读取失败
|
||||
}
|
||||
}
|
||||
return result_json;
|
||||
}
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
// 文件名: src/modbus/generic_modbus_parser.h
|
||||
#ifndef GENERIC_MODBUS_PARSER_H
|
||||
#define GENERIC_MODBUS_PARSER_H
|
||||
|
||||
#include "modbus_common.h"
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <vector>
|
||||
#include <map>
|
||||
|
||||
class GenericModbusParser {
|
||||
public:
|
||||
/**
|
||||
* @brief 将从Modbus设备读取的原始寄存器映射,根据配置解析成JSON对象
|
||||
* @param registers_map 以地址为键,原始寄存器值为值的map
|
||||
* @param data_points 描述如何解析这些寄存器的配置列表
|
||||
* @return 包含所有解析后数据的 nlohmann::json 对象
|
||||
*/
|
||||
static nlohmann::json parse(const std::map<uint16_t, uint16_t>& registers_map,
|
||||
const std::vector<DataPointConfig>& data_points);
|
||||
};
|
||||
|
||||
#endif // GENERIC_MODBUS_PARSER_H
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
// 文件名: src/modbus/modbus_common.h
|
||||
#ifndef MODBUS_COMMON_H
|
||||
#define MODBUS_COMMON_H
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
|
||||
// 定义数据类型,用于指导如何解析寄存器
|
||||
enum class ModbusDataType {
|
||||
UINT16, // 16位无符号整数
|
||||
INT16, // 16位有符号整数
|
||||
UINT32, // 32位无符号整数 (占用2个寄存器)
|
||||
INT32, // 32位有符号整数 (占用2个寄存器)
|
||||
FLOAT32 // 32位浮点数 (占用2个寄存器)
|
||||
};
|
||||
|
||||
// 描述一个具体的数据采集点
|
||||
struct DataPointConfig {
|
||||
std::string name; // 数据点的名字, e.g., "temperature", "total_flow"
|
||||
uint16_t address; // Modbus寄存器起始地址
|
||||
ModbusDataType type; // 数据类型
|
||||
double scale = 1.0; // 缩放因子 (原始值 * scale = 最终值)
|
||||
};
|
||||
|
||||
// 描述一个完整的Modbus RTU设备,包含多个采集点
|
||||
struct ModbusRtuDeviceConfig {
|
||||
std::string device_id; // 设备的唯一ID
|
||||
std::string port_path;
|
||||
unsigned int baud_rate;
|
||||
uint8_t slave_id;
|
||||
int poll_interval_ms;
|
||||
std::vector<DataPointConfig> data_points; // 该设备上所有需要采集的数据点
|
||||
};
|
||||
|
||||
#endif // MODBUS_COMMON_H
|
||||
|
|
@ -0,0 +1,100 @@
|
|||
// 文件名: src/modbus/modbus_rtu_poller_service.cc
|
||||
#include "modbus_rtu_poller_service.h"
|
||||
#include "generic_modbus_parser.h" // 使用通用解析器
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <chrono>
|
||||
#include <algorithm> // for std::minmax_element
|
||||
|
||||
ModbusRtuPollerService::ModbusRtuPollerService(ModbusRtuDeviceConfig config, ReportDataCallback report_cb)
|
||||
: m_config(std::move(config)),
|
||||
m_report_callback(std::move(report_cb)) {}
|
||||
|
||||
ModbusRtuPollerService::~ModbusRtuPollerService() {
|
||||
stop();
|
||||
}
|
||||
|
||||
void ModbusRtuPollerService::start() {
|
||||
if (m_thread.joinable()) {
|
||||
spdlog::warn("[Modbus RTU Service] Poller for device '{}' is already running.", m_config.device_id);
|
||||
return;
|
||||
}
|
||||
m_stop_flag = false;
|
||||
// 启动一个新线程,并将 run() 方法作为入口点
|
||||
// 'this' 指针被传递,以便新线程可以调用类的成员函数
|
||||
m_thread = std::thread(&ModbusRtuPollerService::run, this);
|
||||
spdlog::info("[Modbus RTU Service] Poller for device '{}' started in a background thread.", m_config.device_id);
|
||||
}
|
||||
|
||||
void ModbusRtuPollerService::stop() {
|
||||
m_stop_flag = true;
|
||||
if (m_thread.joinable()) {
|
||||
m_thread.join(); // 等待线程安全退出
|
||||
spdlog::info("[Modbus RTU Service] Poller for device '{}' has been stopped.", m_config.device_id);
|
||||
}
|
||||
}
|
||||
|
||||
void ModbusRtuPollerService::run() {
|
||||
// 检查是否有数据点被配置
|
||||
if (m_config.data_points.empty()) {
|
||||
spdlog::warn("[Modbus RTU] Device '{}' has no data points configured. Thread will not run.", m_config.device_id);
|
||||
return;
|
||||
}
|
||||
|
||||
// 设置并打开串口
|
||||
if (!m_client.setPortSettings(m_config.port_path, m_config.baud_rate)) {
|
||||
spdlog::error("[Modbus RTU] Failed to set up serial port '{}' for device '{}'. Thread exiting.", m_config.port_path, m_config.device_id);
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 为了提高效率,我们计算出需要一次性读取的连续寄存器范围
|
||||
auto [min_it, max_it] = std::minmax_element(m_config.data_points.begin(), m_config.data_points.end(),
|
||||
[](const DataPointConfig& a, const DataPointConfig& b) {
|
||||
return a.address < b.address;
|
||||
});
|
||||
uint16_t start_address = min_it->address;
|
||||
uint16_t last_address = max_it->address;
|
||||
|
||||
// 考虑多寄存器数据类型(如FLOAT32)会占用额外的寄存器
|
||||
if (max_it->type == ModbusDataType::UINT32 || max_it->type == ModbusDataType::INT32 || max_it->type == ModbusDataType::FLOAT32) {
|
||||
last_address += 1; // 32位数据占用2个16位寄存器
|
||||
}
|
||||
uint16_t quantity = last_address - start_address + 1;
|
||||
|
||||
spdlog::info("[Modbus RTU] Device '{}' will poll {} registers starting from address {}.", m_config.device_id, quantity, start_address);
|
||||
|
||||
// 线程主循环
|
||||
while (!m_stop_flag) {
|
||||
try {
|
||||
// 2. 一次性读取所有需要的连续寄存器
|
||||
std::vector<uint16_t> raw_registers = m_client.readHoldingRegisters(m_config.slave_id, start_address, quantity);
|
||||
|
||||
// 3. 将返回的寄存器数组转换为 (地址 -> 值) 的映射,方便解析器按地址查找
|
||||
std::map<uint16_t, uint16_t> registers_map;
|
||||
for (uint16_t i = 0; i < raw_registers.size(); ++i) {
|
||||
registers_map[start_address + i] = raw_registers[i];
|
||||
}
|
||||
|
||||
// 4. 使用通用的、配置驱动的解析器进行解析
|
||||
nlohmann::json data_j = GenericModbusParser::parse(registers_map, m_config.data_points);
|
||||
|
||||
// 5. 封装成 UnifiedData 结构并上报
|
||||
UnifiedData report_data;
|
||||
report_data.device_id = m_config.device_id;
|
||||
report_data.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
report_data.data_json = data_j.dump();
|
||||
|
||||
if (m_report_callback) {
|
||||
m_report_callback(report_data);
|
||||
}
|
||||
|
||||
} catch (const std::exception& e) {
|
||||
spdlog::error("[Modbus RTU] Error during communication with device '{}': {}", m_config.device_id, e.what());
|
||||
}
|
||||
|
||||
// 在本线程中等待,不影响主线程的事件循环
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(m_config.poll_interval_ms));
|
||||
}
|
||||
|
||||
// 线程退出前关闭串口
|
||||
m_client.closePort();
|
||||
}
|
||||
|
|
@ -0,0 +1,65 @@
|
|||
// 文件名: src/modbus/modbus_rtu_poller_service.h
|
||||
#ifndef MODBUS_RTU_POLLER_SERVICE_H
|
||||
#define MODBUS_RTU_POLLER_SERVICE_H
|
||||
|
||||
#include "protocol/iprotocol_adapter.h"
|
||||
#include "modbus_rtu_client.h"
|
||||
#include "modbus_common.h" // 包含 ModbusRtuDeviceConfig 和 DataPointConfig 定义
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
|
||||
/**
|
||||
* @brief Modbus RTU 轮询服务类
|
||||
* * 负责在一个独立的后台线程中,根据配置周期性地轮询一个Modbus RTU设备。
|
||||
* 这种设计将同步阻塞的串口IO操作与主程序的异步事件循环隔离开来,
|
||||
* 避免了主线程被阻塞。
|
||||
*/
|
||||
class ModbusRtuPollerService {
|
||||
public:
|
||||
/**
|
||||
* @brief 构造函数
|
||||
* @param config 描述了要连接的设备、串口参数以及需要采集的数据点列表
|
||||
* @param report_cb 用于将解析后的数据上报给核心业务层的回调函数
|
||||
*/
|
||||
ModbusRtuPollerService(ModbusRtuDeviceConfig config, ReportDataCallback report_cb);
|
||||
|
||||
/**
|
||||
* @brief 析构函数
|
||||
* * 确保在对象销毁时,后台线程能够被安全地停止和清理。
|
||||
*/
|
||||
~ModbusRtuPollerService();
|
||||
|
||||
// 禁止拷贝和赋值,因为该类管理着一个线程资源
|
||||
ModbusRtuPollerService(const ModbusRtuPollerService&) = delete;
|
||||
ModbusRtuPollerService& operator=(const ModbusRtuPollerService&) = delete;
|
||||
|
||||
/**
|
||||
* @brief 启动轮询服务
|
||||
* * 创建并启动一个新的后台线程来执行 run() 方法中的轮询逻辑。
|
||||
*/
|
||||
void start();
|
||||
|
||||
/**
|
||||
* @brief 停止轮询服务
|
||||
* * 设置停止标志位,并等待后台线程安全地执行完毕并退出。
|
||||
*/
|
||||
void stop();
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief 线程的主循环函数
|
||||
* * 该函数是后台线程的入口点。它包含了连接串口、周期性轮询、
|
||||
* 解析数据和上报数据的完整逻辑。
|
||||
*/
|
||||
void run();
|
||||
|
||||
ModbusRtuDeviceConfig m_config;
|
||||
ReportDataCallback m_report_callback;
|
||||
ModbusRTUClient m_client;
|
||||
|
||||
std::thread m_thread;
|
||||
std::atomic<bool> m_stop_flag{false};
|
||||
};
|
||||
|
||||
#endif // MODBUS_RTU_POLLER_SERVICE_H
|
||||
91
src/test.cc
91
src/test.cc
|
|
@ -4,97 +4,10 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
#include <stdexcept>
|
||||
#include <iomanip> // For std::setw, std::setfill, std::hex, std::dec
|
||||
#include <iomanip>
|
||||
|
||||
// Include the new header files
|
||||
#include "modbus/modbus_rtu_client.h"
|
||||
#include "modbus/modbus_temperature_humidity.h"
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
std::cout << "--- Modbus RTU Sensor Client ---" << std::endl;
|
||||
|
||||
std::string port_path = "/dev/ttyS7";
|
||||
unsigned int baud_rate = 9600;
|
||||
uint8_t slave_id = 0x01;
|
||||
|
||||
if (argc > 1) {
|
||||
port_path = argv[1];
|
||||
}
|
||||
if (argc > 2) {
|
||||
baud_rate = static_cast<unsigned int>(std::stoul(argv[2]));
|
||||
}
|
||||
if (argc > 3) {
|
||||
slave_id = static_cast<uint8_t>(std::stoul(argv[3]));
|
||||
}
|
||||
|
||||
ModbusRTUClient client;
|
||||
|
||||
// Set and open serial port settings
|
||||
if (!client.setPortSettings(port_path, baud_rate, 8, 1, 'N')) {
|
||||
std::cerr << "Failed to set up serial port. Exiting." << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
std::cout << "\nStarting continuous Modbus polling. Press Ctrl+C to exit." << std::endl;
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
std::vector<uint16_t> registers;
|
||||
|
||||
// Read Temperature (FC 0x03, address 0x0000, 1 register)
|
||||
registers = client.readHoldingRegisters(slave_id, 0x0000, 1);
|
||||
printRegisters("Temperature", registers);
|
||||
|
||||
// Read Humidity (FC 0x03, address 0x0001, 1 register)
|
||||
registers = client.readHoldingRegisters(slave_id, 0x0001, 1);
|
||||
printRegisters("Humidity", registers);
|
||||
|
||||
// Read Temperature and Humidity (FC 0x03, address 0x0000, 2 registers)
|
||||
registers = client.readHoldingRegisters(slave_id, 0x0000, 2);
|
||||
printRegisters("Temperature, Humidity (Combined)", registers);
|
||||
|
||||
// Read Temperature Alarm Status (FC 0x04, address 0x0030, 1 register)
|
||||
registers = client.readInputRegisters(slave_id, 0x0030, 1);
|
||||
printRegisters("Temperature Alarm Status", registers);
|
||||
|
||||
// Read Humidity Alarm Status (FC 0x04, address 0x0031, 1 register)
|
||||
registers = client.readInputRegisters(slave_id, 0x0031, 1);
|
||||
printRegisters("Humidity Alarm Status", registers);
|
||||
|
||||
// Read Device Info (using a dummy address for example, adjust as per device docs)
|
||||
// Assuming 0x0031 is some device common info, not strictly alarm
|
||||
registers = client.readHoldingRegisters(slave_id, 0x0031, 1);
|
||||
printRegisters("Device Info", registers);
|
||||
|
||||
// Example: Read Modbus Address (FC 0x03, address 0x1000, 1 register)
|
||||
// Note: This uses the low-level ModbusRequest/ModbusResponse directly
|
||||
// to show how it can be used if the high-level API is not sufficient or for specific needs.
|
||||
ModbusRequest req_addr = ModbusRequest::createReadRegistersRequest(slave_id, 0x03, 0x1000, 1);
|
||||
ModbusResponse res_addr = client.sendAndReceive(req_addr);
|
||||
// Check if response is valid before parsing
|
||||
if (!res_addr.is_exception) {
|
||||
printRegisters("Modbus Address", res_addr.getRegisters());
|
||||
} else {
|
||||
std::cerr << "Failed to read Modbus Address: ";
|
||||
res_addr.print(); // Print detailed exception
|
||||
}
|
||||
|
||||
// Example: Write a single register (FC 0x06, address 0x0002, value 123)
|
||||
// For demonstration, let's assume register 0x0002 can be written.
|
||||
// Be careful when writing to actual hardware!
|
||||
// std::cout << "Attempting to write 123 to register 0x0002..." << std::endl;
|
||||
// client.writeSingleRegister(slave_id, 0x0002, 123);
|
||||
// std::cout << "Write single register complete (or exception thrown if failed)." << std::endl;
|
||||
|
||||
|
||||
} catch (const std::exception& e) {
|
||||
std::cerr << "--- Error during Modbus communication: " << e.what() << " ---" << std::endl;
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2)); // Poll every 2 seconds
|
||||
}
|
||||
|
||||
client.closePort();
|
||||
std::cout << "\nProgram terminated." << std::endl;
|
||||
printf("HELLO");
|
||||
return 0;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue