使用工厂模式来重构TCP协议
This commit is contained in:
parent
57f2c88298
commit
4a633e4f5f
|
|
@ -34,7 +34,7 @@ add_library(edge_proxy_lib STATIC
|
|||
|
||||
# --- 协议层 ---
|
||||
src/protocol/protocol.cc
|
||||
src/protocol/protocol_handler.cc
|
||||
src/protocol/private_protocol_adapter.cc
|
||||
|
||||
# 系统监视
|
||||
src/systemMonitor/system_monitor.cc
|
||||
|
|
@ -74,7 +74,6 @@ target_link_libraries(edge_proxy PRIVATE
|
|||
add_executable(test
|
||||
src/test.cc
|
||||
|
||||
# src/test-new.cc
|
||||
)
|
||||
|
||||
target_link_libraries(test PRIVATE
|
||||
|
|
@ -82,39 +81,3 @@ target_link_libraries(test PRIVATE
|
|||
)
|
||||
|
||||
|
||||
# =================================================================
|
||||
# 独立的 MQTT 功能测试目标
|
||||
# =================================================================
|
||||
# 创建一个新的可执行文件,名为 mqtt_test_runner
|
||||
# add_executable(mqtt_test_runner
|
||||
# src/mqtt_test.cpp
|
||||
# )
|
||||
|
||||
# target_link_libraries(mqtt_test_runner PRIVATE
|
||||
# edge_proxy_lib
|
||||
# )
|
||||
|
||||
# =================================================================
|
||||
# gtest
|
||||
# =================================================================
|
||||
# enable_testing()
|
||||
# include(FetchContent)
|
||||
# FetchContent_Declare(
|
||||
# googletest
|
||||
# URL https://github.com/google/googletest/archive/refs/tags/v1.14.0.zip
|
||||
# )
|
||||
# FetchContent_MakeAvailable(googletest)
|
||||
|
||||
# add_executable(run_tests
|
||||
# tests/mqtt_integration_test.cpp
|
||||
# )
|
||||
|
||||
# target_link_libraries(run_tests PRIVATE
|
||||
# GTest::gtest_main
|
||||
# PahoMqttCpp::paho-mqttpp3
|
||||
# pthread
|
||||
# )
|
||||
|
||||
# include(GoogleTest)
|
||||
# gtest_discover_tests(run_tests)
|
||||
|
||||
|
|
|
|||
76
archive
76
archive
|
|
@ -1,76 +0,0 @@
|
|||
#ifndef TCP_SERVER_H
|
||||
#define TCP_SERVER_H
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/crc.hpp>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <memory> // For std::enable_shared_from_this
|
||||
#include <cstdint> // For uint8_t, uint16_t, uint32_t
|
||||
// 使用 Boost.Asio 的命名空间
|
||||
using boost::asio::ip::tcp;
|
||||
// ---------------- CRC32 Declaration (使用 Boost.CRC) ----------------
|
||||
inline uint32_t calculate_crc32(const uint8_t* data, size_t length);
|
||||
// ---------------- Protocol Definition ----------------
|
||||
namespace proto {
|
||||
struct ProtocolConfig {
|
||||
static constexpr std::size_t kMagicLen = 4; // 魔数 4 字节
|
||||
static constexpr std::size_t kMsgTypeLen = 2; // 消息类型 2 字节
|
||||
static constexpr std::size_t kPayloadLenFieldLen = 4; // 载荷长度 4 字节
|
||||
static constexpr std::size_t kCrcLen = 4; // CRC 校验位 4 字节
|
||||
// 完整的头部长度 (Magic + MsgType + PayloadLenField)
|
||||
static constexpr std::size_t kHeaderLen = kMagicLen + kMsgTypeLen + kPayloadLenFieldLen; // 4 + 2 + 4 = 10 字节
|
||||
// 魔数常量
|
||||
static constexpr uint8_t kMagic[kMagicLen] = {0x62, 0x6e, 0x73, 0x00};
|
||||
};
|
||||
// 字节序转换函数声明
|
||||
inline uint16_t be16(const uint8_t* p);
|
||||
inline uint32_t be32(const uint8_t* p);
|
||||
inline void write_be16(uint8_t* p, uint16_t val);
|
||||
inline void write_be32(uint8_t* p, uint32_t val);
|
||||
|
||||
struct Message {
|
||||
uint16_t msg_type = 0; // 消息类型 2 字节
|
||||
std::vector<uint8_t> payload;
|
||||
uint32_t received_crc = 0; // 新增字段,用于存储接收到的CRC,方便调试
|
||||
};
|
||||
inline bool match_magic(const uint8_t* p);
|
||||
}
|
||||
// ---------------- Session Class (处理单个客户端连接) ----------------
|
||||
class session : public std::enable_shared_from_this<session> {
|
||||
public:
|
||||
explicit session(tcp::socket socket);
|
||||
void start();
|
||||
private:
|
||||
// 自定义缓冲区结构体
|
||||
struct ReceiveBuffer {
|
||||
static constexpr std::size_t capacity_ = 8192;
|
||||
uint8_t data_[capacity_];
|
||||
std::size_t read_idx = 0; // 已读到缓冲区的数据量 (读指针)
|
||||
std::size_t write_idx = 0; // 已处理/消耗的字节数 (写指针,指向下一个待处理字节)
|
||||
void compact();
|
||||
std::size_t data_size() const;
|
||||
std::size_t free_space() const;
|
||||
const uint8_t* current_data() const;
|
||||
uint8_t* write_ptr();
|
||||
} receive_buffer_;
|
||||
void do_read_some();
|
||||
void parse_messages();
|
||||
std::string payload_to_ascii_string(const std::vector<uint8_t>& payload);
|
||||
std::string bytes_to_hex_string(const uint8_t* data, std::size_t len);
|
||||
void on_message(proto::Message msg);
|
||||
// 修改原有的 send_response 为更通用的 send_message
|
||||
void send_message(uint16_t msg_type, const std::vector<uint8_t>& payload);
|
||||
tcp::socket socket_;
|
||||
};
|
||||
// ---------------- TCPServer Class ----------------
|
||||
// 这个类负责监听端口并接受新的客户端连接
|
||||
class TCPServer {
|
||||
public:
|
||||
TCPServer(boost::asio::io_context& io_context, uint16_t port);
|
||||
private:
|
||||
void do_accept();
|
||||
tcp::acceptor acceptor_;
|
||||
};
|
||||
|
||||
#endif // SERVER_H
|
||||
|
|
@ -36,7 +36,6 @@ void ProtocolHandler::process_raw_data(const uint8_t* data, std::size_t len) {
|
|||
}
|
||||
|
||||
void ProtocolHandler::parse_messages() {
|
||||
// (此函数内容与原 session::parse_messages 基本完全相同)
|
||||
for (;;) {
|
||||
std::size_t available = receive_buffer_.data_size();
|
||||
|
||||
|
|
@ -70,14 +69,13 @@ void ProtocolHandler::parse_messages() {
|
|||
uint32_t payload_len = proto::be32(data + proto::ProtocolConfig::kMagicLen + proto::ProtocolConfig::kMsgTypeLen);
|
||||
if (payload_len > 10 * 1024 * 1024) {
|
||||
spdlog::error("Received too large payload length: {} bytes. Dropping connection concept.", payload_len);
|
||||
// 这里不能直接关闭socket,但可以清空缓冲区来丢弃这个错误的数据
|
||||
receive_buffer_.read_idx = 0;
|
||||
receive_buffer_.write_idx = 0;
|
||||
return;
|
||||
}
|
||||
std::size_t total_len = proto::ProtocolConfig::kHeaderLen + payload_len + proto::ProtocolConfig::kCrcLen;
|
||||
if (available < total_len) {
|
||||
break; // 半包
|
||||
break;
|
||||
}
|
||||
|
||||
uint32_t calculated_crc = calculate_crc32(data, proto::ProtocolConfig::kHeaderLen + payload_len);
|
||||
|
|
@ -96,16 +94,13 @@ void ProtocolHandler::parse_messages() {
|
|||
msg.payload.assign(payload_ptr, payload_ptr + payload_len);
|
||||
}
|
||||
msg.received_crc = received_crc;
|
||||
|
||||
receive_buffer_.write_idx += total_len;
|
||||
|
||||
on_message(std::move(msg));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ProtocolHandler::on_message(proto::Message msg) {
|
||||
// (此函数内容与原 session::on_message 完全相同)
|
||||
switch (msg.msg_type) {
|
||||
case 0x0001:
|
||||
spdlog::info("MsgType=0x0001 received. Payload Len: {}.", msg.payload.size());
|
||||
|
|
@ -126,7 +121,6 @@ void ProtocolHandler::on_message(proto::Message msg) {
|
|||
}
|
||||
|
||||
void ProtocolHandler::send_message(uint16_t msg_type, const std::vector<uint8_t>& payload) {
|
||||
// (此函数内容与原 session::send_message 类似,但最后是调用回调)
|
||||
std::vector<uint8_t> frame;
|
||||
frame.reserve(proto::ProtocolConfig::kHeaderLen + payload.size() + proto::ProtocolConfig::kCrcLen);
|
||||
|
||||
|
|
@ -136,17 +130,14 @@ void ProtocolHandler::send_message(uint16_t msg_type, const std::vector<uint8_t>
|
|||
proto::write_be16(header_buf, msg_type);
|
||||
proto::write_be32(header_buf + 2, static_cast<uint32_t>(payload.size()));
|
||||
frame.insert(frame.end(), header_buf, header_buf + 6);
|
||||
|
||||
// 组装载荷
|
||||
|
||||
frame.insert(frame.end(), payload.begin(), payload.end());
|
||||
|
||||
// 计算并组装CRC
|
||||
uint32_t crc = calculate_crc32(frame.data(), frame.size());
|
||||
uint8_t crc_buf[4];
|
||||
proto::write_be32(crc_buf, crc);
|
||||
frame.insert(frame.end(), std::begin(crc_buf), std::end(crc_buf));
|
||||
|
||||
// 通过回调函数将数据帧交还给网络层发送
|
||||
if (send_callback_) {
|
||||
send_callback_(frame);
|
||||
}
|
||||
|
|
@ -7,7 +7,7 @@
|
|||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <csignal>
|
||||
#include <iostream>
|
||||
#include <functional> // for std::bind
|
||||
#include <functional>
|
||||
|
||||
// 用于 ASIO 服务的全局 io_context
|
||||
boost::asio::io_context g_io_context;
|
||||
|
|
@ -68,9 +68,10 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
try {
|
||||
constexpr uint16_t tcp_port = 8888;
|
||||
TCPServer tcp_server(g_io_context, tcp_port);
|
||||
MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client");
|
||||
MqttRouter mqtt_router(mqtt_client);
|
||||
std::vector<uint16_t> listen_ports = { 8888, 502 };
|
||||
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
|
||||
SystemMonitor::SystemMonitor monitor;
|
||||
|
||||
mqtt_client.connect();
|
||||
|
|
|
|||
|
|
@ -1,14 +1,18 @@
|
|||
#include "tcp_server.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "protocol/protocol_factory.h"
|
||||
|
||||
// --- TcpConnection Implementation ---
|
||||
|
||||
TcpConnection::TcpConnection(tcp::socket socket) : socket_(std::move(socket)) {
|
||||
// 关键:创建协议处理器,并使用lambda将自身的send方法作为回调注入
|
||||
TcpConnection::TcpConnection(tcp::socket socket, MqttClient& mqtt_client, std::unique_ptr<IProtocolAdapter> adapter)
|
||||
: socket_(std::move(socket)),
|
||||
mqtt_client_(mqtt_client),
|
||||
protocol_adapter_(std::move(adapter))
|
||||
{
|
||||
// 定义发送回调
|
||||
auto send_callback = [this](const std::vector<uint8_t>& data_to_send) {
|
||||
this->send(data_to_send);
|
||||
};
|
||||
protocol_handler_ = std::make_unique<ProtocolHandler>(send_callback);
|
||||
// 为适配器设置发送回调
|
||||
protocol_adapter_->set_send_callback(send_callback);
|
||||
}
|
||||
|
||||
void TcpConnection::start() {
|
||||
|
|
@ -17,7 +21,7 @@ void TcpConnection::start() {
|
|||
} catch (...) {
|
||||
spdlog::info("Client connected. (Could not get remote endpoint details)");
|
||||
}
|
||||
do_read_some(); // 开始异步读取循环
|
||||
do_read_some();
|
||||
}
|
||||
|
||||
void TcpConnection::do_read_some() {
|
||||
|
|
@ -25,18 +29,20 @@ void TcpConnection::do_read_some() {
|
|||
socket_.async_read_some(boost::asio::buffer(read_buffer_),
|
||||
[this, self](boost::system::error_code ec, std::size_t n) {
|
||||
if (!ec) {
|
||||
// 收到原始数据,直接交给协议处理器
|
||||
protocol_handler_->process_raw_data(read_buffer_.data(), n);
|
||||
// 继续下一次读取
|
||||
// <<< MODIFIED: 调用适配器的 process_raw_data
|
||||
protocol_adapter_->process_raw_data(read_buffer_.data(), n);
|
||||
do_read_some();
|
||||
} else if (ec == boost::asio::error::eof) {
|
||||
spdlog::warn("Client closed connection cleanly: {}", socket_.remote_endpoint().address().to_string());
|
||||
try {
|
||||
spdlog::warn("Client closed connection cleanly: {}", socket_.remote_endpoint().address().to_string());
|
||||
} catch(...) {
|
||||
spdlog::warn("Client closed connection cleanly.");
|
||||
}
|
||||
} else if (ec == boost::asio::error::operation_aborted) {
|
||||
spdlog::debug("Connection operation aborted.");
|
||||
} else {
|
||||
spdlog::error("Connection read error: {}", ec.message());
|
||||
}
|
||||
// 发生错误或EOF时,session的shared_ptr将被释放,socket自动关闭
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -52,25 +58,45 @@ void TcpConnection::send(const std::vector<uint8_t>& data) {
|
|||
});
|
||||
}
|
||||
|
||||
// --- TCPServer Implementation ---
|
||||
|
||||
TCPServer::TCPServer(boost::asio::io_context& io_context, uint16_t port)
|
||||
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
|
||||
spdlog::info("TCP Server service initialized on port {}", port);
|
||||
do_accept();
|
||||
TCPServer::TCPServer(boost::asio::io_context& io_context, const std::vector<uint16_t>& ports, MqttClient& mqtt_client)
|
||||
: io_context_(io_context),
|
||||
mqtt_client_(mqtt_client)
|
||||
{
|
||||
for (uint16_t port : ports) {
|
||||
spdlog::info("Initializing TCP listener on port {}", port);
|
||||
// emplace 直接在 map 中构造 acceptor
|
||||
acceptors_.emplace(port, tcp::acceptor(io_context_, tcp::endpoint(tcp::v4(), port)));
|
||||
// 为每个端口启动一个独立的 accept 循环
|
||||
do_accept(port);
|
||||
}
|
||||
}
|
||||
|
||||
void TCPServer::do_accept() {
|
||||
acceptor_.async_accept(
|
||||
[this](boost::system::error_code ec, tcp::socket socket) {
|
||||
void TCPServer::do_accept(uint16_t port) {
|
||||
acceptors_.at(port).async_accept(
|
||||
[this, port](boost::system::error_code ec, tcp::socket socket) {
|
||||
if (!ec) {
|
||||
// 创建新的 TcpConnection 来处理连接
|
||||
std::make_shared<TcpConnection>(std::move(socket))->start();
|
||||
// 1. 定义数据上报回调
|
||||
auto report_callback = [this](const UnifiedData& data) {
|
||||
std::string topic = "devices/" + data.device_id + "/data";
|
||||
spdlog::info("Forwarding data to MQTT topic '{}'", topic);
|
||||
this->mqtt_client_.publish(topic, data.data_json, 1, false);
|
||||
};
|
||||
|
||||
// 2. 使用工厂创建适配器
|
||||
auto adapter = ProtocolFactory::create_adapter(port, report_callback);
|
||||
|
||||
if (adapter) {
|
||||
// 3. 创建 TcpConnection,并将适配器注入
|
||||
std::make_shared<TcpConnection>(std::move(socket), mqtt_client_, std::move(adapter))->start();
|
||||
} else {
|
||||
spdlog::error("Failed to create adapter for port {}, closing connection.", port);
|
||||
socket.close(); // 如果没有合适的适配器,直接关闭连接
|
||||
}
|
||||
} else {
|
||||
spdlog::error("Accept error: {}", ec.message());
|
||||
spdlog::error("Accept error on port {}: {}", port, ec.message());
|
||||
}
|
||||
// 继续等待下一个连接
|
||||
do_accept();
|
||||
// 4. 继续等待该端口的下一个连接
|
||||
do_accept(port);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -1,18 +1,21 @@
|
|||
// 文件名: tcp_server.h
|
||||
#ifndef TCP_SERVER_H
|
||||
#define TCP_SERVER_H
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
#include <array>
|
||||
#include "../protocol/protocol_handler.h" // 包含协议处理器头文件
|
||||
#include <vector>
|
||||
#include <map>
|
||||
#include "iprotocol_adapter.h"
|
||||
#include "mqtt_client.h"
|
||||
|
||||
using boost::asio::ip::tcp;
|
||||
|
||||
// 代表一个TCP连接,管理socket生命周期和原始数据收发
|
||||
class MqttClient;
|
||||
|
||||
class TcpConnection : public std::enable_shared_from_this<TcpConnection> {
|
||||
public:
|
||||
explicit TcpConnection(tcp::socket socket);
|
||||
explicit TcpConnection(tcp::socket socket, MqttClient& mqtt_client, std::unique_ptr<IProtocolAdapter> adapter);
|
||||
void start();
|
||||
|
||||
private:
|
||||
|
|
@ -20,20 +23,24 @@ private:
|
|||
void send(const std::vector<uint8_t>& data);
|
||||
|
||||
tcp::socket socket_;
|
||||
std::array<uint8_t, 4096> read_buffer_; // 简化的原始字节缓冲区
|
||||
|
||||
// 每个连接拥有一个协议处理器实例
|
||||
std::unique_ptr<ProtocolHandler> protocol_handler_;
|
||||
std::array<uint8_t, 8192> read_buffer_;
|
||||
std::unique_ptr<IProtocolAdapter> protocol_adapter_;
|
||||
MqttClient& mqtt_client_;
|
||||
};
|
||||
|
||||
// 负责监听端口并接受新连接
|
||||
class TCPServer {
|
||||
public:
|
||||
TCPServer(boost::asio::io_context& io_context, uint16_t port);
|
||||
// <<< MODIFIED: 构造函数接收一个端口列表
|
||||
TCPServer(boost::asio::io_context& io_context, const std::vector<uint16_t>& ports, MqttClient& mqtt_client);
|
||||
|
||||
private:
|
||||
void do_accept();
|
||||
tcp::acceptor acceptor_;
|
||||
// <<< MODIFIED: 为每个端口启动一个accept循环
|
||||
void do_accept(uint16_t port);
|
||||
|
||||
boost::asio::io_context& io_context_;
|
||||
MqttClient& mqtt_client_;
|
||||
// <<< MODIFIED: 使用 map 来管理多个 acceptor,key是端口号
|
||||
std::map<uint16_t, tcp::acceptor> acceptors_;
|
||||
};
|
||||
|
||||
#endif // TCP_SERVER_H
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
// 文件名: iprotocol_adapter.h
|
||||
#ifndef IPROTOCOL_ADAPTER_H
|
||||
#define IPROTOCOL_ADAPTER_H
|
||||
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
|
||||
/**
|
||||
* @brief 统一数据格式 (Unified Data Format)
|
||||
* * 所有协议适配器在解析完特定协议的数据后,都应将其转换为这个标准结构体。
|
||||
* 这样做可以使上层业务逻辑(如MQTT转发、数据库存储)与底层具体的协议实现解耦。
|
||||
*/
|
||||
struct UnifiedData {
|
||||
std::string device_id; // 产生数据的设备唯一ID
|
||||
int64_t timestamp_ms; // 数据产生的时间戳 (毫秒级UTC)
|
||||
std::string data_json; // 采用JSON字符串格式,具有良好的灵活性和可扩展性
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief 定义回调函数,用于将解析后的标准数据上报给核心业务层
|
||||
*/
|
||||
using ReportDataCallback = std::function<void(const UnifiedData&)>;
|
||||
|
||||
/**
|
||||
* @brief 协议适配器接口 (纯虚基类)
|
||||
* * 所有具体的协议实现(如私有协议、Modbus、OPC-UA)都应继承自这个接口。
|
||||
* 它定义了协议适配器的标准行为。
|
||||
*/
|
||||
class IProtocolAdapter {
|
||||
public:
|
||||
virtual ~IProtocolAdapter() = default;
|
||||
|
||||
/**
|
||||
* @brief 定义回调函数类型,用于将需要发送的原始数据帧交由网络层发送
|
||||
*/
|
||||
using SendCallback = std::function<void(const std::vector<uint8_t>&)>;
|
||||
|
||||
/**
|
||||
* @brief 设置发送回调函数
|
||||
* @param cb 网络层(如TCPSession)在创建Adapter实例后,会通过此方法将自身的发送函数注册进来。
|
||||
*/
|
||||
virtual void set_send_callback(SendCallback cb) = 0;
|
||||
|
||||
/**
|
||||
* @brief 从网络层接收原始字节流的唯一入口点
|
||||
* @param data 指向接收到的数据块的指针
|
||||
* @param len 数据块的长度
|
||||
* * 网络层收到数据后,会调用此函数将数据喂给协议适配器进行处理。
|
||||
*/
|
||||
virtual void process_raw_data(const uint8_t* data, std::size_t len) = 0;
|
||||
};
|
||||
|
||||
#endif // IPROTOCOL_ADAPTER_H
|
||||
|
|
@ -0,0 +1,280 @@
|
|||
// 文件名: private_protocol_adapter.cc
|
||||
#include "private_protocol_adapter.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <boost/crc.hpp>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <cstring>
|
||||
#include <cctype>
|
||||
#include <algorithm>
|
||||
#include <chrono> // 用于获取时间戳
|
||||
|
||||
// CRC32计算函数 (与您原来的设计保持一致)
|
||||
inline uint32_t calculate_crc32(const uint8_t* data, size_t length) {
|
||||
boost::crc_32_type result;
|
||||
result.process_bytes(data, length);
|
||||
return result.checksum();
|
||||
}
|
||||
|
||||
// 构造函数,初始化 report_callback_
|
||||
PrivateProtocolAdapter::PrivateProtocolAdapter(ReportDataCallback report_cb)
|
||||
: report_callback_(std::move(report_cb)) {}
|
||||
|
||||
// 实现接口:设置发送回调
|
||||
void PrivateProtocolAdapter::set_send_callback(SendCallback cb) {
|
||||
send_callback_ = std::move(cb);
|
||||
}
|
||||
|
||||
// 实现接口:处理原始数据 (与您原来的逻辑完全相同)
|
||||
void PrivateProtocolAdapter::process_raw_data(const uint8_t* data, std::size_t len) {
|
||||
if (receive_buffer_.free_space() < len) {
|
||||
spdlog::warn("Buffer has only {} bytes free, but received {} bytes. Compacting.", receive_buffer_.free_space(), len);
|
||||
receive_buffer_.compact();
|
||||
if (receive_buffer_.free_space() < len) {
|
||||
spdlog::error("Buffer full even after compacting. Cannot process new data. This indicates a protocol error or slow consumer.");
|
||||
receive_buffer_.read_idx = 0;
|
||||
receive_buffer_.write_idx = 0;
|
||||
return;
|
||||
}
|
||||
}
|
||||
std::memcpy(receive_buffer_.write_ptr(), data, len);
|
||||
receive_buffer_.read_idx += len;
|
||||
|
||||
parse_messages();
|
||||
}
|
||||
|
||||
// 解析消息 (与您原来的逻辑完全相同)
|
||||
void PrivateProtocolAdapter::parse_messages() {
|
||||
for (;;) {
|
||||
std::size_t available = receive_buffer_.data_size();
|
||||
|
||||
if (available < proto::ProtocolConfig::kHeaderLen + proto::ProtocolConfig::kCrcLen) {
|
||||
break;
|
||||
}
|
||||
const uint8_t* data = receive_buffer_.current_data();
|
||||
std::size_t offset = 0;
|
||||
|
||||
while (available - offset >= proto::ProtocolConfig::kHeaderLen + proto::ProtocolConfig::kCrcLen) {
|
||||
if (proto::match_magic(data + offset)) {
|
||||
break;
|
||||
}
|
||||
++offset;
|
||||
}
|
||||
if (offset > 0) {
|
||||
spdlog::warn("Skipped {} bytes until magic.", offset);
|
||||
receive_buffer_.write_idx += offset;
|
||||
available = receive_buffer_.data_size();
|
||||
data = receive_buffer_.current_data();
|
||||
|
||||
if (available < proto::ProtocolConfig::kHeaderLen + proto::ProtocolConfig::kCrcLen) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!proto::match_magic(data)) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint32_t payload_len = proto::be32(data + proto::ProtocolConfig::kMagicLen + proto::ProtocolConfig::kMsgTypeLen);
|
||||
if (payload_len > 10 * 1024 * 1024) {
|
||||
spdlog::error("Received too large payload length: {} bytes. Dropping connection concept.", payload_len);
|
||||
receive_buffer_.read_idx = 0;
|
||||
receive_buffer_.write_idx = 0;
|
||||
return;
|
||||
}
|
||||
std::size_t total_len = proto::ProtocolConfig::kHeaderLen + payload_len + proto::ProtocolConfig::kCrcLen;
|
||||
if (available < total_len) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint32_t calculated_crc = calculate_crc32(data, proto::ProtocolConfig::kHeaderLen + payload_len);
|
||||
uint32_t received_crc = proto::be32(data + proto::ProtocolConfig::kHeaderLen + payload_len);
|
||||
|
||||
if (calculated_crc != received_crc) {
|
||||
spdlog::error("CRC mismatch! Calculated: 0x{:08x}, Received: 0x{:08x}", calculated_crc, received_crc);
|
||||
receive_buffer_.write_idx += total_len;
|
||||
continue;
|
||||
}
|
||||
|
||||
proto::Message msg;
|
||||
msg.msg_type = proto::be16(data + proto::ProtocolConfig::kMagicLen);
|
||||
if (payload_len > 0) {
|
||||
const uint8_t* payload_ptr = data + proto::ProtocolConfig::kHeaderLen;
|
||||
msg.payload.assign(payload_ptr, payload_ptr + payload_len);
|
||||
}
|
||||
msg.received_crc = received_crc;
|
||||
receive_buffer_.write_idx += total_len;
|
||||
on_message(std::move(msg));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// =================================================================================
|
||||
// 核心修改点:处理具体消息
|
||||
// 在这里,我们将特定协议的消息,转换为统一的数据格式 UnifiedData 并上报
|
||||
// =================================================================================
|
||||
void PrivateProtocolAdapter::on_message(proto::Message msg) {
|
||||
// 使用 using 简化 nlohmann::json 的调用
|
||||
using json = nlohmann::json;
|
||||
|
||||
switch (msg.msg_type) {
|
||||
case 0x0001: // 假设 0x0001 是设备数据上报消息
|
||||
{
|
||||
spdlog::info("MsgType=0x0001 received. Payload Len: {}.", msg.payload.size());
|
||||
|
||||
// 1. 创建一个 UnifiedData 实例
|
||||
UnifiedData report_data;
|
||||
report_data.device_id = "private_device_01"; // ID应从payload或连接信息中获取
|
||||
report_data.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()
|
||||
).count();
|
||||
|
||||
// 2. 创建一个 json 对象来构建 data_json
|
||||
json data_j;
|
||||
|
||||
// ========================= 【核心转换逻辑】 =========================
|
||||
// 在这里,我们根据您的协议定义来解析 payload
|
||||
// 【示例】:假设 payload 是一个结构化的二进制数据
|
||||
// - 第 0-3 字节: 温度 (float, big-endian)
|
||||
// - 第 4-7 字节: 湿度 (float, big-endian)
|
||||
// - 第 8-9 字节: 电压 (uint16_t, big-endian, 单位 毫伏)
|
||||
// - 后面跟着一个字符串设备序列号
|
||||
// ===================================================================
|
||||
if (msg.payload.size() >= 10) {
|
||||
try {
|
||||
const uint8_t* p = msg.payload.data();
|
||||
|
||||
// 解析温度 (float)
|
||||
uint32_t temp_raw = proto::be32(p);
|
||||
float temperature;
|
||||
std::memcpy(&temperature, &temp_raw, sizeof(float));
|
||||
|
||||
// 解析湿度 (float)
|
||||
uint32_t hum_raw = proto::be32(p + 4);
|
||||
float humidity;
|
||||
std::memcpy(&humidity, &hum_raw, sizeof(float));
|
||||
|
||||
// 解析电压 (uint16_t)
|
||||
uint16_t voltage_mv = proto::be16(p + 8);
|
||||
|
||||
// 解析设备序列号 (string)
|
||||
std::string serial_number;
|
||||
if (msg.payload.size() > 10) {
|
||||
serial_number.assign(reinterpret_cast<const char*>(p + 10), msg.payload.size() - 10);
|
||||
}
|
||||
|
||||
// 构建 JSON 对象
|
||||
data_j["protocol"] = "private_structured";
|
||||
data_j["values"] = {
|
||||
{"temperature", temperature},
|
||||
{"humidity", humidity},
|
||||
{"voltage_mv", voltage_mv}
|
||||
};
|
||||
data_j["sn"] = serial_number;
|
||||
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::error("nlohmann::json exception: {}", e.what());
|
||||
// 如果JSON构建失败,可以发送一个错误格式的JSON
|
||||
data_j = {
|
||||
{"error", "Failed to parse structured payload"},
|
||||
{"payload_size", msg.payload.size()}
|
||||
};
|
||||
}
|
||||
} else {
|
||||
// 如果payload长度不符合预期,我们也记录一个错误
|
||||
spdlog::warn("Payload size {} is too small for structured data.", msg.payload.size());
|
||||
data_j = {
|
||||
{"error", "Invalid payload size"},
|
||||
{"payload_size", msg.payload.size()}
|
||||
};
|
||||
}
|
||||
|
||||
report_data.data_json = data_j.dump();
|
||||
|
||||
spdlog::debug("Generated JSON: {}", report_data.data_json);
|
||||
|
||||
// 4. 通过回调函数将统一格式的数据上报给核心业务层
|
||||
if (report_callback_) {
|
||||
report_callback_(report_data);
|
||||
}
|
||||
|
||||
// 5. 发送响应给客户端 (原有逻辑)
|
||||
std::string response_str = "JSON data processed successfully.";
|
||||
std::vector<uint8_t> response_payload(response_str.begin(), response_str.end());
|
||||
send_message(0x0002, response_payload);
|
||||
break;
|
||||
}
|
||||
case 0x0100: // 心跳消息
|
||||
{
|
||||
spdlog::info("Heartbeat (MsgType=0x0100) received.");
|
||||
send_message(0x0101, {});
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
spdlog::warn("Unknown MsgType=0x{:04x} received.", msg.msg_type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 发送消息 (与您原来的逻辑完全相同)
|
||||
void PrivateProtocolAdapter::send_message(uint16_t msg_type, const std::vector<uint8_t>& payload) {
|
||||
std::vector<uint8_t> frame;
|
||||
frame.reserve(proto::ProtocolConfig::kHeaderLen + payload.size() + proto::ProtocolConfig::kCrcLen);
|
||||
|
||||
frame.insert(frame.end(), std::begin(proto::ProtocolConfig::kMagic), std::end(proto::ProtocolConfig::kMagic));
|
||||
uint8_t header_buf[6];
|
||||
proto::write_be16(header_buf, msg_type);
|
||||
proto::write_be32(header_buf + 2, static_cast<uint32_t>(payload.size()));
|
||||
frame.insert(frame.end(), header_buf, header_buf + 6);
|
||||
|
||||
frame.insert(frame.end(), payload.begin(), payload.end());
|
||||
|
||||
uint32_t crc = calculate_crc32(frame.data(), frame.size());
|
||||
uint8_t crc_buf[4];
|
||||
proto::write_be32(crc_buf, crc);
|
||||
frame.insert(frame.end(), std::begin(crc_buf), std::end(crc_buf));
|
||||
|
||||
if (send_callback_) {
|
||||
send_callback_(frame);
|
||||
}
|
||||
}
|
||||
|
||||
// --- ReceiveBuffer 及辅助函数的实现 (均与您原来的代码相同) ---
|
||||
|
||||
void PrivateProtocolAdapter::ReceiveBuffer::compact() {
|
||||
if (write_idx > 0) {
|
||||
std::size_t remaining = data_size();
|
||||
if (remaining > 0) {
|
||||
std::memmove(data_, data_ + write_idx, remaining);
|
||||
}
|
||||
read_idx = remaining;
|
||||
write_idx = 0;
|
||||
}
|
||||
}
|
||||
std::size_t PrivateProtocolAdapter::ReceiveBuffer::data_size() const { return read_idx - write_idx; }
|
||||
std::size_t PrivateProtocolAdapter::ReceiveBuffer::free_space() const { return capacity_ - read_idx; }
|
||||
const uint8_t* PrivateProtocolAdapter::ReceiveBuffer::current_data() const { return data_ + write_idx; }
|
||||
uint8_t* PrivateProtocolAdapter::ReceiveBuffer::write_ptr() { return data_ + read_idx; }
|
||||
|
||||
|
||||
std::string PrivateProtocolAdapter::payload_to_ascii_string(const std::vector<uint8_t>& payload) {
|
||||
std::string ascii_str;
|
||||
ascii_str.reserve(payload.size());
|
||||
for (auto byte : payload) {
|
||||
ascii_str.push_back(std::isprint(byte) ? static_cast<char>(byte) : '.');
|
||||
}
|
||||
return ascii_str;
|
||||
}
|
||||
|
||||
std::string PrivateProtocolAdapter::bytes_to_hex_string(const uint8_t* data, std::size_t len) {
|
||||
std::string hex_str;
|
||||
hex_str.reserve(len * 3);
|
||||
static const char* digits = "0123456789abcdef";
|
||||
for (std::size_t i = 0; i < len; ++i) {
|
||||
hex_str.push_back(digits[(data[i] >> 4) & 0xF]);
|
||||
hex_str.push_back(digits[data[i] & 0xF]);
|
||||
if (i < len - 1) hex_str.push_back(' ');
|
||||
}
|
||||
return hex_str;
|
||||
}
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
// 文件名: private_protocol_adapter.h
|
||||
#ifndef PRIVATE_PROTOCOL_ADAPTER_H
|
||||
#define PRIVATE_PROTOCOL_ADAPTER_H
|
||||
|
||||
#include "iprotocol_adapter.h" // 引入新的接口
|
||||
#include "protocol.h" // 依赖您原来的协议定义 (protocol.h)
|
||||
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
|
||||
/**
|
||||
* @brief 私有协议适配器
|
||||
* * 这个类实现了 IProtocolAdapter 接口,专门用于处理您之前定义的私有二进制协议。
|
||||
* 它的内部逻辑大部分来自于您原来的 ProtocolHandler 类。
|
||||
*/
|
||||
class PrivateProtocolAdapter : public IProtocolAdapter {
|
||||
public:
|
||||
/**
|
||||
* @brief 构造函数
|
||||
* @param report_cb 用于将解析出的 UnifiedData 上报给核心业务层的回调函数
|
||||
*/
|
||||
explicit PrivateProtocolAdapter(ReportDataCallback report_cb);
|
||||
|
||||
// --- 实现 IProtocolAdapter 接口 ---
|
||||
void set_send_callback(SendCallback cb) override;
|
||||
void process_raw_data(const uint8_t* data, std::size_t len) override;
|
||||
|
||||
private:
|
||||
// 内部缓冲区,用于处理粘包/半包问题 (与您原来的设计保持一致)
|
||||
struct ReceiveBuffer {
|
||||
static constexpr std::size_t capacity_ = 8192;
|
||||
uint8_t data_[capacity_];
|
||||
std::size_t read_idx = 0;
|
||||
std::size_t write_idx = 0;
|
||||
void compact();
|
||||
std::size_t data_size() const;
|
||||
std::size_t free_space() const;
|
||||
const uint8_t* current_data() const;
|
||||
uint8_t* write_ptr();
|
||||
} receive_buffer_;
|
||||
|
||||
// 协议解析与业务处理函数
|
||||
void parse_messages();
|
||||
void on_message(proto::Message msg);
|
||||
void send_message(uint16_t msg_type, const std::vector<uint8_t>& payload);
|
||||
|
||||
// 辅助函数
|
||||
std::string payload_to_ascii_string(const std::vector<uint8_t>& payload);
|
||||
std::string bytes_to_hex_string(const uint8_t* data, std::size_t len);
|
||||
|
||||
ReportDataCallback report_callback_; // 回调到核心业务层
|
||||
SendCallback send_callback_; // 回调到网络层
|
||||
};
|
||||
|
||||
#endif // PRIVATE_PROTOCOL_ADAPTER_H
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
#include "protocol_factory.h"
|
||||
#include "private_protocol_adapter.h"
|
||||
// #include "modbus_tcp_adapter.h" // <-- 未来在这里包含新的协议适配器
|
||||
|
||||
#include "spdlog/spdlog.h"
|
||||
|
||||
std::unique_ptr<IProtocolAdapter> ProtocolFactory::create_adapter(uint16_t port, ReportDataCallback report_cb) {
|
||||
spdlog::info("Creating protocol adapter for port {}", port);
|
||||
|
||||
// 根据端口号决定创建哪个适配器
|
||||
if (port == 8888) { // 假设 8001 是您的私有协议端口
|
||||
return std::make_unique<PrivateProtocolAdapter>(std::move(report_cb));
|
||||
}
|
||||
|
||||
else if (port == 502) {
|
||||
//TODO:这里没有写modbus协议的Adapter,快去写
|
||||
return std::make_unique<PrivateProtocolAdapter>(std::move(report_cb));
|
||||
}
|
||||
|
||||
// 如果没有匹配的端口,返回空指针
|
||||
spdlog::error("No protocol adapter configured for port {}", port);
|
||||
return nullptr;
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
#ifndef PROTOCOL_FACTORY_H
|
||||
#define PROTOCOL_FACTORY_H
|
||||
|
||||
#include "iprotocol_adapter.h"
|
||||
#include <memory>
|
||||
#include <cstdint>
|
||||
|
||||
class ProtocolFactory {
|
||||
public:
|
||||
/**
|
||||
* @brief 根据端口号创建对应的协议适配器实例
|
||||
* * @param port 客户端连接上来的服务器端口号
|
||||
* @param report_cb 用于上报数据的回调函数
|
||||
* @return std::unique_ptr<IProtocolAdapter> 创建好的适配器实例,如果端口不支持则返回 nullptr
|
||||
*/
|
||||
static std::unique_ptr<IProtocolAdapter> create_adapter(uint16_t port, ReportDataCallback report_cb);
|
||||
};
|
||||
|
||||
#endif // PROTOCOL_FACTORY_H
|
||||
Loading…
Reference in New Issue