diff --git a/CMakeLists.txt b/CMakeLists.txt index 83ad096..62cccbd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,7 @@ add_library(edge_proxy_lib STATIC # --- 协议层 --- src/protocol/protocol.cc - src/protocol/protocol_handler.cc + src/protocol/private_protocol_adapter.cc # 系统监视 src/systemMonitor/system_monitor.cc @@ -74,7 +74,6 @@ target_link_libraries(edge_proxy PRIVATE add_executable(test src/test.cc - # src/test-new.cc ) target_link_libraries(test PRIVATE @@ -82,39 +81,3 @@ target_link_libraries(test PRIVATE ) -# ================================================================= -# 独立的 MQTT 功能测试目标 -# ================================================================= -# 创建一个新的可执行文件,名为 mqtt_test_runner -# add_executable(mqtt_test_runner -# src/mqtt_test.cpp -# ) - -# target_link_libraries(mqtt_test_runner PRIVATE -# edge_proxy_lib -# ) - -# ================================================================= -# gtest -# ================================================================= -# enable_testing() -# include(FetchContent) -# FetchContent_Declare( -# googletest -# URL https://github.com/google/googletest/archive/refs/tags/v1.14.0.zip -# ) -# FetchContent_MakeAvailable(googletest) - -# add_executable(run_tests -# tests/mqtt_integration_test.cpp -# ) - -# target_link_libraries(run_tests PRIVATE -# GTest::gtest_main -# PahoMqttCpp::paho-mqttpp3 -# pthread -# ) - -# include(GoogleTest) -# gtest_discover_tests(run_tests) - diff --git a/archive b/archive deleted file mode 100644 index 015560b..0000000 --- a/archive +++ /dev/null @@ -1,76 +0,0 @@ -#ifndef TCP_SERVER_H -#define TCP_SERVER_H - -#include -#include -#include -#include -#include // For std::enable_shared_from_this -#include // For uint8_t, uint16_t, uint32_t -// 使用 Boost.Asio 的命名空间 -using boost::asio::ip::tcp; -// ---------------- CRC32 Declaration (使用 Boost.CRC) ---------------- -inline uint32_t calculate_crc32(const uint8_t* data, size_t length); -// ---------------- Protocol Definition ---------------- -namespace proto { -struct ProtocolConfig { - static constexpr std::size_t kMagicLen = 4; // 魔数 4 字节 - static constexpr std::size_t kMsgTypeLen = 2; // 消息类型 2 字节 - static constexpr std::size_t kPayloadLenFieldLen = 4; // 载荷长度 4 字节 - static constexpr std::size_t kCrcLen = 4; // CRC 校验位 4 字节 - // 完整的头部长度 (Magic + MsgType + PayloadLenField) - static constexpr std::size_t kHeaderLen = kMagicLen + kMsgTypeLen + kPayloadLenFieldLen; // 4 + 2 + 4 = 10 字节 - // 魔数常量 - static constexpr uint8_t kMagic[kMagicLen] = {0x62, 0x6e, 0x73, 0x00}; -}; -// 字节序转换函数声明 -inline uint16_t be16(const uint8_t* p); -inline uint32_t be32(const uint8_t* p); -inline void write_be16(uint8_t* p, uint16_t val); -inline void write_be32(uint8_t* p, uint32_t val); - -struct Message { - uint16_t msg_type = 0; // 消息类型 2 字节 - std::vector payload; - uint32_t received_crc = 0; // 新增字段,用于存储接收到的CRC,方便调试 -}; -inline bool match_magic(const uint8_t* p); -} -// ---------------- Session Class (处理单个客户端连接) ---------------- -class session : public std::enable_shared_from_this { -public: - explicit session(tcp::socket socket); - void start(); -private: - // 自定义缓冲区结构体 - struct ReceiveBuffer { - static constexpr std::size_t capacity_ = 8192; - uint8_t data_[capacity_]; - std::size_t read_idx = 0; // 已读到缓冲区的数据量 (读指针) - std::size_t write_idx = 0; // 已处理/消耗的字节数 (写指针,指向下一个待处理字节) - void compact(); - std::size_t data_size() const; - std::size_t free_space() const; - const uint8_t* current_data() const; - uint8_t* write_ptr(); - } receive_buffer_; - void do_read_some(); - void parse_messages(); - std::string payload_to_ascii_string(const std::vector& payload); - std::string bytes_to_hex_string(const uint8_t* data, std::size_t len); - void on_message(proto::Message msg); - // 修改原有的 send_response 为更通用的 send_message - void send_message(uint16_t msg_type, const std::vector& payload); - tcp::socket socket_; -}; -// ---------------- TCPServer Class ---------------- -// 这个类负责监听端口并接受新的客户端连接 -class TCPServer { -public: - TCPServer(boost::asio::io_context& io_context, uint16_t port); -private: - void do_accept(); - tcp::acceptor acceptor_; -}; - -#endif // SERVER_H diff --git a/src/protocol/protocol_handler.cc b/src/archive/protocol_handler.cc similarity index 93% rename from src/protocol/protocol_handler.cc rename to src/archive/protocol_handler.cc index 6db89d2..9083ba3 100644 --- a/src/protocol/protocol_handler.cc +++ b/src/archive/protocol_handler.cc @@ -36,7 +36,6 @@ void ProtocolHandler::process_raw_data(const uint8_t* data, std::size_t len) { } void ProtocolHandler::parse_messages() { - // (此函数内容与原 session::parse_messages 基本完全相同) for (;;) { std::size_t available = receive_buffer_.data_size(); @@ -70,14 +69,13 @@ void ProtocolHandler::parse_messages() { uint32_t payload_len = proto::be32(data + proto::ProtocolConfig::kMagicLen + proto::ProtocolConfig::kMsgTypeLen); if (payload_len > 10 * 1024 * 1024) { spdlog::error("Received too large payload length: {} bytes. Dropping connection concept.", payload_len); - // 这里不能直接关闭socket,但可以清空缓冲区来丢弃这个错误的数据 receive_buffer_.read_idx = 0; receive_buffer_.write_idx = 0; return; } std::size_t total_len = proto::ProtocolConfig::kHeaderLen + payload_len + proto::ProtocolConfig::kCrcLen; if (available < total_len) { - break; // 半包 + break; } uint32_t calculated_crc = calculate_crc32(data, proto::ProtocolConfig::kHeaderLen + payload_len); @@ -96,16 +94,13 @@ void ProtocolHandler::parse_messages() { msg.payload.assign(payload_ptr, payload_ptr + payload_len); } msg.received_crc = received_crc; - receive_buffer_.write_idx += total_len; - on_message(std::move(msg)); } } void ProtocolHandler::on_message(proto::Message msg) { - // (此函数内容与原 session::on_message 完全相同) switch (msg.msg_type) { case 0x0001: spdlog::info("MsgType=0x0001 received. Payload Len: {}.", msg.payload.size()); @@ -126,7 +121,6 @@ void ProtocolHandler::on_message(proto::Message msg) { } void ProtocolHandler::send_message(uint16_t msg_type, const std::vector& payload) { - // (此函数内容与原 session::send_message 类似,但最后是调用回调) std::vector frame; frame.reserve(proto::ProtocolConfig::kHeaderLen + payload.size() + proto::ProtocolConfig::kCrcLen); @@ -136,17 +130,14 @@ void ProtocolHandler::send_message(uint16_t msg_type, const std::vector proto::write_be16(header_buf, msg_type); proto::write_be32(header_buf + 2, static_cast(payload.size())); frame.insert(frame.end(), header_buf, header_buf + 6); - - // 组装载荷 + frame.insert(frame.end(), payload.begin(), payload.end()); - // 计算并组装CRC uint32_t crc = calculate_crc32(frame.data(), frame.size()); uint8_t crc_buf[4]; proto::write_be32(crc_buf, crc); frame.insert(frame.end(), std::begin(crc_buf), std::end(crc_buf)); - // 通过回调函数将数据帧交还给网络层发送 if (send_callback_) { send_callback_(frame); } diff --git a/src/protocol/protocol_handler.h b/src/archive/protocol_handler.h similarity index 100% rename from src/protocol/protocol_handler.h rename to src/archive/protocol_handler.h diff --git a/src/main.cpp b/src/main.cpp index 6ccab21..aec29bb 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -7,7 +7,7 @@ #include #include #include -#include // for std::bind +#include // 用于 ASIO 服务的全局 io_context boost::asio::io_context g_io_context; @@ -68,9 +68,10 @@ int main(int argc, char* argv[]) { try { constexpr uint16_t tcp_port = 8888; - TCPServer tcp_server(g_io_context, tcp_port); MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client"); MqttRouter mqtt_router(mqtt_client); + std::vector listen_ports = { 8888, 502 }; + TCPServer tcp_server(g_io_context, listen_ports, mqtt_client); SystemMonitor::SystemMonitor monitor; mqtt_client.connect(); diff --git a/src/network/tcp_server.cc b/src/network/tcp_server.cc index 146730c..a40ff3f 100644 --- a/src/network/tcp_server.cc +++ b/src/network/tcp_server.cc @@ -1,14 +1,18 @@ #include "tcp_server.h" #include "spdlog/spdlog.h" +#include "protocol/protocol_factory.h" -// --- TcpConnection Implementation --- - -TcpConnection::TcpConnection(tcp::socket socket) : socket_(std::move(socket)) { - // 关键:创建协议处理器,并使用lambda将自身的send方法作为回调注入 +TcpConnection::TcpConnection(tcp::socket socket, MqttClient& mqtt_client, std::unique_ptr adapter) + : socket_(std::move(socket)), + mqtt_client_(mqtt_client), + protocol_adapter_(std::move(adapter)) +{ + // 定义发送回调 auto send_callback = [this](const std::vector& data_to_send) { this->send(data_to_send); }; - protocol_handler_ = std::make_unique(send_callback); + // 为适配器设置发送回调 + protocol_adapter_->set_send_callback(send_callback); } void TcpConnection::start() { @@ -17,7 +21,7 @@ void TcpConnection::start() { } catch (...) { spdlog::info("Client connected. (Could not get remote endpoint details)"); } - do_read_some(); // 开始异步读取循环 + do_read_some(); } void TcpConnection::do_read_some() { @@ -25,18 +29,20 @@ void TcpConnection::do_read_some() { socket_.async_read_some(boost::asio::buffer(read_buffer_), [this, self](boost::system::error_code ec, std::size_t n) { if (!ec) { - // 收到原始数据,直接交给协议处理器 - protocol_handler_->process_raw_data(read_buffer_.data(), n); - // 继续下一次读取 + // <<< MODIFIED: 调用适配器的 process_raw_data + protocol_adapter_->process_raw_data(read_buffer_.data(), n); do_read_some(); } else if (ec == boost::asio::error::eof) { - spdlog::warn("Client closed connection cleanly: {}", socket_.remote_endpoint().address().to_string()); + try { + spdlog::warn("Client closed connection cleanly: {}", socket_.remote_endpoint().address().to_string()); + } catch(...) { + spdlog::warn("Client closed connection cleanly."); + } } else if (ec == boost::asio::error::operation_aborted) { spdlog::debug("Connection operation aborted."); } else { spdlog::error("Connection read error: {}", ec.message()); } - // 发生错误或EOF时,session的shared_ptr将被释放,socket自动关闭 }); } @@ -52,25 +58,45 @@ void TcpConnection::send(const std::vector& data) { }); } -// --- TCPServer Implementation --- - -TCPServer::TCPServer(boost::asio::io_context& io_context, uint16_t port) - : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) { - spdlog::info("TCP Server service initialized on port {}", port); - do_accept(); +TCPServer::TCPServer(boost::asio::io_context& io_context, const std::vector& ports, MqttClient& mqtt_client) + : io_context_(io_context), + mqtt_client_(mqtt_client) +{ + for (uint16_t port : ports) { + spdlog::info("Initializing TCP listener on port {}", port); + // emplace 直接在 map 中构造 acceptor + acceptors_.emplace(port, tcp::acceptor(io_context_, tcp::endpoint(tcp::v4(), port))); + // 为每个端口启动一个独立的 accept 循环 + do_accept(port); + } } -void TCPServer::do_accept() { - acceptor_.async_accept( - [this](boost::system::error_code ec, tcp::socket socket) { +void TCPServer::do_accept(uint16_t port) { + acceptors_.at(port).async_accept( + [this, port](boost::system::error_code ec, tcp::socket socket) { if (!ec) { - // 创建新的 TcpConnection 来处理连接 - std::make_shared(std::move(socket))->start(); + // 1. 定义数据上报回调 + auto report_callback = [this](const UnifiedData& data) { + std::string topic = "devices/" + data.device_id + "/data"; + spdlog::info("Forwarding data to MQTT topic '{}'", topic); + this->mqtt_client_.publish(topic, data.data_json, 1, false); + }; + + // 2. 使用工厂创建适配器 + auto adapter = ProtocolFactory::create_adapter(port, report_callback); + + if (adapter) { + // 3. 创建 TcpConnection,并将适配器注入 + std::make_shared(std::move(socket), mqtt_client_, std::move(adapter))->start(); + } else { + spdlog::error("Failed to create adapter for port {}, closing connection.", port); + socket.close(); // 如果没有合适的适配器,直接关闭连接 + } } else { - spdlog::error("Accept error: {}", ec.message()); + spdlog::error("Accept error on port {}: {}", port, ec.message()); } - // 继续等待下一个连接 - do_accept(); + // 4. 继续等待该端口的下一个连接 + do_accept(port); } ); } \ No newline at end of file diff --git a/src/network/tcp_server.h b/src/network/tcp_server.h index 467ccc7..7f7c765 100644 --- a/src/network/tcp_server.h +++ b/src/network/tcp_server.h @@ -1,18 +1,21 @@ +// 文件名: tcp_server.h #ifndef TCP_SERVER_H #define TCP_SERVER_H #include -#include #include -#include -#include "../protocol/protocol_handler.h" // 包含协议处理器头文件 +#include +#include +#include "iprotocol_adapter.h" +#include "mqtt_client.h" using boost::asio::ip::tcp; -// 代表一个TCP连接,管理socket生命周期和原始数据收发 +class MqttClient; + class TcpConnection : public std::enable_shared_from_this { public: - explicit TcpConnection(tcp::socket socket); + explicit TcpConnection(tcp::socket socket, MqttClient& mqtt_client, std::unique_ptr adapter); void start(); private: @@ -20,20 +23,24 @@ private: void send(const std::vector& data); tcp::socket socket_; - std::array read_buffer_; // 简化的原始字节缓冲区 - - // 每个连接拥有一个协议处理器实例 - std::unique_ptr protocol_handler_; + std::array read_buffer_; + std::unique_ptr protocol_adapter_; + MqttClient& mqtt_client_; }; -// 负责监听端口并接受新连接 class TCPServer { public: - TCPServer(boost::asio::io_context& io_context, uint16_t port); + // <<< MODIFIED: 构造函数接收一个端口列表 + TCPServer(boost::asio::io_context& io_context, const std::vector& ports, MqttClient& mqtt_client); private: - void do_accept(); - tcp::acceptor acceptor_; + // <<< MODIFIED: 为每个端口启动一个accept循环 + void do_accept(uint16_t port); + + boost::asio::io_context& io_context_; + MqttClient& mqtt_client_; + // <<< MODIFIED: 使用 map 来管理多个 acceptor,key是端口号 + std::map acceptors_; }; #endif // TCP_SERVER_H \ No newline at end of file diff --git a/src/protocol/iprotocol_adapter.h b/src/protocol/iprotocol_adapter.h new file mode 100644 index 0000000..f03b65f --- /dev/null +++ b/src/protocol/iprotocol_adapter.h @@ -0,0 +1,56 @@ +// 文件名: iprotocol_adapter.h +#ifndef IPROTOCOL_ADAPTER_H +#define IPROTOCOL_ADAPTER_H + +#include +#include +#include +#include +#include + +/** + * @brief 统一数据格式 (Unified Data Format) + * * 所有协议适配器在解析完特定协议的数据后,都应将其转换为这个标准结构体。 + * 这样做可以使上层业务逻辑(如MQTT转发、数据库存储)与底层具体的协议实现解耦。 + */ +struct UnifiedData { + std::string device_id; // 产生数据的设备唯一ID + int64_t timestamp_ms; // 数据产生的时间戳 (毫秒级UTC) + std::string data_json; // 采用JSON字符串格式,具有良好的灵活性和可扩展性 +}; + +/** + * @brief 定义回调函数,用于将解析后的标准数据上报给核心业务层 + */ +using ReportDataCallback = std::function; + +/** + * @brief 协议适配器接口 (纯虚基类) + * * 所有具体的协议实现(如私有协议、Modbus、OPC-UA)都应继承自这个接口。 + * 它定义了协议适配器的标准行为。 + */ +class IProtocolAdapter { +public: + virtual ~IProtocolAdapter() = default; + + /** + * @brief 定义回调函数类型,用于将需要发送的原始数据帧交由网络层发送 + */ + using SendCallback = std::function&)>; + + /** + * @brief 设置发送回调函数 + * @param cb 网络层(如TCPSession)在创建Adapter实例后,会通过此方法将自身的发送函数注册进来。 + */ + virtual void set_send_callback(SendCallback cb) = 0; + + /** + * @brief 从网络层接收原始字节流的唯一入口点 + * @param data 指向接收到的数据块的指针 + * @param len 数据块的长度 + * * 网络层收到数据后,会调用此函数将数据喂给协议适配器进行处理。 + */ + virtual void process_raw_data(const uint8_t* data, std::size_t len) = 0; +}; + +#endif // IPROTOCOL_ADAPTER_H \ No newline at end of file diff --git a/src/protocol/private_protocol_adapter.cc b/src/protocol/private_protocol_adapter.cc new file mode 100644 index 0000000..a90d087 --- /dev/null +++ b/src/protocol/private_protocol_adapter.cc @@ -0,0 +1,280 @@ +// 文件名: private_protocol_adapter.cc +#include "private_protocol_adapter.h" +#include "spdlog/spdlog.h" +#include +#include +#include +#include +#include +#include // 用于获取时间戳 + +// CRC32计算函数 (与您原来的设计保持一致) +inline uint32_t calculate_crc32(const uint8_t* data, size_t length) { + boost::crc_32_type result; + result.process_bytes(data, length); + return result.checksum(); +} + +// 构造函数,初始化 report_callback_ +PrivateProtocolAdapter::PrivateProtocolAdapter(ReportDataCallback report_cb) + : report_callback_(std::move(report_cb)) {} + +// 实现接口:设置发送回调 +void PrivateProtocolAdapter::set_send_callback(SendCallback cb) { + send_callback_ = std::move(cb); +} + +// 实现接口:处理原始数据 (与您原来的逻辑完全相同) +void PrivateProtocolAdapter::process_raw_data(const uint8_t* data, std::size_t len) { + if (receive_buffer_.free_space() < len) { + spdlog::warn("Buffer has only {} bytes free, but received {} bytes. Compacting.", receive_buffer_.free_space(), len); + receive_buffer_.compact(); + if (receive_buffer_.free_space() < len) { + spdlog::error("Buffer full even after compacting. Cannot process new data. This indicates a protocol error or slow consumer."); + receive_buffer_.read_idx = 0; + receive_buffer_.write_idx = 0; + return; + } + } + std::memcpy(receive_buffer_.write_ptr(), data, len); + receive_buffer_.read_idx += len; + + parse_messages(); +} + +// 解析消息 (与您原来的逻辑完全相同) +void PrivateProtocolAdapter::parse_messages() { + for (;;) { + std::size_t available = receive_buffer_.data_size(); + + if (available < proto::ProtocolConfig::kHeaderLen + proto::ProtocolConfig::kCrcLen) { + break; + } + const uint8_t* data = receive_buffer_.current_data(); + std::size_t offset = 0; + + while (available - offset >= proto::ProtocolConfig::kHeaderLen + proto::ProtocolConfig::kCrcLen) { + if (proto::match_magic(data + offset)) { + break; + } + ++offset; + } + if (offset > 0) { + spdlog::warn("Skipped {} bytes until magic.", offset); + receive_buffer_.write_idx += offset; + available = receive_buffer_.data_size(); + data = receive_buffer_.current_data(); + + if (available < proto::ProtocolConfig::kHeaderLen + proto::ProtocolConfig::kCrcLen) { + break; + } + } + + if (!proto::match_magic(data)) { + break; + } + + uint32_t payload_len = proto::be32(data + proto::ProtocolConfig::kMagicLen + proto::ProtocolConfig::kMsgTypeLen); + if (payload_len > 10 * 1024 * 1024) { + spdlog::error("Received too large payload length: {} bytes. Dropping connection concept.", payload_len); + receive_buffer_.read_idx = 0; + receive_buffer_.write_idx = 0; + return; + } + std::size_t total_len = proto::ProtocolConfig::kHeaderLen + payload_len + proto::ProtocolConfig::kCrcLen; + if (available < total_len) { + break; + } + + uint32_t calculated_crc = calculate_crc32(data, proto::ProtocolConfig::kHeaderLen + payload_len); + uint32_t received_crc = proto::be32(data + proto::ProtocolConfig::kHeaderLen + payload_len); + + if (calculated_crc != received_crc) { + spdlog::error("CRC mismatch! Calculated: 0x{:08x}, Received: 0x{:08x}", calculated_crc, received_crc); + receive_buffer_.write_idx += total_len; + continue; + } + + proto::Message msg; + msg.msg_type = proto::be16(data + proto::ProtocolConfig::kMagicLen); + if (payload_len > 0) { + const uint8_t* payload_ptr = data + proto::ProtocolConfig::kHeaderLen; + msg.payload.assign(payload_ptr, payload_ptr + payload_len); + } + msg.received_crc = received_crc; + receive_buffer_.write_idx += total_len; + on_message(std::move(msg)); + } +} + + +// ================================================================================= +// 核心修改点:处理具体消息 +// 在这里,我们将特定协议的消息,转换为统一的数据格式 UnifiedData 并上报 +// ================================================================================= +void PrivateProtocolAdapter::on_message(proto::Message msg) { + // 使用 using 简化 nlohmann::json 的调用 + using json = nlohmann::json; + + switch (msg.msg_type) { + case 0x0001: // 假设 0x0001 是设备数据上报消息 + { + spdlog::info("MsgType=0x0001 received. Payload Len: {}.", msg.payload.size()); + + // 1. 创建一个 UnifiedData 实例 + UnifiedData report_data; + report_data.device_id = "private_device_01"; // ID应从payload或连接信息中获取 + report_data.timestamp_ms = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch() + ).count(); + + // 2. 创建一个 json 对象来构建 data_json + json data_j; + + // ========================= 【核心转换逻辑】 ========================= + // 在这里,我们根据您的协议定义来解析 payload + // 【示例】:假设 payload 是一个结构化的二进制数据 + // - 第 0-3 字节: 温度 (float, big-endian) + // - 第 4-7 字节: 湿度 (float, big-endian) + // - 第 8-9 字节: 电压 (uint16_t, big-endian, 单位 毫伏) + // - 后面跟着一个字符串设备序列号 + // =================================================================== + if (msg.payload.size() >= 10) { + try { + const uint8_t* p = msg.payload.data(); + + // 解析温度 (float) + uint32_t temp_raw = proto::be32(p); + float temperature; + std::memcpy(&temperature, &temp_raw, sizeof(float)); + + // 解析湿度 (float) + uint32_t hum_raw = proto::be32(p + 4); + float humidity; + std::memcpy(&humidity, &hum_raw, sizeof(float)); + + // 解析电压 (uint16_t) + uint16_t voltage_mv = proto::be16(p + 8); + + // 解析设备序列号 (string) + std::string serial_number; + if (msg.payload.size() > 10) { + serial_number.assign(reinterpret_cast(p + 10), msg.payload.size() - 10); + } + + // 构建 JSON 对象 + data_j["protocol"] = "private_structured"; + data_j["values"] = { + {"temperature", temperature}, + {"humidity", humidity}, + {"voltage_mv", voltage_mv} + }; + data_j["sn"] = serial_number; + + } catch (const json::exception& e) { + spdlog::error("nlohmann::json exception: {}", e.what()); + // 如果JSON构建失败,可以发送一个错误格式的JSON + data_j = { + {"error", "Failed to parse structured payload"}, + {"payload_size", msg.payload.size()} + }; + } + } else { + // 如果payload长度不符合预期,我们也记录一个错误 + spdlog::warn("Payload size {} is too small for structured data.", msg.payload.size()); + data_j = { + {"error", "Invalid payload size"}, + {"payload_size", msg.payload.size()} + }; + } + + report_data.data_json = data_j.dump(); + + spdlog::debug("Generated JSON: {}", report_data.data_json); + + // 4. 通过回调函数将统一格式的数据上报给核心业务层 + if (report_callback_) { + report_callback_(report_data); + } + + // 5. 发送响应给客户端 (原有逻辑) + std::string response_str = "JSON data processed successfully."; + std::vector response_payload(response_str.begin(), response_str.end()); + send_message(0x0002, response_payload); + break; + } + case 0x0100: // 心跳消息 + { + spdlog::info("Heartbeat (MsgType=0x0100) received."); + send_message(0x0101, {}); + break; + } + default: + { + spdlog::warn("Unknown MsgType=0x{:04x} received.", msg.msg_type); + break; + } + } +} + +// 发送消息 (与您原来的逻辑完全相同) +void PrivateProtocolAdapter::send_message(uint16_t msg_type, const std::vector& payload) { + std::vector frame; + frame.reserve(proto::ProtocolConfig::kHeaderLen + payload.size() + proto::ProtocolConfig::kCrcLen); + + frame.insert(frame.end(), std::begin(proto::ProtocolConfig::kMagic), std::end(proto::ProtocolConfig::kMagic)); + uint8_t header_buf[6]; + proto::write_be16(header_buf, msg_type); + proto::write_be32(header_buf + 2, static_cast(payload.size())); + frame.insert(frame.end(), header_buf, header_buf + 6); + + frame.insert(frame.end(), payload.begin(), payload.end()); + + uint32_t crc = calculate_crc32(frame.data(), frame.size()); + uint8_t crc_buf[4]; + proto::write_be32(crc_buf, crc); + frame.insert(frame.end(), std::begin(crc_buf), std::end(crc_buf)); + + if (send_callback_) { + send_callback_(frame); + } +} + +// --- ReceiveBuffer 及辅助函数的实现 (均与您原来的代码相同) --- + +void PrivateProtocolAdapter::ReceiveBuffer::compact() { + if (write_idx > 0) { + std::size_t remaining = data_size(); + if (remaining > 0) { + std::memmove(data_, data_ + write_idx, remaining); + } + read_idx = remaining; + write_idx = 0; + } +} +std::size_t PrivateProtocolAdapter::ReceiveBuffer::data_size() const { return read_idx - write_idx; } +std::size_t PrivateProtocolAdapter::ReceiveBuffer::free_space() const { return capacity_ - read_idx; } +const uint8_t* PrivateProtocolAdapter::ReceiveBuffer::current_data() const { return data_ + write_idx; } +uint8_t* PrivateProtocolAdapter::ReceiveBuffer::write_ptr() { return data_ + read_idx; } + + +std::string PrivateProtocolAdapter::payload_to_ascii_string(const std::vector& payload) { + std::string ascii_str; + ascii_str.reserve(payload.size()); + for (auto byte : payload) { + ascii_str.push_back(std::isprint(byte) ? static_cast(byte) : '.'); + } + return ascii_str; +} + +std::string PrivateProtocolAdapter::bytes_to_hex_string(const uint8_t* data, std::size_t len) { + std::string hex_str; + hex_str.reserve(len * 3); + static const char* digits = "0123456789abcdef"; + for (std::size_t i = 0; i < len; ++i) { + hex_str.push_back(digits[(data[i] >> 4) & 0xF]); + hex_str.push_back(digits[data[i] & 0xF]); + if (i < len - 1) hex_str.push_back(' '); + } + return hex_str; +} \ No newline at end of file diff --git a/src/protocol/private_protocol_adapter.h b/src/protocol/private_protocol_adapter.h new file mode 100644 index 0000000..2baa3d5 --- /dev/null +++ b/src/protocol/private_protocol_adapter.h @@ -0,0 +1,56 @@ +// 文件名: private_protocol_adapter.h +#ifndef PRIVATE_PROTOCOL_ADAPTER_H +#define PRIVATE_PROTOCOL_ADAPTER_H + +#include "iprotocol_adapter.h" // 引入新的接口 +#include "protocol.h" // 依赖您原来的协议定义 (protocol.h) + +#include +#include +#include + +/** + * @brief 私有协议适配器 + * * 这个类实现了 IProtocolAdapter 接口,专门用于处理您之前定义的私有二进制协议。 + * 它的内部逻辑大部分来自于您原来的 ProtocolHandler 类。 + */ +class PrivateProtocolAdapter : public IProtocolAdapter { +public: + /** + * @brief 构造函数 + * @param report_cb 用于将解析出的 UnifiedData 上报给核心业务层的回调函数 + */ + explicit PrivateProtocolAdapter(ReportDataCallback report_cb); + + // --- 实现 IProtocolAdapter 接口 --- + void set_send_callback(SendCallback cb) override; + void process_raw_data(const uint8_t* data, std::size_t len) override; + +private: + // 内部缓冲区,用于处理粘包/半包问题 (与您原来的设计保持一致) + struct ReceiveBuffer { + static constexpr std::size_t capacity_ = 8192; + uint8_t data_[capacity_]; + std::size_t read_idx = 0; + std::size_t write_idx = 0; + void compact(); + std::size_t data_size() const; + std::size_t free_space() const; + const uint8_t* current_data() const; + uint8_t* write_ptr(); + } receive_buffer_; + + // 协议解析与业务处理函数 + void parse_messages(); + void on_message(proto::Message msg); + void send_message(uint16_t msg_type, const std::vector& payload); + + // 辅助函数 + std::string payload_to_ascii_string(const std::vector& payload); + std::string bytes_to_hex_string(const uint8_t* data, std::size_t len); + + ReportDataCallback report_callback_; // 回调到核心业务层 + SendCallback send_callback_; // 回调到网络层 +}; + +#endif // PRIVATE_PROTOCOL_ADAPTER_H \ No newline at end of file diff --git a/src/protocol/protocol_factory.cc b/src/protocol/protocol_factory.cc new file mode 100644 index 0000000..098e8cb --- /dev/null +++ b/src/protocol/protocol_factory.cc @@ -0,0 +1,23 @@ +#include "protocol_factory.h" +#include "private_protocol_adapter.h" +// #include "modbus_tcp_adapter.h" // <-- 未来在这里包含新的协议适配器 + +#include "spdlog/spdlog.h" + +std::unique_ptr ProtocolFactory::create_adapter(uint16_t port, ReportDataCallback report_cb) { + spdlog::info("Creating protocol adapter for port {}", port); + + // 根据端口号决定创建哪个适配器 + if (port == 8888) { // 假设 8001 是您的私有协议端口 + return std::make_unique(std::move(report_cb)); + } + + else if (port == 502) { + //TODO:这里没有写modbus协议的Adapter,快去写 + return std::make_unique(std::move(report_cb)); + } + + // 如果没有匹配的端口,返回空指针 + spdlog::error("No protocol adapter configured for port {}", port); + return nullptr; +} \ No newline at end of file diff --git a/src/protocol/protocol_factory.h b/src/protocol/protocol_factory.h new file mode 100644 index 0000000..7f42d7c --- /dev/null +++ b/src/protocol/protocol_factory.h @@ -0,0 +1,19 @@ +#ifndef PROTOCOL_FACTORY_H +#define PROTOCOL_FACTORY_H + +#include "iprotocol_adapter.h" +#include +#include + +class ProtocolFactory { +public: + /** + * @brief 根据端口号创建对应的协议适配器实例 + * * @param port 客户端连接上来的服务器端口号 + * @param report_cb 用于上报数据的回调函数 + * @return std::unique_ptr 创建好的适配器实例,如果端口不支持则返回 nullptr + */ + static std::unique_ptr create_adapter(uint16_t port, ReportDataCallback report_cb); +}; + +#endif // PROTOCOL_FACTORY_H \ No newline at end of file