bonus-edge-proxy/src/modbus/modbus_master_poller.cc

161 lines
6.3 KiB
C++

// File: src/modbus/modbus_master_poller.cc
#include "modbus_master_poller.h"

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "protocol/modbus/modbus_protocol.h"
#include "spdlog/spdlog.h"
#include <nlohmann/json.hpp>
using boost::asio::ip::tcp;
/// Constructs a poller whose socket and timer run on the given io_context.
///
/// @param io_context  Asio context used to create the socket and the timer.
/// @param config      Device configuration; moved into the poller.
/// @param report_cb   Callback stored for reporting polled data; moved into
///                    the poller.
///
/// Construction performs no I/O.
ModbusMasterPoller::ModbusMasterPoller(boost::asio::io_context& io_context,
                                       ModbusDeviceConfig config,
                                       ReportDataCallback report_cb)
    : m_io_context(io_context),
      m_socket(io_context),
      m_timer(io_context),
      m_config(std::move(config)),
      m_report_callback(std::move(report_cb))
{
}
void ModbusMasterPoller::start() {
do_connect();
}
void ModbusMasterPoller::do_connect() {
auto self = shared_from_this();
tcp::resolver resolver(m_io_context);
spdlog::info("[{}] Connecting to {}:{}...", m_config.device_id, m_config.ip_address, m_config.port);
resolver.async_resolve(m_config.ip_address, std::to_string(m_config.port),
[this, self](const boost::system::error_code& ec, tcp::resolver::results_type endpoints) {
if (ec) {
spdlog::error("[{}] Resolve failed: {}", m_config.device_id, ec.message());
on_error("resolve");
return;
}
boost::asio::async_connect(m_socket, endpoints,
[this, self](const boost::system::error_code& ec, const tcp::endpoint& /*endpoint*/) {
if (ec) {
spdlog::error("[{}] Connect failed: {}", m_config.device_id, ec.message());
on_error("connect");
return;
}
spdlog::info("[{}] Connection successful.", m_config.device_id);
do_poll(); // 立即开始第一次轮询
});
});
}
void ModbusMasterPoller::on_error(const std::string& stage) {
if (m_socket.is_open()) {
m_socket.close();
}
// 5秒后尝试重连
spdlog::info("[{}] Reconnecting in 5 seconds after error at stage: {}", m_config.device_id, stage);
m_timer.expires_after(std::chrono::seconds(5));
m_timer.async_wait([this, self=shared_from_this()](const boost::system::error_code& /*ec*/) {
do_connect();
});
}
/// Builds one "read holding registers" request frame in m_write_buffer
/// (7-byte MBAP header followed by the PDU) and sends it via do_write().
void ModbusMasterPoller::do_poll() {
    constexpr std::size_t kHeaderSize = 7;

    // Byte extractors for big-endian 16-bit header fields.
    const auto high_byte = [](auto value) { return static_cast<uint8_t>((value >> 8) & 0xFF); };
    const auto low_byte = [](auto value) { return static_cast<uint8_t>(value & 0xFF); };

    // Request PDU for the configured register range.
    const auto request_pdu =
        modbus::protocol::create_read_holding_registers_pdu(m_config.start_address, m_config.quantity);

    // Fresh, zero-filled frame: header + PDU.
    m_write_buffer.clear();
    m_write_buffer.resize(kHeaderSize + request_pdu.size());

    // Each request gets a new transaction id.
    ++m_transaction_id;

    // The length field counts the unit id byte plus the PDU.
    const uint16_t length_field = static_cast<uint16_t>(request_pdu.size() + 1);

    m_write_buffer[0] = high_byte(m_transaction_id);
    m_write_buffer[1] = low_byte(m_transaction_id);
    m_write_buffer[2] = 0;  // Protocol ID (high)
    m_write_buffer[3] = 0;  // Protocol ID (low)
    m_write_buffer[4] = high_byte(length_field);
    m_write_buffer[5] = low_byte(length_field);
    m_write_buffer[6] = m_config.slave_id;

    // PDU goes right after the header.
    std::copy(request_pdu.begin(), request_pdu.end(), m_write_buffer.begin() + kHeaderSize);

    // Send the request.
    do_write();
}
/// Writes the whole of m_write_buffer to the socket, then starts reading the
/// response header. A write failure is logged and routed to on_error().
void ModbusMasterPoller::do_write() {
    auto keep_alive = shared_from_this();

    auto on_written = [this, keep_alive](const boost::system::error_code& ec, std::size_t /*bytes_sent*/) {
        if (!ec) {
            do_read_header();
            return;
        }
        spdlog::error("[{}] Write failed: {}", m_config.device_id, ec.message());
        on_error("write");
    };

    boost::asio::async_write(m_socket, boost::asio::buffer(m_write_buffer), std::move(on_written));
}
/// Reads the 7-byte response header into m_read_buffer, checks the
/// transaction id and the length field, then reads the PDU.
///
/// Any read failure, transaction id mismatch or invalid length field is
/// logged and routed to on_error().
void ModbusMasterPoller::do_read_header() {
    auto self = shared_from_this();
    boost::asio::async_read(m_socket, boost::asio::buffer(m_read_buffer, 7),
        [this, self](const boost::system::error_code& ec, std::size_t /*length*/) {
            if (ec) {
                spdlog::error("[{}] Read header failed: {}", m_config.device_id, ec.message());
                on_error("read_header");
                return;
            }

            const uint16_t tid = static_cast<uint16_t>((m_read_buffer[0] << 8) | m_read_buffer[1]);
            if (tid != m_transaction_id) {
                spdlog::warn("[{}] Transaction ID mismatch! Sent {}, Rcvd {}", m_config.device_id, m_transaction_id, tid);
                on_error("tid_mismatch");
                return;
            }

            // The length field counts the unit id byte plus the PDU. It comes
            // from the peer, so validate it before using it as a read size:
            //  - below 2 there is no PDU at all, and 0 would wrap around when
            //    the unit id byte is subtracted;
            //  - anything larger than the space left after the header would
            //    overrun m_read_buffer.
            const uint16_t pdu_len = static_cast<uint16_t>((m_read_buffer[4] << 8) | m_read_buffer[5]);
            const std::size_t capacity = m_read_buffer.size();
            const bool too_short = pdu_len < 2;
            const bool too_long = capacity < 7 || static_cast<std::size_t>(pdu_len) - 1 > capacity - 7;
            if (too_short || too_long) {
                spdlog::error("[{}] Invalid MBAP length field: {}", m_config.device_id, pdu_len);
                on_error("bad_length");
                return;
            }

            do_read_pdu(static_cast<std::size_t>(pdu_len) - 1);  // Strip the unit id byte.
        });
}
void ModbusMasterPoller::do_read_pdu(std::size_t pdu_length) {
auto self = shared_from_this();
boost::asio::async_read(m_socket, boost::asio::buffer(m_read_buffer.data() + 7, pdu_length),
[this, self, pdu_length](const boost::system::error_code& ec, std::size_t /*length*/) {
if (ec) {
spdlog::error("[{}] Read PDU failed: {}", m_config.device_id, ec.message());
on_error("read_pdu");
return;
}
try {
// 解析并上报数据
std::vector<uint8_t> pdu_data(m_read_buffer.data() + 7, m_read_buffer.data() + 7 + pdu_length);
auto registers = modbus::protocol::parse_read_holding_registers_response(pdu_data);
// 将数据转换为JSON
nlohmann::json data_j;
for(size_t i = 0; i < registers.size(); ++i) {
data_j[std::to_string(m_config.start_address + i)] = registers[i];
}
UnifiedData report_data;
report_data.device_id = m_config.device_id;
report_data.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
report_data.data_json = data_j.dump();
if(m_report_callback) {
m_report_callback(report_data);
}
} catch (const modbus::protocol::exception& e) {
spdlog::error("[{}] Failed to parse Modbus response: {}", m_config.device_id, e.what());
}
// 安排下一次轮询
m_timer.expires_after(std::chrono::milliseconds(m_config.poll_interval_ms));
m_timer.async_wait([this, self](const boost::system::error_code& /*ec*/) {
do_poll();
});
});
}