使用配置文件来管理设备
This commit is contained in:
parent
45f89eb57e
commit
3aaec6a42c
|
|
@ -23,6 +23,9 @@ add_library(edge_proxy_lib STATIC
|
|||
# TCP通讯层
|
||||
src/network/tcp_server.cc
|
||||
|
||||
# --- 设备管理模块 ---
|
||||
src/device_manager.cc
|
||||
|
||||
# 小工具
|
||||
src/utils/mqtt_topic_matcher.cpp
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
{
|
||||
"modbus_rtu_devices": [
|
||||
{
|
||||
"enabled": true,
|
||||
"device_id": "rtu_temp_sensor_lab",
|
||||
"port_path": "/dev/ttyS7",
|
||||
"baud_rate": 9600,
|
||||
"slave_id": 1,
|
||||
"poll_interval_ms": 5000,
|
||||
"data_points": [
|
||||
{"name": "temperature", "address": 0, "type": "INT16", "scale": 0.1},
|
||||
{"name": "humidity", "address": 1, "type": "UINT16", "scale": 0.1}
|
||||
]
|
||||
},
|
||||
{
|
||||
"enabled": false,
|
||||
"device_id": "water_meter_main_gate",
|
||||
"port_path": "/dev/ttyS7",
|
||||
"baud_rate": 9600,
|
||||
"slave_id": 5,
|
||||
"poll_interval_ms": 10000,
|
||||
"data_points": [
|
||||
{"name": "total_flow", "address": 256, "type": "FLOAT32", "scale": 1.0}
|
||||
]
|
||||
},
|
||||
{
|
||||
"enabled": false,
|
||||
"device_id": "backup_counter",
|
||||
"port_path": "/dev/ttyS7",
|
||||
"baud_rate": 9600,
|
||||
"slave_id": 10,
|
||||
"poll_interval_ms": 1000,
|
||||
"data_points": [
|
||||
{"name": "count", "address": 32, "type": "UINT32"}
|
||||
]
|
||||
}
|
||||
],
|
||||
"modbus_tcp_devices": [
|
||||
{
|
||||
"enabled": false,
|
||||
"device_id": "plc_workshop1",
|
||||
"ip_address": "192.168.1.120",
|
||||
"port": 502,
|
||||
"slave_id": 1,
|
||||
"poll_interval_ms": 1000,
|
||||
"data_points": [
|
||||
{"name": "motor_speed", "address": 100, "type": "UINT16"},
|
||||
{"name": "pressure", "address": 102, "type": "FLOAT32", "scale": 0.01}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
@ -0,0 +1,114 @@
|
|||
// 文件名: src/device_manager.cc
|
||||
#include "device_manager.h"
|
||||
#include "modbus/modbus_common.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <fstream>
|
||||
#include <map>
|
||||
|
||||
using json = nlohmann::json;
|
||||
|
||||
// 辅助函数:将JSON中的字符串类型映射到C++枚举
|
||||
static ModbusDataType string_to_modbus_data_type(const std::string& type_str) {
|
||||
static const std::map<std::string, ModbusDataType> type_map = {
|
||||
{"UINT16", ModbusDataType::UINT16},
|
||||
{"INT16", ModbusDataType::INT16},
|
||||
{"UINT32", ModbusDataType::UINT32},
|
||||
{"INT32", ModbusDataType::INT32},
|
||||
{"FLOAT32", ModbusDataType::FLOAT32}
|
||||
};
|
||||
auto it = type_map.find(type_str);
|
||||
if (it != type_map.end()) {
|
||||
return it->second;
|
||||
}
|
||||
throw std::runtime_error("Unknown ModbusDataType string: " + type_str);
|
||||
}
|
||||
|
||||
|
||||
DeviceManager::DeviceManager(boost::asio::io_context& io_context, ReportDataCallback report_cb)
|
||||
: m_io_context(io_context), m_report_callback(std::move(report_cb)) {}
|
||||
|
||||
DeviceManager::~DeviceManager() {
|
||||
stop_all();
|
||||
}
|
||||
|
||||
void DeviceManager::load_and_start(const std::string& config_path) {
|
||||
spdlog::info("Loading device configuration from '{}'...", config_path);
|
||||
std::ifstream config_file(config_path);
|
||||
if (!config_file.is_open()) {
|
||||
spdlog::critical("Failed to open configuration file: {}", config_path);
|
||||
throw std::runtime_error("Configuration file not found.");
|
||||
}
|
||||
|
||||
try {
|
||||
json config_json = json::parse(config_file);
|
||||
|
||||
// --- 加载 Modbus RTU 设备 ---
|
||||
if (config_json.contains("modbus_rtu_devices")) {
|
||||
for (const auto& dev_json : config_json["modbus_rtu_devices"]) {
|
||||
if (!dev_json.value("enabled", false)) continue;
|
||||
|
||||
ModbusRtuDeviceConfig config;
|
||||
config.device_id = dev_json.at("device_id").get<std::string>();
|
||||
config.port_path = dev_json.at("port_path").get<std::string>();
|
||||
config.baud_rate = dev_json.at("baud_rate").get<unsigned int>();
|
||||
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
|
||||
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
|
||||
|
||||
for (const auto& dp_json : dev_json.at("data_points")) {
|
||||
config.data_points.push_back({
|
||||
dp_json.at("name").get<std::string>(),
|
||||
(uint16_t)dp_json.at("address").get<int>(),
|
||||
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
|
||||
dp_json.value("scale", 1.0) // .value for optional fields
|
||||
});
|
||||
}
|
||||
|
||||
auto service = std::make_unique<ModbusRtuPollerService>(config, m_report_callback);
|
||||
service->start();
|
||||
m_rtu_services.push_back(std::move(service));
|
||||
spdlog::info("Started Modbus RTU service for device '{}'.", config.device_id);
|
||||
}
|
||||
}
|
||||
|
||||
// --- 加载 Modbus TCP 设备 ---
|
||||
if (config_json.contains("modbus_tcp_devices")) {
|
||||
for (const auto& dev_json : config_json["modbus_tcp_devices"]) {
|
||||
if (!dev_json.value("enabled", false)) continue;
|
||||
|
||||
ModbusDeviceConfig config; // This is the TCP config struct
|
||||
config.device_id = dev_json.at("device_id").get<std::string>();
|
||||
config.ip_address = dev_json.at("ip_address").get<std::string>();
|
||||
config.port = dev_json.at("port").get<uint16_t>();
|
||||
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
|
||||
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
|
||||
|
||||
// Note: The current ModbusMasterPoller doesn't use data_points yet.
|
||||
// This is a point for future enhancement to make it also configuration-driven.
|
||||
// For now, we just start it based on connection params.
|
||||
// You would need to refactor ModbusMasterPoller similarly to ModbusRtuPollerService
|
||||
// to make it fully generic.
|
||||
|
||||
auto poller = std::make_shared<ModbusMasterPoller>(m_io_context, config, m_report_callback);
|
||||
poller->start();
|
||||
m_tcp_pollers.push_back(poller);
|
||||
spdlog::info("Started Modbus TCP service for device '{}'.", config.device_id);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::critical("Failed to parse JSON configuration: {}", e.what());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void DeviceManager::stop_all() {
|
||||
spdlog::info("Stopping all device services...");
|
||||
for (auto& service : m_rtu_services) {
|
||||
service->stop();
|
||||
}
|
||||
// For Asio-based objects, stopping is usually handled by stopping the io_context.
|
||||
// If they have explicit stop logic, call it here.
|
||||
m_tcp_pollers.clear();
|
||||
spdlog::info("All device services stopped.");
|
||||
}
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
// 文件名: src/device_manager.h
|
||||
#ifndef DEVICE_MANAGER_H
|
||||
#define DEVICE_MANAGER_H
|
||||
|
||||
#include "protocol/iprotocol_adapter.h"
|
||||
#include "modbus/modbus_rtu_poller_service.h"
|
||||
#include "modbus/modbus_master_poller.h"
|
||||
#include <boost/asio.hpp>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
/**
|
||||
* @brief 设备管理模块
|
||||
* * 负责从配置文件加载设备信息,并动态创建、启动和停止相应的
|
||||
* 设备服务实例(如Modbus轮询器)。
|
||||
*/
|
||||
class DeviceManager {
|
||||
public:
|
||||
DeviceManager(boost::asio::io_context& io_context, ReportDataCallback report_cb);
|
||||
~DeviceManager();
|
||||
|
||||
// 禁止拷贝和赋值
|
||||
DeviceManager(const DeviceManager&) = delete;
|
||||
DeviceManager& operator=(const DeviceManager&) = delete;
|
||||
|
||||
/**
|
||||
* @brief 从JSON配置文件加载所有设备并启动服务
|
||||
* @param config_path JSON配置文件的路径
|
||||
*/
|
||||
void load_and_start(const std::string& config_path);
|
||||
|
||||
/**
|
||||
* @brief 安全地停止所有正在运行的设备服务
|
||||
*/
|
||||
void stop_all();
|
||||
|
||||
private:
|
||||
boost::asio::io_context& m_io_context;
|
||||
ReportDataCallback m_report_callback;
|
||||
|
||||
// 用于存储正在运行的服务实例,以管理其生命周期
|
||||
std::vector<std::unique_ptr<ModbusRtuPollerService>> m_rtu_services;
|
||||
std::vector<std::shared_ptr<ModbusMasterPoller>> m_tcp_pollers;
|
||||
};
|
||||
|
||||
#endif // DEVICE_MANAGER_H
|
||||
107
src/main.cpp
107
src/main.cpp
|
|
@ -1,76 +1,66 @@
|
|||
// main.cpp
|
||||
|
||||
#include "network/tcp_server.h"
|
||||
#include "mqtt/mqtt_client.h"
|
||||
#include "mqtt/mqtt_router.h"
|
||||
#include "systemMonitor/system_monitor.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "modbus/modbus_master_poller.h"
|
||||
#include "modbus/modbus_rtu_poller_service.h"
|
||||
#include "device_manager.h" // <<< MODIFIED: 包含新的设备管理器头文件
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <csignal>
|
||||
#include <iostream>
|
||||
#include <functional>
|
||||
#include <functional>
|
||||
|
||||
// 用于 ASIO 服务的全局 io_context
|
||||
boost::asio::io_context g_io_context;
|
||||
|
||||
/**
|
||||
* @brief 处理终止信号 (SIGINT, SIGTERM).
|
||||
*
|
||||
* 这个函数会优雅地停止 Boost.Asio 的 io_context,
|
||||
* 这将导致 main() 函数中的 io_context.run() 调用解除阻塞,
|
||||
* 从而允许程序干净地退出。
|
||||
*
|
||||
* @param signum 接收到的信号编号。
|
||||
*/
|
||||
void signalHandler(int signum) {
|
||||
spdlog::warn("Interrupt signal ({}) received. Shutting down.", signum);
|
||||
// 向 io_context 提交一个停止事件。这是一种线程安全的方式
|
||||
// 来告诉 io_context 停止其事件循环。
|
||||
g_io_context.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 周期性轮询系统状态并发布到 MQTT
|
||||
*/
|
||||
void poll_system_metrics(
|
||||
boost::asio::steady_timer& timer,
|
||||
SystemMonitor::SystemMonitor& monitor,
|
||||
MqttClient& mqtt_client
|
||||
) {
|
||||
// a. 采集数据
|
||||
auto cpu_util = monitor.getCpuUtilization();
|
||||
auto cpu_util = monitor.getCpuUtilization();
|
||||
auto mem_info = monitor.getMemoryInfo();
|
||||
double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0;
|
||||
|
||||
// b. (示例) 将数据通过 MQTT 发送出去
|
||||
|
||||
std::string topic = "proxy/system_status";
|
||||
std::string payload = "{\"cpu_usage\":" + std::to_string(cpu_util.totalUsagePercentage) +
|
||||
std::string payload = "{\"cpu_usage\":" + std::to_string(cpu_util.totalUsagePercentage) +
|
||||
",\"mem_total_gb\":" + std::to_string(mem_total_gb) + "}";
|
||||
mqtt_client.publish(topic, payload);
|
||||
spdlog::debug("System metrics published.");
|
||||
|
||||
// c. 将定时器重置到 15 秒后
|
||||
timer.expires_at(timer.expiry() + std::chrono::seconds(15));
|
||||
|
||||
// d. 异步等待定时器,到期后再次调用本函数,形成循环
|
||||
timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), std::ref(mqtt_client)));
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
|
||||
try {
|
||||
spdlog::set_level(spdlog::level::debug); // 设置日志级别为 debug,方便调试
|
||||
spdlog::set_level(spdlog::level::debug);
|
||||
spdlog::info("Edge Proxy starting up...");
|
||||
} catch (const spdlog::spdlog_ex& ex) {
|
||||
std::cerr << "Log initialization failed: " << ex.what() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// 注册信号处理器
|
||||
signal(SIGINT, signalHandler);
|
||||
signal(SIGTERM, signalHandler);
|
||||
|
||||
try {
|
||||
constexpr uint16_t tcp_port = 8888;
|
||||
// --- 1. 初始化核心服务 ---
|
||||
MqttClient mqtt_client("tcp://mqtt-broker:1883", "edge-proxy-main-client");
|
||||
MqttRouter mqtt_router(mqtt_client);
|
||||
std::vector<uint16_t> listen_ports = { 8888 };
|
||||
|
|
@ -80,67 +70,32 @@ int main(int argc, char* argv[]) {
|
|||
mqtt_client.connect();
|
||||
mqtt_router.start();
|
||||
|
||||
monitor.getCpuUtilization();
|
||||
boost::asio::steady_timer timer(g_io_context, std::chrono::seconds(15));
|
||||
timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), std::ref(mqtt_client)));
|
||||
// --- 2. 启动系统状态监控定时器 ---
|
||||
monitor.getCpuUtilization(); // 首次调用以初始化
|
||||
boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15));
|
||||
system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client)));
|
||||
|
||||
// --- 3. 创建统一的数据上报回调函数 ---
|
||||
auto report_to_mqtt = [&](const UnifiedData& data) {
|
||||
std::string topic = "devices/" + data.device_id + "/data";
|
||||
mqtt_client.publish(topic, data.data_json, 1, false);
|
||||
// 使用 post 确保 MQTT 发布操作在 io_context 的主事件循环中执行,保证线程安全
|
||||
g_io_context.post([&, topic, payload = data.data_json]() {
|
||||
mqtt_client.publish(topic, payload, 1, false);
|
||||
});
|
||||
};
|
||||
|
||||
// 配置并启动Modbus轮询器
|
||||
// ModbusDeviceConfig temp_sensor_config = {
|
||||
// .device_id = "temp_sensor_workshop1",
|
||||
// .ip_address = "192.168.1.120", // 您的Modbus设备的IP地址
|
||||
// .port = 502, // Modbus TCP标准端口
|
||||
// .slave_id = 1, // 设备的从站ID
|
||||
// .start_address = 0, // 要读取的第一个寄存器的地址
|
||||
// .quantity = 8, // 从起始地址开始,总共读取多少个寄存器
|
||||
// .poll_interval_ms = 2000 // 每2000毫秒轮询一次
|
||||
// };
|
||||
|
||||
// 创建轮询器实例 (使用std::make_shared确保生命周期)
|
||||
// auto poller = std::make_shared<ModbusMasterPoller>(g_io_context, temp_sensor_config, report_to_mqtt);
|
||||
// poller->start();
|
||||
|
||||
|
||||
// ==========================================================================
|
||||
// modbus rtu轮询服务启动
|
||||
ModbusRtuDeviceConfig temp_sensor_config = {
|
||||
.device_id = "rtu_temp_sensor_lab",
|
||||
.port_path = "/dev/ttyS7",
|
||||
.baud_rate = 9600,
|
||||
.slave_id = 0x01,
|
||||
.poll_interval_ms = 5000,
|
||||
.data_points = {
|
||||
{"temperature", 0x0000, ModbusDataType::INT16, 0.1}, // 地址0, 16位有符号, 结果乘以0.1
|
||||
{"humidity", 0x0001, ModbusDataType::UINT16, 0.1} // 地址1, 16位无符号, 结果乘以0.1
|
||||
}
|
||||
};
|
||||
ModbusRtuPollerService temp_sensor_service(temp_sensor_config, report_to_mqtt);
|
||||
temp_sensor_service.start();
|
||||
|
||||
// 示例2:配置一个水表
|
||||
// ModbusRtuDeviceConfig water_meter_config = {
|
||||
// .device_id = "water_meter_main_gate",
|
||||
// .port_path = "/dev/ttyS7", // 假设在同一条RS485总线上
|
||||
// .baud_rate = 9600,
|
||||
// .slave_id = 0x05, // 不同的从站ID
|
||||
// .poll_interval_ms = 10000,
|
||||
// .data_points = {
|
||||
// {"total_flow", 0x0100, ModbusDataType::FLOAT32, 1.0}, // 地址256, 32位浮点数
|
||||
// {"flow_rate", 0x0102, ModbusDataType::FLOAT32, 1.0} // 地址258, 32位浮点数
|
||||
// }
|
||||
// };
|
||||
// ModbusRtuPollerService water_meter_service(water_meter_config, report_to_mqtt);
|
||||
// water_meter_service.start();
|
||||
|
||||
// --- 4. 实例化设备管理器并从文件加载所有设备 ---
|
||||
DeviceManager device_manager(g_io_context, report_to_mqtt);
|
||||
// 默认从程序运行目录下的 "devices.json" 文件加载
|
||||
device_manager.load_and_start("../config/devices.json");
|
||||
|
||||
// --- 5. 启动主事件循环 (程序将阻塞在这里直到被信号中断) ---
|
||||
spdlog::info("All services are running. Press Ctrl+C to exit.");
|
||||
|
||||
g_io_context.run();
|
||||
|
||||
// --- 清理工作 ---
|
||||
// io_context.run() 返回后,程序将退出 try 块。
|
||||
// device_manager 的析构函数会自动被调用,它会负责停止所有设备服务线程。
|
||||
spdlog::info("Shutting down MQTT client...");
|
||||
mqtt_client.disconnect();
|
||||
|
||||
|
|
@ -151,4 +106,4 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
spdlog::info("Server has been shut down gracefully. Exiting.");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue