设备管理模块更新

This commit is contained in:
GuanYuankai 2025-11-04 09:51:37 +00:00
parent 367f3a12a7
commit 9a34055d50
7 changed files with 588 additions and 315 deletions

View File

@ -1,16 +1,4 @@
[
{
"rule_id": "TeST",
"device_id": "rtu_temp_sensor_lab",
"data_point_name": "temperature",
"compare_type": "EQ",
"threshold": 5,
"level": "CRITICAL",
"debounce_seconds": 0,
"message_template": "测试用",
"clear_message_template": "测试解除",
"alarm_mqtt_topic": "alarms/rtu_test"
},
{
"rule_id": "RTU_TEMP_HIGH_CRITICAL",
"device_id": "rtu_temp_sensor_lab",

View File

@ -34,13 +34,13 @@
"name": "count",
"address": 1,
"type": "INT16",
"scale": 1.0
"scale": 1
},
{
"name": "total_count",
"address": 2,
"type": "INT16",
"scale": 1.0
"scale": 1
}
]
},
@ -73,7 +73,7 @@
"name": "motor_speed",
"address": 100,
"type": "UINT16",
"scale": 1.0
"scale": 1
},
{
"name": "pressure",
@ -85,7 +85,7 @@
"name": "valve_status",
"address": 104,
"type": "UINT16",
"scale": 1.0
"scale": 1
}
]
}

0
config/video_config.json Normal file
View File

View File

@ -2,270 +2,439 @@
#include "device_manager.h"
#include "modbus/modbus_common.h"
#include "spdlog/spdlog.h"
#include <nlohmann/json.hpp>
#include <fstream>
#include <map>
#include <nlohmann/json.hpp>
#include <set>
#include <sstream>
using json = nlohmann::json;
static ModbusDataType string_to_modbus_data_type(const std::string& type_str) {
static const std::map<std::string, ModbusDataType> type_map = {
{"UINT16", ModbusDataType::UINT16},
{"INT16", ModbusDataType::INT16},
{"UINT32", ModbusDataType::UINT32},
{"INT32", ModbusDataType::INT32},
{"FLOAT32", ModbusDataType::FLOAT32}
};
auto it = type_map.find(type_str);
if (it != type_map.end()) {
return it->second;
}
throw std::runtime_error("Unknown ModbusDataType string: " + type_str);
static ModbusDataType string_to_modbus_data_type(const std::string &type_str) {
static const std::map<std::string, ModbusDataType> type_map = {
{"UINT16", ModbusDataType::UINT16},
{"INT16", ModbusDataType::INT16},
{"UINT32", ModbusDataType::UINT32},
{"INT32", ModbusDataType::INT32},
{"FLOAT32", ModbusDataType::FLOAT32}};
auto it = type_map.find(type_str);
if (it != type_map.end()) {
return it->second;
}
throw std::runtime_error("Unknown ModbusDataType string: " + type_str);
}
DeviceManager::DeviceManager(boost::asio::io_context& io_context, ReportDataCallback report_cb)
: m_io_context(io_context), m_report_callback(std::move(report_cb)) {}
DeviceManager::DeviceManager(boost::asio::io_context &io_context,
ReportDataCallback report_cb)
: m_io_context(io_context), m_report_callback(std::move(report_cb)) {}
DeviceManager::~DeviceManager() {
stop_all();
DeviceManager::~DeviceManager() { stop_all(); }
ModbusRtuDeviceConfig DeviceManager::_parse_rtu_config(const json &dev_json) {
ModbusRtuDeviceConfig config;
config.device_id = dev_json.at("device_id").get<std::string>();
config.port_path = dev_json.at("port_path").get<std::string>();
config.baud_rate = dev_json.at("baud_rate").get<unsigned int>();
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
for (const auto &dp_json : dev_json.at("data_points")) {
config.data_points.push_back(
{dp_json.at("name").get<std::string>(),
(uint16_t)dp_json.at("address").get<int>(),
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
dp_json.value("scale", 1.0)});
}
return config;
}
ModbusRtuDeviceConfig DeviceManager::_parse_rtu_config(const json& dev_json) {
ModbusRtuDeviceConfig config;
config.device_id = dev_json.at("device_id").get<std::string>();
config.port_path = dev_json.at("port_path").get<std::string>();
config.baud_rate = dev_json.at("baud_rate").get<unsigned int>();
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
ModbusTcpDeviceConfig DeviceManager::_parse_tcp_config(const json &dev_json) {
ModbusTcpDeviceConfig config;
config.device_id = dev_json.at("device_id").get<std::string>();
config.ip_address = dev_json.at("ip_address").get<std::string>();
config.port = dev_json.at("port").get<uint16_t>();
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
for (const auto& dp_json : dev_json.at("data_points")) {
config.data_points.push_back({
dp_json.at("name").get<std::string>(),
(uint16_t)dp_json.at("address").get<int>(),
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
dp_json.value("scale", 1.0)
});
}
return config;
for (const auto &dp_json : dev_json.at("data_points")) {
config.data_points.push_back(
{dp_json.at("name").get<std::string>(),
(uint16_t)dp_json.at("address").get<int>(),
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
dp_json.value("scale", 1.0)});
}
return config;
}
ModbusTcpDeviceConfig DeviceManager::_parse_tcp_config(const json& dev_json) {
ModbusTcpDeviceConfig config;
config.device_id = dev_json.at("device_id").get<std::string>();
config.ip_address = dev_json.at("ip_address").get<std::string>();
config.port = dev_json.at("port").get<uint16_t>();
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
for (const auto& dp_json : dev_json.at("data_points")) {
config.data_points.push_back({
dp_json.at("name").get<std::string>(),
(uint16_t)dp_json.at("address").get<int>(),
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
dp_json.value("scale", 1.0)
});
void DeviceManager::_validate_device_config(const nlohmann::json &config_json) {
std::set<std::string> all_device_ids;
// 辅助 lambda用于检查 data_points
auto validate_data_points = [](const nlohmann::json &data_points_json,
const std::string &device_id) {
if (!data_points_json.is_array() || data_points_json.empty()) {
throw std::runtime_error("Device '" + device_id +
"' must have a non-empty 'data_points' array.");
}
return config;
for (const auto &dp_json : data_points_json) {
dp_json.at("name").get<std::string>();
dp_json.at("address").get<int>();
dp_json.at("type").get<std::string>();
// .value("scale", 1.0) 是可选的, 不需检查
}
};
if (config_json.contains("modbus_rtu_devices")) {
if (!config_json["modbus_rtu_devices"].is_array()) {
throw std::runtime_error("'modbus_rtu_devices' must be an array.");
}
for (const auto &dev_json : config_json["modbus_rtu_devices"]) {
if (!dev_json.is_object())
continue;
const auto &id = dev_json.at("device_id").get<std::string>();
dev_json.at("port_path").get<std::string>();
dev_json.at("baud_rate").get<unsigned int>();
dev_json.at("slave_id").get<uint8_t>();
dev_json.at("poll_interval_ms").get<int>();
validate_data_points(dev_json.at("data_points"), id);
if (!all_device_ids.insert(id).second) {
throw std::runtime_error("Duplicate device_id found: " + id);
}
}
}
if (config_json.contains("modbus_tcp_devices")) {
if (!config_json["modbus_tcp_devices"].is_array()) {
throw std::runtime_error("'modbus_tcp_devices' must be an array.");
}
for (const auto &dev_json : config_json["modbus_tcp_devices"]) {
if (!dev_json.is_object())
continue;
const auto &id = dev_json.at("device_id").get<std::string>();
dev_json.at("ip_address").get<std::string>();
dev_json.at("port").get<uint16_t>();
dev_json.at("slave_id").get<uint8_t>();
dev_json.at("poll_interval_ms").get<int>();
validate_data_points(dev_json.at("data_points"), id);
if (!all_device_ids.insert(id).second) {
throw std::runtime_error("Duplicate device_id found: " + id);
}
}
}
if (config_json.contains("modbus_rtu_bus_configs")) {
if (!config_json["modbus_rtu_bus_configs"].is_object()) {
throw std::runtime_error(
"'modbus_rtu_bus_configs' must be an object (map).");
}
for (auto it = config_json["modbus_rtu_bus_configs"].begin();
it != config_json["modbus_rtu_bus_configs"].end(); ++it) {
it.value().at("inter_device_delay_ms").get<int>();
}
}
}
void DeviceManager::load_and_start(const std::string &config_path) {
{
std::lock_guard<std::mutex> lock(m_config_file_mutex);
m_config_path = config_path;
}
void DeviceManager::load_and_start(const std::string& config_path) {
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Loading device configuration from '{}'...", config_path);
std::ifstream config_file(config_path);
if (!config_file.is_open()) {
spdlog::critical("Failed to open configuration file: {}", config_path);
throw std::runtime_error("Configuration file not found.");
}
// 原始的启动逻辑
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Loading device configuration from '{}'...", config_path);
std::ifstream config_file(config_path);
if (!config_file.is_open()) {
spdlog::critical("Failed to open configuration file: {}", config_path);
throw std::runtime_error("Configuration file not found.");
}
try {
json config_json = json::parse(config_file);
_parse_and_apply_config(config_json);
try {
json config_json = json::parse(config_file);
} catch (const json::exception& e) { //
spdlog::critical("Failed to parse JSON configuration: {}", e.what());
throw; //
}
_validate_device_config(config_json);
_parse_and_apply_config(config_json);
} catch (const std::exception &e) { // 捕获所有错误 (parse, validate, apply)
spdlog::critical("Failed to parse, validate, or apply configuration: {}",
e.what());
throw;
}
}
void DeviceManager::post_apply_device_configuration(const std::string& json_payload) {
spdlog::debug("Posting config update task to io_context.");
boost::asio::post(m_io_context, [this, json_payload]() {
this->_apply_config_task(json_payload);
});
void DeviceManager::post_apply_device_configuration(
const std::string &json_payload) {
spdlog::debug("Posting config update task to io_context.");
boost::asio::post(m_io_context, [this, json_payload]() {
this->_apply_config_task(json_payload);
});
}
void DeviceManager::_apply_config_task(std::string json_payload) {
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Applying new device configuration (from MQTT)...");
try {
json config_json = json::parse(json_payload);
_parse_and_apply_config(config_json);
spdlog::info("Successfully applied new device configuration.");
} catch (const json::exception& e) {
spdlog::error("Failed to parse and apply new config: {}", e.what());
}
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Applying new device configuration...");
try {
json config_json = json::parse(json_payload);
// 在解析和应用之前,首先进行严格验证
_validate_device_config(config_json);
_parse_and_apply_config(config_json);
spdlog::info("Successfully applied new device configuration.");
} catch (const std::exception &e) { //
spdlog::error("Failed to parse, validate, and apply new config: {}",
e.what());
}
}
void DeviceManager::_parse_and_apply_config(const json& config_json) {
std::map<std::string, std::vector<ModbusRtuDeviceConfig>> new_rtu_groups;
if (config_json.contains("modbus_rtu_devices")) {
for (const auto& dev_json : config_json["modbus_rtu_devices"]) {
if (!dev_json.value("enabled", false)) continue;
auto config = _parse_rtu_config(dev_json);
new_rtu_groups[config.port_path].push_back(config);
}
void DeviceManager::_parse_and_apply_config(const json &config_json) {
std::map<std::string, std::vector<ModbusRtuDeviceConfig>> new_rtu_groups;
if (config_json.contains("modbus_rtu_devices")) {
for (const auto &dev_json : config_json["modbus_rtu_devices"]) {
if (!dev_json.value("enabled", false))
continue;
auto config = _parse_rtu_config(dev_json);
new_rtu_groups[config.port_path].push_back(config);
}
}
std::map<std::string, ModbusTcpDeviceConfig> new_tcp_configs;
if (config_json.contains("modbus_tcp_devices")) {
for (const auto &dev_json : config_json["modbus_tcp_devices"]) {
if (!dev_json.value("enabled", false))
continue;
auto config = _parse_tcp_config(dev_json);
new_tcp_configs[config.device_id] = config;
}
}
for (auto it = m_rtu_bus_services.begin(); it != m_rtu_bus_services.end();) {
const std::string &port_path = it->first;
if (new_rtu_groups.find(port_path) == new_rtu_groups.end()) {
spdlog::info("Config update: Stopping RTU service for port '{}'.",
port_path);
it->second->stop();
it = m_rtu_bus_services.erase(it);
} else {
++it;
}
}
for (const auto &pair : new_rtu_groups) {
const std::string &port_path = pair.first;
const auto &devices_on_bus = pair.second;
auto it = m_rtu_bus_services.find(port_path);
if (it != m_rtu_bus_services.end()) {
spdlog::info("Config update: Restarting RTU service for port '{}'...",
port_path);
it->second->stop();
m_rtu_bus_services.erase(it);
} else {
spdlog::info("Config update: Starting new RTU service for port '{}'...",
port_path);
}
std::map<std::string, ModbusTcpDeviceConfig> new_tcp_configs;
if (config_json.contains("modbus_tcp_devices")) {
for (const auto& dev_json : config_json["modbus_tcp_devices"]) {
if (!dev_json.value("enabled", false)) continue;
auto config = _parse_tcp_config(dev_json);
new_tcp_configs[config.device_id] = config;
}
auto service = std::make_unique<ModbusRtuBusService>(
port_path, devices_on_bus, m_report_callback);
service->start();
m_rtu_bus_services[port_path] = std::move(service);
spdlog::info(
"Started Modbus RTU Bus service for port '{}' with {} device(s).",
port_path, devices_on_bus.size());
}
for (auto it = m_tcp_pollers.begin(); it != m_tcp_pollers.end();) {
const std::string &device_id = it->first;
if (new_tcp_configs.find(device_id) == new_tcp_configs.end()) {
spdlog::info("Config update: Stopping TCP poller for device '{}'.",
device_id);
it->second->stop();
it = m_tcp_pollers.erase(it);
} else {
++it;
}
for (auto it = m_rtu_bus_services.begin(); it != m_rtu_bus_services.end(); ) {
const std::string& port_path = it->first;
if (new_rtu_groups.find(port_path) == new_rtu_groups.end()) {
spdlog::info("Config update: Stopping RTU service for port '{}'.", port_path);
it->second->stop();
it = m_rtu_bus_services.erase(it);
} else {
++it;
}
}
for (const auto &pair : new_tcp_configs) {
const std::string &device_id = pair.first;
const auto &config = pair.second;
auto it = m_tcp_pollers.find(device_id);
if (it != m_tcp_pollers.end()) {
spdlog::info("Config update: Restarting TCP poller for device '{}'...",
device_id);
it->second->stop();
m_tcp_pollers.erase(it);
} else {
spdlog::info("Config update: Starting new TCP poller for device '{}'...",
device_id);
}
// 启动新/更新 RTU 服务
for (const auto& pair : new_rtu_groups) {
const std::string& port_path = pair.first;
const auto& devices_on_bus = pair.second;
auto it = m_rtu_bus_services.find(port_path);
if (it != m_rtu_bus_services.end()) {
// TODO: 高级实现可以检查配置是否真的改变了
spdlog::info("Config update: Restarting RTU service for port '{}'...", port_path);
it->second->stop();
m_rtu_bus_services.erase(it);
} else {
spdlog::info("Config update: Starting new RTU service for port '{}'...", port_path);
}
auto service = std::make_unique<ModbusRtuBusService>(port_path, devices_on_bus, m_report_callback);
service->start();
m_rtu_bus_services[port_path] = std::move(service);
spdlog::info("Started Modbus RTU Bus service for port '{}' with {} device(s).", port_path, devices_on_bus.size());
}
for (auto it = m_tcp_pollers.begin(); it != m_tcp_pollers.end();) {
const std::string& device_id = it->first;
if (new_tcp_configs.find(device_id) == new_tcp_configs.end()) {
spdlog::info("Config update: Stopping TCP poller for device '{}'.", device_id);
it->second->stop();
it = m_tcp_pollers.erase(it);
} else {
++it;
}
}
for (const auto& pair : new_tcp_configs) {
const std::string& device_id = pair.first;
const auto& config = pair.second;
auto it = m_tcp_pollers.find(device_id);
if (it != m_tcp_pollers.end()) {
spdlog::info("Config update: Restarting TCP poller for device '{}'...", device_id);
it->second->stop();
m_tcp_pollers.erase(it);
} else {
spdlog::info("Config update: Starting new TCP poller for device '{}'...", device_id);
}
auto poller = std::make_shared<ModbusMasterPoller>(m_io_context, config, m_report_callback);
poller->start();
m_tcp_pollers[device_id] = poller;
}
auto poller = std::make_shared<ModbusMasterPoller>(m_io_context, config,
m_report_callback);
poller->start();
m_tcp_pollers[device_id] = poller;
}
}
void DeviceManager::stop_all() {
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Stopping all device services...");
for (auto& pair : m_rtu_bus_services) {
pair.second->stop();
}
m_rtu_bus_services.clear();
for (auto& pair : m_tcp_pollers) {
pair.second->stop();
}
m_tcp_pollers.clear();
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Stopping all device services...");
spdlog::info("All device services stopped.");
for (auto &pair : m_rtu_bus_services) {
pair.second->stop();
}
m_rtu_bus_services.clear();
for (auto &pair : m_tcp_pollers) {
pair.second->stop();
}
m_tcp_pollers.clear();
spdlog::info("All device services stopped.");
}
std::vector<DeviceInfo> DeviceManager::get_all_device_info() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_mutex);
std::vector<DeviceInfo> all_devices;
for (const auto& pair : m_rtu_bus_services) {
const auto& service = pair.second;
const auto& configs_on_bus = service->get_all_device_configs();
for (const auto& config : configs_on_bus) {
DeviceInfo info;
info.id = config.device_id;
info.type = "ModbusRTU";
info.is_running = service->is_running();
info.connection_details["Port Path"] = config.port_path;
info.connection_details["Baud Rate"] = std::to_string(config.baud_rate);
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
all_devices.push_back(info);
}
std::vector<DeviceInfo> all_devices;
for (const auto &pair : m_rtu_bus_services) {
const auto &service = pair.second;
const auto &configs_on_bus = service->get_all_device_configs();
for (const auto &config : configs_on_bus) {
DeviceInfo info;
info.id = config.device_id;
info.type = "ModbusRTU";
info.is_running = service->is_running();
info.connection_details["Port Path"] = config.port_path;
info.connection_details["Baud Rate"] = std::to_string(config.baud_rate);
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
all_devices.push_back(info);
}
}
for (const auto& pair : m_tcp_pollers) {
const auto& poller = pair.second;
const auto& config = poller->get_config();
for (const auto &pair : m_tcp_pollers) {
const auto &poller = pair.second;
const auto &config = poller->get_config();
DeviceInfo info;
info.id = config.device_id;
info.type = "ModbusTCP";
info.is_running = poller->is_running();
info.connection_details["IP Address"] = config.ip_address;
info.connection_details["Port"] = std::to_string(config.port);
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
all_devices.push_back(info);
}
DeviceInfo info;
info.id = config.device_id;
info.type = "ModbusTCP";
info.is_running = poller->is_running();
info.connection_details["IP Address"] = config.ip_address;
info.connection_details["Port"] = std::to_string(config.port);
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
all_devices.push_back(info);
}
return all_devices;
return all_devices;
}
bool DeviceManager::send_control_command(const std::string& device_id, uint16_t address, uint16_t value) {
std::lock_guard<std::mutex> lock(m_mutex);
bool DeviceManager::send_control_command(const std::string &device_id,
uint16_t address, uint16_t value) {
std::lock_guard<std::mutex> lock(m_mutex);
auto tcp_it = m_tcp_pollers.find(device_id);
if (tcp_it != m_tcp_pollers.end()) {
spdlog::info("Found TCP device '{}'. Dispatching write command to address {}.", device_id, address);
tcp_it->second->write_single_register(address, value);
return true;
auto tcp_it = m_tcp_pollers.find(device_id);
if (tcp_it != m_tcp_pollers.end()) {
spdlog::info(
"Found TCP device '{}'. Dispatching write command to address {}.",
device_id, address);
tcp_it->second->write_single_register(address, value);
return true;
}
for (const auto &pair : m_rtu_bus_services) {
const auto &service = pair.second;
if (service->manages_device(device_id)) {
spdlog::info("Found RTU device '{}' on a bus. Dispatching write command.",
device_id);
service->write_single_register(device_id, address, value);
return true;
}
}
for (const auto& pair : m_rtu_bus_services) {
const auto& service = pair.second;
if (service->manages_device(device_id)) {
spdlog::info("Found RTU device '{}' on a bus. Dispatching write command.", device_id);
service->write_single_register(device_id, address, value);
return true;
}
}
spdlog::warn("send_control_command failed: Device with ID '{}' not found in "
"any service.",
device_id);
return false;
}
spdlog::warn("send_control_command failed: Device with ID '{}' not found in any service.", device_id); //
return false;
std::string DeviceManager::get_config_as_json_string() const {
std::lock_guard<std::mutex> lock(m_config_file_mutex);
if (m_config_path.empty()) {
spdlog::warn("m_config_path is empty, cannot read config.");
return "{}";
}
std::ifstream config_file(m_config_path);
if (!config_file.is_open()) {
spdlog::error("Failed to open configuration file for reading: {}",
m_config_path);
return "{}";
}
std::stringstream buffer;
buffer << config_file.rdbuf();
return buffer.str();
}
bool DeviceManager::save_config_from_json_string(
const std::string &json_content) {
std::lock_guard<std::mutex> lock(m_config_file_mutex);
if (m_config_path.empty()) {
spdlog::error("m_config_path is empty, cannot save config.");
return false;
}
// 1. 验证 JSON 格式和 Schema
try {
json config_json = json::parse(json_content);
// 在写入文件之前,确保配置是有效的
_validate_device_config(config_json);
} catch (const std::exception &e) { // 捕获 json::parse 和 _validate 的异常
spdlog::error("Failed to save config: Invalid JSON format or schema. {}",
e.what());
return false; // 返回 false, web_server 会返回 400
}
// 2. 写入文件
std::ofstream config_file(m_config_path);
if (!config_file.is_open()) {
spdlog::error("Failed to open configuration file for writing: {}",
m_config_path);
return false;
}
config_file << json_content;
spdlog::info("Successfully saved new device configuration to {}",
m_config_path);
return true;
}
bool DeviceManager::reload_config_from_file() {
spdlog::info("Posting task to reload device config from file...");
std::string config_content;
try {
// 必须先读取,因为 post_apply_device_configuration 需要 payload
config_content = get_config_as_json_string();
} catch (const std::exception &e) {
spdlog::error("Failed to read config file for reload: {}", e.what());
return false;
}
// 复用现有的、线程安全的配置应用方法
post_apply_device_configuration(config_content);
return true;
}

View File

@ -2,25 +2,25 @@
#ifndef DEVICE_MANAGER_H
#define DEVICE_MANAGER_H
#include "protocol/iprotocol_adapter.h"
#include "modbus/modbus_rtu_bus_service.h"
#include "modbus/modbus_master_poller.h"
#include "modbus/modbus_rtu_bus_service.h"
#include "protocol/iprotocol_adapter.h"
#include <boost/asio.hpp>
#include <vector>
#include <map>
#include <memory>
#include <string>
#include <mutex>
#include <map>
#include <nlohmann/json.hpp> //
#include <nlohmann/json.hpp>
#include <string>
#include <vector>
/**
* @brief API层传递设备信息的统一结构体
*/
struct DeviceInfo {
std::string id;
std::string type;
bool is_running;
std::map<std::string, std::string> connection_details;
std::string id;
std::string type;
bool is_running;
std::map<std::string, std::string> connection_details;
};
/**
@ -30,76 +30,109 @@ struct DeviceInfo {
*/
class DeviceManager {
public:
DeviceManager(boost::asio::io_context& io_context, ReportDataCallback report_cb);
~DeviceManager();
DeviceManager(boost::asio::io_context &io_context,
ReportDataCallback report_cb);
~DeviceManager();
// 禁止拷贝和赋值
DeviceManager(const DeviceManager&) = delete;
DeviceManager& operator=(const DeviceManager&) = delete;
// 禁止拷贝和赋值
DeviceManager(const DeviceManager &) = delete;
DeviceManager &operator=(const DeviceManager &) = delete;
/**
* @brief JSON配置文件加载所有设备并启动服务
* @param config_path JSON配置文件的路径
*/
void load_and_start(const std::string& config_path); //
/**
* @brief [] JSON配置文件加载所有设备并启动服务
* @param config_path JSON配置文件的路径
*/
void load_and_start(const std::string &config_path);
/**
* @brief (线) MQTT 线
* post io_context 线
* @param json_payload JSON
*/
void post_apply_device_configuration(const std::string& json_payload);
/**
* @brief (线) MQTT 线
* post io_context 线
* @param json_payload JSON
*/
void post_apply_device_configuration(const std::string &json_payload);
/**
* @brief
* @return vector
*/
std::vector<DeviceInfo> get_all_device_info() const; //
/**
* @brief
* @return vector
*/
std::vector<DeviceInfo> get_all_device_info() const;
/**
* @brief
*/
void stop_all(); //
/**
* @brief
*/
void stop_all();
/**
* @brief Modbus设备发送一个写单个寄存器的命令
* @param device_id ID
* @param address
* @param value
* @return truefalse
*/
bool send_control_command(const std::string& device_id, uint16_t address, uint16_t value); //
/**
* @brief Modbus设备发送一个写单个寄存器的命令
* @param device_id ID
* @param address
* @param value
* @return truefalse
*/
bool send_control_command(const std::string &device_id, uint16_t address,
uint16_t value);
/**
* @brief (线)
* @return JSON
*/
std::string get_config_as_json_string() const;
/**
* @brief (线) JSON
* @param json_content JSON
* @return true, ( JSON schema ) false
*/
bool save_config_from_json_string(const std::string &json_content);
/**
* @brief (线)
* ( post_apply... )
* @return true
*/
bool reload_config_from_file();
private:
/**
* @brief (, io_context 线)
*
*/
void _apply_config_task(std::string json_payload);
/**
* @brief (, io_context 线)
*
*/
void _apply_config_task(std::string json_payload);
/**
* @brief (, )
* "Diff"
*/
void _parse_and_apply_config(const nlohmann::json& config_json);
/**
* @brief (, )
* "Diff"
*/
void _parse_and_apply_config(const nlohmann::json &config_json);
/**
* @brief (, ) Modbus RTU
*/
ModbusRtuDeviceConfig _parse_rtu_config(const nlohmann::json& dev_json);
/**
* @brief () JSON schema device_id
* @param config_json nlohmann::json
* @throws std::runtime_error
*/
void _validate_device_config(const nlohmann::json &config_json);
/**
* @brief (, ) Modbus TCP
*/
ModbusTcpDeviceConfig _parse_tcp_config(const nlohmann::json& dev_json);
/**
* @brief (, ) Modbus RTU
*/
ModbusRtuDeviceConfig _parse_rtu_config(const nlohmann::json &dev_json);
/**
* @brief (, ) Modbus TCP
*/
ModbusTcpDeviceConfig _parse_tcp_config(const nlohmann::json &dev_json);
private:
boost::asio::io_context& m_io_context;
ReportDataCallback m_report_callback;
boost::asio::io_context &m_io_context;
ReportDataCallback m_report_callback;
std::map<std::string, std::unique_ptr<ModbusRtuBusService>> m_rtu_bus_services;
std::map<std::string, std::shared_ptr<ModbusMasterPoller>> m_tcp_pollers;
mutable std::mutex m_mutex;
std::map<std::string, std::unique_ptr<ModbusRtuBusService>>
m_rtu_bus_services;
std::map<std::string, std::shared_ptr<ModbusMasterPoller>> m_tcp_pollers;
mutable std::mutex m_mutex;
std::string m_config_path; // 存储配置文件的路径
mutable std::mutex m_config_file_mutex; // 保护配置文件读写的专用互斥锁
};
#endif // DEVICE_MANAGER_H

View File

@ -28,10 +28,11 @@ boost::asio::io_context g_io_context;
/**
* @brief MQTT
*/
void poll_system_metrics(boost::asio::steady_timer& timer,
SystemMonitor::SystemMonitor& monitor,
MqttClient& mqtt_client, AlarmService& alarm_service) {
if (g_io_context.stopped()) return;
void poll_system_metrics(boost::asio::steady_timer &timer,
SystemMonitor::SystemMonitor &monitor,
MqttClient &mqtt_client, AlarmService &alarm_service) {
if (g_io_context.stopped())
return;
auto cpu_util = monitor.getCpuUtilization();
auto mem_info = monitor.getMemoryInfo();
double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0;
@ -56,7 +57,7 @@ void poll_system_metrics(boost::asio::steady_timer& timer,
payload = payload_json.dump();
} catch (const nlohmann::json::parse_error& e) {
} catch (const nlohmann::json::parse_error &e) {
spdlog::error("Failed to parse thermalInfo JSON: {}. Sending partial data.",
e.what());
nlohmann::json fallback_json;
@ -80,7 +81,7 @@ void poll_system_metrics(boost::asio::steady_timer& timer,
std::ref(alarm_service)));
}
int main(int argc, char* argv[]) {
int main(int argc, char *argv[]) {
const std::string config_path = "/app/config/config.json";
if (!ConfigManager::getInstance().load(config_path)) {
std::cerr << "Failed to load configuration from " << config_path
@ -88,18 +89,18 @@ int main(int argc, char* argv[]) {
<< std::endl;
}
auto& config = ConfigManager::getInstance();
auto &config = ConfigManager::getInstance();
try {
spdlog::set_level(spdlog::level::from_str(config.getLogLevel()));
spdlog::info("Edge Proxy starting up...");
} catch (const spdlog::spdlog_ex& ex) {
} catch (const spdlog::spdlog_ex &ex) {
std::cerr << "Log initialization failed: " << ex.what() << std::endl;
return 1;
}
spdlog::info("Initializing Data Storage...");
auto& data_storage = DataStorage::getInstance();
auto &data_storage = DataStorage::getInstance();
if (!data_storage.initialize(config.getDataStorageDbPath())) {
spdlog::critical("Failed to initialize DataStorage. Exiting.");
return 1;
@ -122,7 +123,7 @@ int main(int argc, char* argv[]) {
spdlog::error("Failed to load alarm rules. Alarms may be disabled.");
}
auto report_to_mqtt = [&](const UnifiedData& data) {
auto report_to_mqtt = [&](const UnifiedData &data) {
if (data_storage.storeProcessedData(data)) {
spdlog::debug("Successfully stored PROCESSED data for device '{}'",
data.device_id);
@ -161,7 +162,7 @@ int main(int argc, char* argv[]) {
}
CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache);
mqtt_client.set_connected_handler([&](const std::string& cause) {
mqtt_client.set_connected_handler([&](const std::string &cause) {
spdlog::info("MQTT client connected: {}", cause);
cache_uploader.start_upload();
});
@ -190,7 +191,7 @@ int main(int argc, char* argv[]) {
boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM);
signals.async_wait(
[&](const boost::system::error_code& error, int signal_number) {
[&](const boost::system::error_code &error, int signal_number) {
spdlog::warn("Interrupt signal ({}) received. Shutting down.",
signal_number);
@ -222,7 +223,7 @@ int main(int argc, char* argv[]) {
spdlog::info("All services are running. Press Ctrl+C to exit.");
g_io_context.run();
} catch (const std::exception& e) {
} catch (const std::exception &e) {
spdlog::critical("An unhandled exception occurred: {}", e.what());
return 1;
}

View File

@ -89,8 +89,9 @@ void WebServer::setup_routes() {
devices_json.push_back(std::move(device_obj));
}
return crow::json::wvalue(devices_json);
auto res = crow::response(crow::json::wvalue(devices_json));
res.set_header("Content-Type", "application/json");
return res;
});
CROW_ROUTE((*this), "/api/data/latest").methods("GET"_method)([this] {
@ -100,8 +101,12 @@ void WebServer::setup_routes() {
for (const auto &pair : latest_data_map) {
response[pair.first] = crow::json::load(pair.second);
}
return response;
// 1. 将 JSON 对象转换为 crow::response
auto res = crow::response(response);
// 2. 明确设置 Content-Type 头部
res.set_header("Content-Type", "application/json");
// 3. 返回 response 对象
return res;
});
CROW_ROUTE((*this), "/api/alarms/active").methods("GET"_method)([this] {
@ -252,4 +257,81 @@ void WebServer::setup_routes() {
return res;
}
});
// --- [!! 新增路由: 设备配置管理 !!] ---
/**
* @brief GET /api/devices/config
* (devices.json) JSON
*/
CROW_ROUTE((*this), "/api/devices/config").methods("GET"_method)([this]() {
std::string rules_json_string =
m_device_manager.get_config_as_json_string();
auto res = crow::response(200, rules_json_string);
res.set_header("Content-Type", "application/json");
return res;
});
/**
* @brief POST /api/devices/config
* (devices.json) JSON
*/
CROW_ROUTE((*this), "/api/devices/config")
.methods("POST"_method)([this](const crow::request &req) {
const std::string &new_rules_content = req.body;
// save_config_from_json_string 内部包含 JSON 格式和 Schema 校验
bool success =
m_device_manager.save_config_from_json_string(new_rules_content);
if (success) {
crow::json::wvalue response_json;
response_json["status"] = "success";
response_json["message"] = "Device config saved successfully. A "
"reload is required to apply.";
auto res = crow::response(200, response_json.dump());
res.set_header("Content-Type", "application/json");
return res;
} else {
crow::json::wvalue error_json;
error_json["status"] = "error";
error_json["message"] = "Failed to save rules. Invalid JSON format "
"or schema. Check service logs.";
auto res = crow::response(400, error_json.dump());
res.set_header("Content-Type", "application/json");
return res;
}
});
/**
* @brief POST /api/devices/reload
* devices.json
*/
CROW_ROUTE((*this), "/api/devices/reload").methods("POST"_method)([this]() {
spdlog::info("Web API: Received request to reload device rules...");
bool success = m_device_manager.reload_config_from_file();
if (success) {
crow::json::wvalue response_json;
response_json["status"] = "success";
response_json["message"] = "Device rules reload posted successfully.";
auto res = crow::response(200, response_json.dump());
res.set_header("Content-Type", "application/json");
return res;
} else {
crow::json::wvalue error_json;
error_json["status"] = "error";
error_json["message"] =
"Failed to post device rules reload. Check service logs.";
auto res = crow::response(500, error_json.dump());
res.set_header("Content-Type", "application/json");
return res;
}
});
}