From 6d6899549b5f2725d6a9cfa51db6c5c41e18557c Mon Sep 17 00:00:00 2001 From: GuanYuankai Date: Tue, 14 Oct 2025 15:29:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BA=86modbus=20tcp?= =?UTF-8?q?=E7=9A=84=E8=BD=AE=E8=AF=A2=E4=B8=BB=E7=AB=99=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 6 +- src/main.cpp | 24 ++- src/modbus/modbus_master_poller.cc | 161 ++++++++++++++++++ src/modbus/modbus_master_poller.h | 50 ++++++ src/modbus/modbus_rtu_client.h | 12 -- ...rser.cc => modbus_temperature_humidity.cc} | 2 +- ...parser.h => modbus_temperature_humidity.h} | 4 - src/network/tcp_server.h | 4 +- src/protocol/modbus/modbus_protocol.cc | 72 ++++++++ src/protocol/modbus/modbus_protocol.h | 37 ++++ src/protocol/private_protocol_adapter.cc | 1 - src/protocol/protocol_factory.cc | 10 +- src/test.cc | 2 +- 13 files changed, 357 insertions(+), 28 deletions(-) create mode 100644 src/modbus/modbus_master_poller.cc create mode 100644 src/modbus/modbus_master_poller.h rename src/modbus/{modbus_data_parser.cc => modbus_temperature_humidity.cc} (98%) rename src/modbus/{modbus_data_parser.h => modbus_temperature_humidity.h} (78%) create mode 100644 src/protocol/modbus/modbus_protocol.cc create mode 100644 src/protocol/modbus/modbus_protocol.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 62cccbd..d760466 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,13 +35,17 @@ add_library(edge_proxy_lib STATIC # --- 协议层 --- src/protocol/protocol.cc src/protocol/private_protocol_adapter.cc + src/protocol/protocol_factory.cc # 系统监视 src/systemMonitor/system_monitor.cc #modbus - src/modbus/modbus_data_parser.cc + src/modbus/modbus_temperature_humidity.cc src/modbus/modbus_rtu_client.cc + # --- Modbus Master --- + src/protocol/modbus/modbus_protocol.cc + src/modbus/modbus_master_poller.cc ) target_include_directories(edge_proxy_lib PUBLIC diff --git a/src/main.cpp b/src/main.cpp index aec29bb..6149914 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3,6 +3,7 @@ #include "mqtt/mqtt_router.h" #include "systemMonitor/system_monitor.h" #include "spdlog/spdlog.h" +#include "modbus/modbus_master_poller.h" #include #include #include @@ -70,7 +71,7 @@ int main(int argc, char* argv[]) { constexpr uint16_t tcp_port = 8888; MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client"); MqttRouter mqtt_router(mqtt_client); - std::vector listen_ports = { 8888, 502 }; + std::vector listen_ports = { 8888 }; TCPServer tcp_server(g_io_context, listen_ports, mqtt_client); SystemMonitor::SystemMonitor monitor; @@ -81,6 +82,27 @@ int main(int argc, char* argv[]) { boost::asio::steady_timer timer(g_io_context, std::chrono::seconds(15)); timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), std::ref(mqtt_client))); + auto report_to_mqtt = [&](const UnifiedData& data) { + std::string topic = "devices/" + data.device_id + "/data"; + mqtt_client.publish(topic, data.data_json, 1, false); + }; + + // 配置并启动Modbus轮询器 + ModbusDeviceConfig temp_sensor_config = { + .device_id = "temp_sensor_workshop1", + .ip_address = "192.168.1.120", // 您的Modbus设备的IP地址 + .port = 502, // Modbus TCP标准端口 + .slave_id = 1, // 设备的从站ID + .start_address = 0, // 要读取的第一个寄存器的地址 + .quantity = 8, // 从起始地址开始,总共读取多少个寄存器 + .poll_interval_ms = 2000 // 每2000毫秒轮询一次 + }; + // ========================================================================= + + // 创建轮询器实例 (使用std::make_shared确保生命周期) + auto poller = std::make_shared(g_io_context, temp_sensor_config, report_to_mqtt); + poller->start(); + spdlog::info("All services are running. Press Ctrl+C to exit."); g_io_context.run(); diff --git a/src/modbus/modbus_master_poller.cc b/src/modbus/modbus_master_poller.cc new file mode 100644 index 0000000..ec3778f --- /dev/null +++ b/src/modbus/modbus_master_poller.cc @@ -0,0 +1,161 @@ +// 文件名: src/modbus/modbus_master_poller.cc +#include "modbus_master_poller.h" +#include "protocol/modbus/modbus_protocol.h" +#include "spdlog/spdlog.h" +#include +#include + +using boost::asio::ip::tcp; + +ModbusMasterPoller::ModbusMasterPoller(boost::asio::io_context& io_context, + ModbusDeviceConfig config, + ReportDataCallback report_cb) + : m_io_context(io_context), + m_socket(io_context), + m_timer(io_context), + m_config(std::move(config)), + m_report_callback(std::move(report_cb)) {} + +void ModbusMasterPoller::start() { + do_connect(); +} + +void ModbusMasterPoller::do_connect() { + auto self = shared_from_this(); + tcp::resolver resolver(m_io_context); + + spdlog::info("[{}] Connecting to {}:{}...", m_config.device_id, m_config.ip_address, m_config.port); + + resolver.async_resolve(m_config.ip_address, std::to_string(m_config.port), + [this, self](const boost::system::error_code& ec, tcp::resolver::results_type endpoints) { + if (ec) { + spdlog::error("[{}] Resolve failed: {}", m_config.device_id, ec.message()); + on_error("resolve"); + return; + } + boost::asio::async_connect(m_socket, endpoints, + [this, self](const boost::system::error_code& ec, const tcp::endpoint& /*endpoint*/) { + if (ec) { + spdlog::error("[{}] Connect failed: {}", m_config.device_id, ec.message()); + on_error("connect"); + return; + } + spdlog::info("[{}] Connection successful.", m_config.device_id); + do_poll(); // 立即开始第一次轮询 + }); + }); +} + +void ModbusMasterPoller::on_error(const std::string& stage) { + if (m_socket.is_open()) { + m_socket.close(); + } + // 5秒后尝试重连 + spdlog::info("[{}] Reconnecting in 5 seconds after error at stage: {}", m_config.device_id, stage); + m_timer.expires_after(std::chrono::seconds(5)); + m_timer.async_wait([this, self=shared_from_this()](const boost::system::error_code& /*ec*/) { + do_connect(); + }); +} + + +void ModbusMasterPoller::do_poll() { + // 1. 创建PDU + auto pdu = modbus::protocol::create_read_holding_registers_pdu(m_config.start_address, m_config.quantity); + + // 2. 构建MBAP头 + m_write_buffer.clear(); + m_write_buffer.resize(7 + pdu.size()); + + m_transaction_id++; + uint16_t length = 1 + pdu.size(); // UnitID + PDU + + m_write_buffer[0] = static_cast((m_transaction_id >> 8) & 0xFF); + m_write_buffer[1] = static_cast(m_transaction_id & 0xFF); + m_write_buffer[2] = 0; // Protocol ID + m_write_buffer[3] = 0; + m_write_buffer[4] = static_cast((length >> 8) & 0xFF); + m_write_buffer[5] = static_cast(length & 0xFF); + m_write_buffer[6] = m_config.slave_id; + + // 3. 附加PDU + std::copy(pdu.begin(), pdu.end(), m_write_buffer.begin() + 7); + + // 4. 发送请求 + do_write(); +} + +void ModbusMasterPoller::do_write() { + auto self = shared_from_this(); + boost::asio::async_write(m_socket, boost::asio::buffer(m_write_buffer), + [this, self](const boost::system::error_code& ec, std::size_t /*length*/) { + if (ec) { + spdlog::error("[{}] Write failed: {}", m_config.device_id, ec.message()); + on_error("write"); + return; + } + do_read_header(); + }); +} + +void ModbusMasterPoller::do_read_header() { + auto self = shared_from_this(); + boost::asio::async_read(m_socket, boost::asio::buffer(m_read_buffer, 7), + [this, self](const boost::system::error_code& ec, std::size_t length) { + if (ec) { + spdlog::error("[{}] Read header failed: {}", m_config.device_id, ec.message()); + on_error("read_header"); + return; + } + uint16_t tid = (m_read_buffer[0] << 8) | m_read_buffer[1]; + if (tid != m_transaction_id) { + spdlog::warn("[{}] Transaction ID mismatch! Sent {}, Rcvd {}", m_config.device_id, m_transaction_id, tid); + on_error("tid_mismatch"); + return; + } + uint16_t pdu_len = (m_read_buffer[4] << 8) | m_read_buffer[5]; + do_read_pdu(pdu_len - 1); // Length in header includes UnitID + }); +} + +void ModbusMasterPoller::do_read_pdu(std::size_t pdu_length) { + auto self = shared_from_this(); + boost::asio::async_read(m_socket, boost::asio::buffer(m_read_buffer.data() + 7, pdu_length), + [this, self, pdu_length](const boost::system::error_code& ec, std::size_t /*length*/) { + if (ec) { + spdlog::error("[{}] Read PDU failed: {}", m_config.device_id, ec.message()); + on_error("read_pdu"); + return; + } + + try { + // 解析并上报数据 + std::vector pdu_data(m_read_buffer.data() + 7, m_read_buffer.data() + 7 + pdu_length); + auto registers = modbus::protocol::parse_read_holding_registers_response(pdu_data); + + // 将数据转换为JSON + nlohmann::json data_j; + for(size_t i = 0; i < registers.size(); ++i) { + data_j[std::to_string(m_config.start_address + i)] = registers[i]; + } + + UnifiedData report_data; + report_data.device_id = m_config.device_id; + report_data.timestamp_ms = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + report_data.data_json = data_j.dump(); + + if(m_report_callback) { + m_report_callback(report_data); + } + + } catch (const modbus::protocol::exception& e) { + spdlog::error("[{}] Failed to parse Modbus response: {}", m_config.device_id, e.what()); + } + + // 安排下一次轮询 + m_timer.expires_after(std::chrono::milliseconds(m_config.poll_interval_ms)); + m_timer.async_wait([this, self](const boost::system::error_code& /*ec*/) { + do_poll(); + }); + }); +} \ No newline at end of file diff --git a/src/modbus/modbus_master_poller.h b/src/modbus/modbus_master_poller.h new file mode 100644 index 0000000..ed9f5af --- /dev/null +++ b/src/modbus/modbus_master_poller.h @@ -0,0 +1,50 @@ +// 文件名: src/modbus/modbus_master_poller.h +#ifndef MODBUS_MASTER_POLLER_H +#define MODBUS_MASTER_POLLER_H + +#include "protocol/iprotocol_adapter.h" // For UnifiedData and ReportDataCallback +#include +#include +#include + +// 配置一个Modbus设备的连接和轮询参数 +struct ModbusDeviceConfig { + std::string device_id; // 自定义设备ID,用于MQTT主题 + std::string ip_address; + uint16_t port; + uint8_t slave_id; + uint16_t start_address; + uint16_t quantity; + int poll_interval_ms; +}; + +class ModbusMasterPoller : public std::enable_shared_from_this { +public: + ModbusMasterPoller(boost::asio::io_context& io_context, + ModbusDeviceConfig config, + ReportDataCallback report_cb); + + // 开始轮询器的工作 (连接和定时) + void start(); + +private: + void do_connect(); + void do_poll(); + void do_write(); + void do_read_header(); + void do_read_pdu(std::size_t pdu_length); + void on_error(const std::string& stage); + + boost::asio::io_context& m_io_context; + boost::asio::ip::tcp::socket m_socket; + boost::asio::steady_timer m_timer; + + ModbusDeviceConfig m_config; + ReportDataCallback m_report_callback; + + std::vector m_write_buffer; + std::array m_read_buffer; // Max Modbus TCP frame size + uint16_t m_transaction_id = 0; +}; + +#endif // MODBUS_MASTER_POLLER_H \ No newline at end of file diff --git a/src/modbus/modbus_rtu_client.h b/src/modbus/modbus_rtu_client.h index bdfdfd1..8960a0b 100644 --- a/src/modbus/modbus_rtu_client.h +++ b/src/modbus/modbus_rtu_client.h @@ -6,21 +6,9 @@ #include // For uint8_t, uint16_t #include // For std::runtime_error, std::invalid_argument -// --- Forward Declarations --- -// Forward declare ModbusRTUClient to allow ModbusRequest and ModbusResponse to be declared -// without circular includes if they needed to reference ModbusRTUClient directly. -// In this case, ModbusRequest and ModbusResponse don't directly reference ModbusRTUClient, -// but it's good practice if there were more complex interdependencies. - -// --- Modbus CRC16 Calculation --- -// Declaration for CRC16 calculation function -//计算Modbus RTU帧的16位CRC校验码。 uint16_t calculateModbusCRC(const std::vector& data); -// --- Modbus Frame Structures --- -// Structure for Modbus Request Parameters (specifically for building a request) -//封装一个Modbus请求的所有参数 struct ModbusRequest { uint8_t slave_id; uint8_t function_code; diff --git a/src/modbus/modbus_data_parser.cc b/src/modbus/modbus_temperature_humidity.cc similarity index 98% rename from src/modbus/modbus_data_parser.cc rename to src/modbus/modbus_temperature_humidity.cc index 0c33eaa..5e3cf2d 100644 --- a/src/modbus/modbus_data_parser.cc +++ b/src/modbus/modbus_temperature_humidity.cc @@ -1,4 +1,4 @@ -#include "modbus_data_parser.h" +#include "modbus_temperature_humidity.h" // --- Generic Data Conversion Functions --- int16_t bytesToSignedInt16(uint16_t val) { diff --git a/src/modbus/modbus_data_parser.h b/src/modbus/modbus_temperature_humidity.h similarity index 78% rename from src/modbus/modbus_data_parser.h rename to src/modbus/modbus_temperature_humidity.h index 6d06c83..16b6690 100644 --- a/src/modbus/modbus_data_parser.h +++ b/src/modbus/modbus_temperature_humidity.h @@ -11,9 +11,6 @@ // --- Generic Data Conversion Functions --- int16_t bytesToSignedInt16(uint16_t val); // Converts a 16-bit unsigned value to signed -// --- Specific Sensor Data Parsing Functions (e.g., for JWST-20 sensor) --- -// This map could be static const in the .cpp if it's truly internal, -// or defined here if parts of it are needed by other header files. const std::map BAUDRATE_MAP = { {0, 1200}, {1, 2400}, {2, 4800}, {3, 9600}, {4, 19200}, {5, 38400}, {6, 57600} @@ -22,7 +19,6 @@ const std::map BAUDRATE_MAP = { // Function to print and interpret a vector of Modbus registers void printRegisters(const std::string& description, const std::vector& registers); -// You can add more specific parsing functions as needed, e.g.: float parseTemperature(uint16_t raw_value); float parseHumidity(uint16_t raw_value); std::string parseAlarmStatus(uint16_t raw_value); diff --git a/src/network/tcp_server.h b/src/network/tcp_server.h index 7f7c765..2a93e84 100644 --- a/src/network/tcp_server.h +++ b/src/network/tcp_server.h @@ -6,8 +6,8 @@ #include #include #include -#include "iprotocol_adapter.h" -#include "mqtt_client.h" +#include "protocol/iprotocol_adapter.h" +#include "mqtt/mqtt_client.h" using boost::asio::ip::tcp; diff --git a/src/protocol/modbus/modbus_protocol.cc b/src/protocol/modbus/modbus_protocol.cc new file mode 100644 index 0000000..ac2bc27 --- /dev/null +++ b/src/protocol/modbus/modbus_protocol.cc @@ -0,0 +1,72 @@ +// 文件名: src/protocol/modbus/modbus_protocol.cc +#include "modbus_protocol.h" + +namespace { // 匿名命名空间,辅助函数仅在本文件可见 + +// 将16位整数写入大端字节序的vector +void write_be16(std::vector& buffer, uint16_t value) { + buffer.push_back(static_cast((value >> 8) & 0xFF)); + buffer.push_back(static_cast(value & 0xFF)); +} + +// 从大端字节流读取16位整数 +uint16_t read_be16(const uint8_t* p) { + return (static_cast(p[0]) << 8) | p[1]; +} + +} // namespace + +namespace modbus::protocol { + +std::vector create_read_holding_registers_pdu(uint16_t start_address, uint16_t quantity) { + std::vector pdu; + pdu.reserve(5); + pdu.push_back(0x03); // Function Code + write_be16(pdu, start_address); + write_be16(pdu, quantity); + return pdu; +} + +std::vector parse_read_holding_registers_response(const std::vector& response_pdu) { + if (response_pdu.empty()) { + throw exception("Response PDU is empty."); + } + + const uint8_t function_code = response_pdu[0]; + + // 检查异常响应 + if (function_code & 0x80) { + if (response_pdu.size() < 2) { + throw exception("Invalid exception response size."); + } + throw exception("Modbus exception received. FC=" + std::to_string(function_code & 0x7F) + " Code=" + std::to_string(response_pdu[1])); + } + + if (function_code != 0x03) { + throw exception("Unexpected function code in response: " + std::to_string(function_code)); + } + + if (response_pdu.size() < 2) { + throw exception("Response PDU too short for byte count."); + } + + const uint8_t byte_count = response_pdu[1]; + if (response_pdu.size() != 2 + byte_count) { + throw exception("Byte count does not match PDU size."); + } + + if (byte_count % 2 != 0) { + throw exception("Byte count is not an even number."); + } + + std::vector registers; + registers.reserve(byte_count / 2); + + for (size_t i = 0; i < byte_count; i += 2) { + registers.push_back(read_be16(response_pdu.data() + 2 + i)); + } + + return registers; +} + +} // namespace modbus::protocol \ No newline at end of file diff --git a/src/protocol/modbus/modbus_protocol.h b/src/protocol/modbus/modbus_protocol.h new file mode 100644 index 0000000..3e5169f --- /dev/null +++ b/src/protocol/modbus/modbus_protocol.h @@ -0,0 +1,37 @@ +// 文件名: src/protocol/modbus/modbus_protocol.h +#ifndef MODBUS_PROTOCOL_H +#define MODBUS_PROTOCOL_H + +#include +#include +#include +#include + +namespace modbus::protocol { + +// 自定义异常类型,方便上层捕获 +class exception : public std::runtime_error { +public: + explicit exception(const std::string& message) : std::runtime_error(message) {} +}; + +/** + * @brief 创建一个"读保持寄存器" (0x03) 的PDU + * @param start_address 起始地址 + * @param quantity 要读取的寄存器数量 + * @return 序列化后的PDU字节流 (不含MBAP头) + */ +std::vector create_read_holding_registers_pdu(uint16_t start_address, uint16_t quantity); + + +/** + * @brief 解析一个"读保持寄存器"的响应PDU + * @param response_pdu 从设备返回的PDU字节流 + * @return 包含所有寄存器值的向量 + * @throws modbus::protocol::exception 如果响应无效 (如异常码、长度错误) + */ +std::vector parse_read_holding_registers_response(const std::vector& response_pdu); + +} // namespace modbus::protocol + +#endif // MODBUS_PROTOCOL_H \ No newline at end of file diff --git a/src/protocol/private_protocol_adapter.cc b/src/protocol/private_protocol_adapter.cc index a90d087..8ac397e 100644 --- a/src/protocol/private_protocol_adapter.cc +++ b/src/protocol/private_protocol_adapter.cc @@ -217,7 +217,6 @@ void PrivateProtocolAdapter::on_message(proto::Message msg) { } } -// 发送消息 (与您原来的逻辑完全相同) void PrivateProtocolAdapter::send_message(uint16_t msg_type, const std::vector& payload) { std::vector frame; frame.reserve(proto::ProtocolConfig::kHeaderLen + payload.size() + proto::ProtocolConfig::kCrcLen); diff --git a/src/protocol/protocol_factory.cc b/src/protocol/protocol_factory.cc index 098e8cb..001e98f 100644 --- a/src/protocol/protocol_factory.cc +++ b/src/protocol/protocol_factory.cc @@ -8,14 +8,14 @@ std::unique_ptr ProtocolFactory::create_adapter(uint16_t port, spdlog::info("Creating protocol adapter for port {}", port); // 根据端口号决定创建哪个适配器 - if (port == 8888) { // 假设 8001 是您的私有协议端口 + if (port == 8888) { return std::make_unique(std::move(report_cb)); } - else if (port == 502) { - //TODO:这里没有写modbus协议的Adapter,快去写 - return std::make_unique(std::move(report_cb)); - } + // !!modbus大部分情况下是master式的,不适用这里slave式的协议 + // else if (port == 502) { + // return std::make_unique(std::move(report_cb)); + // } // 如果没有匹配的端口,返回空指针 spdlog::error("No protocol adapter configured for port {}", port); diff --git a/src/test.cc b/src/test.cc index e44668e..85d4c6b 100644 --- a/src/test.cc +++ b/src/test.cc @@ -8,7 +8,7 @@ // Include the new header files #include "modbus/modbus_rtu_client.h" -#include "modbus/modbus_data_parser.h" +#include "modbus/modbus_temperature_humidity.h" int main(int argc, char* argv[]) { std::cout << "--- Modbus RTU Sensor Client ---" << std::endl;