报警功能(开发中)
This commit is contained in:
parent
7a5375ef68
commit
1365e95c19
|
|
@ -0,0 +1,88 @@
|
|||
[
|
||||
{
|
||||
"rule_id": "RTU_TEMP_HIGH_CRITICAL",
|
||||
"device_id": "rtu_temp_sensor_lab",
|
||||
"data_point_name": "temperature",
|
||||
"compare_type": "GT",
|
||||
"threshold": 30.0,
|
||||
"level": "CRITICAL",
|
||||
"debounce_seconds": 60,
|
||||
"message_template": "RTU温度传感器'{device_id}' 检测到温度过高 ({value}°C),已达临界值 {threshold}°C!"
|
||||
},
|
||||
{
|
||||
"rule_id": "RTU_HUMIDITY_LOW_WARNING",
|
||||
"device_id": "rtu_temp_sensor_lab",
|
||||
"data_point_name": "humidity",
|
||||
"compare_type": "LT",
|
||||
"threshold": 40.0,
|
||||
"level": "WARNING",
|
||||
"message_template": "RTU温度传感器'{device_id}' 的湿度 ({value}%) 低于警告值 {threshold}%!"
|
||||
},
|
||||
{
|
||||
"rule_id": "ROTARY_ENCODER_COUNT_EXCEED",
|
||||
"device_id": "rotary encoder",
|
||||
"data_point_name": "count",
|
||||
"compare_type": "GT",
|
||||
"threshold": 1000.0,
|
||||
"level": "INFO",
|
||||
"message_template": "旋转编码器 '{device_id}' 计数 ({value}) 超过 {threshold}!"
|
||||
},
|
||||
{
|
||||
"rule_id": "PLC_PRESSURE_HIGH_CRITICAL",
|
||||
"device_id": "plc_workshop1",
|
||||
"data_point_name": "pressure",
|
||||
"compare_type": "GT",
|
||||
"threshold": 10.5,
|
||||
"level": "CRITICAL",
|
||||
"debounce_seconds": 120,
|
||||
"message_template": "PLC设备 '{device_id}' 检测到压力异常升高 ({value} bar),已达临界值 {threshold} bar!"
|
||||
},
|
||||
{
|
||||
"rule_id": "PLC_MOTOR_SPEED_WARNING",
|
||||
"device_id": "plc_workshop1",
|
||||
"data_point_name": "motor_speed",
|
||||
"compare_type": "GT",
|
||||
"threshold": 1500.0,
|
||||
"level": "WARNING",
|
||||
"message_template": "PLC设备 '{device_id}' 电机转速 ({value} RPM) 超过警告值 {threshold} RPM!"
|
||||
},
|
||||
{
|
||||
"rule_id": "PLC_VALVE_STATUS_WARN",
|
||||
"device_id": "plc_workshop1",
|
||||
"data_point_name": "valve_status",
|
||||
"compare_type": "EQ",
|
||||
"threshold": 0.0,
|
||||
"level": "WARNING",
|
||||
"message_template": "PLC设备 '{device_id}' 阀门状态异常 (当前值: {value},期望值不等于 {threshold})!"
|
||||
},
|
||||
{
|
||||
"rule_id": "ANY_TEMP_CRITICAL",
|
||||
"device_id": "*",
|
||||
"data_point_name": "temperature",
|
||||
"compare_type": "GT",
|
||||
"threshold": 40.0,
|
||||
"level": "CRITICAL",
|
||||
"debounce_seconds": 300,
|
||||
"message_template": "【全局报警】任意设备 '{device_id}' 的温度 ({value}°C) 超过 {threshold}°C。请立即关注!"
|
||||
},
|
||||
{
|
||||
"rule_id": "SYSTEM_CPU_HIGH",
|
||||
"device_id": "proxy_system",
|
||||
"data_point_name": "cpu_usage",
|
||||
"compare_type": "GT",
|
||||
"threshold": 90.0,
|
||||
"level": "WARNING",
|
||||
"debounce_seconds": 30,
|
||||
"message_template": "代理系统 CPU 利用率 ({value}%) 超过 {threshold}%,请检查!"
|
||||
},
|
||||
{
|
||||
"rule_id": "SYSTEM_MEM_HIGH",
|
||||
"device_id": "proxy_system",
|
||||
"data_point_name": "mem_usage_percentage",
|
||||
"compare_type": "GT",
|
||||
"threshold": 80.0,
|
||||
"level": "WARNING",
|
||||
"debounce_seconds": 30,
|
||||
"message_template": "代理系统内存使用率 ({value}%) 超过 {threshold}%,请检查!"
|
||||
}
|
||||
]
|
||||
|
|
@ -0,0 +1,192 @@
|
|||
#include "alarm_manager.h"

#include <uuid/uuid.h>  // For generating unique IDs

#include <algorithm>
#include <chrono>
#include <ctime>
#include <fstream>
#include <iomanip>  // For std::put_time
#include <sstream>
#include <string>
#include <utility>
#include <vector>
|
||||
|
||||
// Nothing needs to be set up at construction time; rules are supplied later
// through load_rules() and callbacks through register_alarm_callback().
AlarmManager::AlarmManager() = default;
|
||||
|
||||
bool AlarmManager::load_rules(const std::string& config_path) {
|
||||
std::lock_guard<std::mutex> lock(m_mutex); // 访问规则时加锁
|
||||
spdlog::info("Loading alarm rules from '{}'...", config_path);
|
||||
std::ifstream config_file(config_path);
|
||||
if (!config_file.is_open()) {
|
||||
spdlog::critical("Failed to open alarm rules configuration file: {}", config_path);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
nlohmann::json rules_json = nlohmann::json::parse(config_file);
|
||||
for (const auto& rule_json : rules_json) {
|
||||
AlarmRule rule;
|
||||
rule.rule_id = rule_json.at("rule_id").get<std::string>();
|
||||
rule.device_id = rule_json.at("device_id").get<std::string>();
|
||||
rule.data_point_name = rule_json.at("data_point_name").get<std::string>();
|
||||
rule.compare_type = rule_json.at("compare_type").get<std::string>();
|
||||
rule.threshold = rule_json.at("threshold").get<double>();
|
||||
|
||||
std::string level_str = rule_json.at("level").get<std::string>();
|
||||
if (level_str == "INFO") rule.level = AlarmLevel::INFO;
|
||||
else if (level_str == "WARNING") rule.level = AlarmLevel::WARNING;
|
||||
else if (level_str == "CRITICAL") rule.level = AlarmLevel::CRITICAL;
|
||||
else {
|
||||
spdlog::warn("Unknown alarm level '{}' for rule '{}', defaulting to INFO.", level_str, rule.rule_id);
|
||||
rule.level = AlarmLevel::INFO;
|
||||
}
|
||||
rule.message_template = rule_json.value("message_template",
|
||||
"设备 {device_id} 的 {data_point_name} ({value}) "
|
||||
"触发了 {level} 报警,条件 {compare_type}{threshold}!");
|
||||
|
||||
// 报警去抖动间隔,默认为0 (不启用)
|
||||
rule.debounce_interval = std::chrono::seconds(rule_json.value("debounce_seconds", 0));
|
||||
|
||||
m_alarm_rules.push_back(rule);
|
||||
m_active_alarms[rule.rule_id] = false; // 初始化所有规则为非活动状态
|
||||
spdlog::debug("Loaded alarm rule: ID={}, Device={}, DataPoint={}, Threshold={}, Debounce={}s",
|
||||
rule.rule_id, rule.device_id, rule.data_point_name, rule.threshold, rule.debounce_interval.count());
|
||||
}
|
||||
} catch (const nlohmann::json::exception& e) {
|
||||
spdlog::critical("Failed to parse alarm rules JSON: {}", e.what());
|
||||
return false;
|
||||
}
|
||||
spdlog::info("Successfully loaded {} alarm rules.", m_alarm_rules.size());
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * @brief Evaluates every rule that applies to @p device_id against one data sample.
 *
 * A rule applies when its device_id equals @p device_id or is the wildcard "*".
 * Only data points that are present in @p data_json and hold a JSON number are
 * checked. On an inactive -> active transition an alarm event is emitted (unless
 * it falls inside the rule's debounce interval); on an active -> inactive
 * transition a recovery event (event_id prefixed with "RECOVER_") is emitted.
 *
 * State is tracked per rule, and for wildcard rules per rule *and* device, so
 * that one device cannot mask or "recover" an alarm that belongs to another
 * device matched by the same "*" rule.
 *
 * Callbacks are invoked while m_mutex is held, so a callback must not call back
 * into this AlarmManager (m_mutex is a non-recursive std::mutex).
 *
 * @param device_id Identifier of the device that produced the sample.
 * @param data_json Sample data; expected to be a JSON object keyed by data point name.
 */
void AlarmManager::check_data_for_alarms(const std::string& device_id, const nlohmann::json& data_json) {
    std::lock_guard<std::mutex> lock(m_mutex);  // guards rules, callbacks and per-rule state

    // Delivers one event to every registered callback.
    const auto notify_callbacks = [this](const AlarmEvent& event) {
        for (const auto& callback : m_alarm_callbacks) {
            if (callback) {
                callback(event);
            }
        }
    };

    for (const auto& rule : m_alarm_rules) {
        const bool is_wildcard_rule = (rule.device_id == "*");
        if (!is_wildcard_rule && rule.device_id != device_id) {
            continue;
        }

        const auto data_point = data_json.find(rule.data_point_name);
        if (data_point == data_json.end() || !data_point->is_number()) {
            continue;
        }
        const double current_value = data_point->get<double>();

        // Device-specific rules keep using the plain rule_id as state key; wildcard
        // rules get one state slot per device.
        const std::string state_key = is_wildcard_rule ? rule.rule_id + "@" + device_id : rule.rule_id;

        const bool condition_met = evaluate_rule(rule.compare_type, current_value, rule.threshold);
        bool& alarm_active = m_active_alarms[state_key];

        if (condition_met && !alarm_active) {
            const auto now = std::chrono::steady_clock::now();

            // Suppress re-triggering inside the debounce interval (if one is configured).
            if (rule.debounce_interval.count() > 0) {
                const auto last_trigger = m_last_alarm_time.find(state_key);
                if (last_trigger != m_last_alarm_time.end() &&
                    (now - last_trigger->second) < rule.debounce_interval) {
                    spdlog::debug("Alarm for rule '{}' triggered but debounced.", rule.rule_id);
                    continue;
                }
            }

            alarm_active = true;
            m_last_alarm_time[state_key] = now;

            // Unique event id.
            uuid_t uuid;
            uuid_generate_time(uuid);
            char uuid_str[37];
            uuid_unparse_lower(uuid, uuid_str);

            // The message template is rendered from the rule; substitute the concrete
            // device so that wildcard rules do not print "*" as the device id.
            AlarmRule resolved_rule = rule;
            resolved_rule.device_id = device_id;

            AlarmEvent event;
            event.event_id = uuid_str;
            event.rule_id = rule.rule_id;
            event.device_id = device_id;
            event.data_point_name = rule.data_point_name;
            event.current_value = current_value;
            event.threshold = rule.threshold;
            event.level = rule.level;
            event.timestamp = get_current_timestamp();
            event.message = generate_alarm_message(resolved_rule, current_value);

            notify_callbacks(event);
            spdlog::warn("!!! ALARM TRIGGERED !!! [{}], Device: {}, DataPoint: {}, Value: {}, Rule: {}",
                         alarm_level_to_string(rule.level), device_id, rule.data_point_name, current_value, rule.rule_id);
        } else if (!condition_met && alarm_active) {
            // Condition cleared while the alarm was active: emit a recovery event.
            alarm_active = false;

            const std::string timestamp = get_current_timestamp();

            AlarmEvent event;
            event.event_id = "RECOVER_" + rule.rule_id + "_" + timestamp;
            event.rule_id = rule.rule_id;
            event.device_id = device_id;
            event.data_point_name = rule.data_point_name;
            event.current_value = current_value;
            event.threshold = rule.threshold;
            event.level = rule.level;  // recovery reuses the level of the alarm it clears
            event.timestamp = timestamp;
            event.message = "ALARM RECOVERED! 设备 " + device_id + " 的 " + rule.data_point_name + " 数据已恢复正常。";

            notify_callbacks(event);
            spdlog::info("ALARM RECOVERED! [{}], Device: {}, DataPoint: {}, Value: {}, Rule: {}",
                         alarm_level_to_string(rule.level), device_id, rule.data_point_name, current_value, rule.rule_id);
        }
    }
}
|
||||
|
||||
/**
 * @brief Registers a callback that receives every alarm and recovery event.
 *
 * Empty callbacks are rejected: invoking an empty std::function during event
 * dispatch would throw std::bad_function_call.
 *
 * @param callback Callable invoked with each emitted AlarmEvent.
 */
void AlarmManager::register_alarm_callback(AlarmCallback callback) {
    if (!callback) {
        spdlog::warn("Ignoring empty alarm callback.");
        return;
    }
    std::lock_guard<std::mutex> lock(m_mutex);  // guards m_alarm_callbacks
    m_alarm_callbacks.push_back(std::move(callback));
}
|
||||
|
||||
/**
 * @brief Applies a rule's comparison operator to a value and a threshold.
 *
 * Supported operators: "GT" (>), "LT" (<), "EQ" (==), "NE" (!=), "GE" (>=),
 * "LE" (<=). "EQ"/"NE" are exact floating-point comparisons, which suits
 * discrete status values such as 0.0 / 1.0.
 *
 * @param compare_type Operator name as stored in the rule.
 * @param value        Current value of the data point.
 * @param threshold    Threshold configured in the rule.
 * @return Result of the comparison; false for an unknown operator.
 */
bool AlarmManager::evaluate_rule(const std::string& compare_type, double value, double threshold) {
    if (compare_type == "GT") return value > threshold;
    if (compare_type == "LT") return value < threshold;
    if (compare_type == "EQ") return value == threshold;
    if (compare_type == "NE") return value != threshold;
    if (compare_type == "GE") return value >= threshold;
    if (compare_type == "LE") return value <= threshold;
    return false;  // unknown operator: the rule never fires
}
|
||||
|
||||
std::string AlarmManager::generate_alarm_message(const AlarmRule& rule, double current_value) {
|
||||
std::string msg = rule.message_template;
|
||||
|
||||
// 替换模板中的占位符
|
||||
// 注意:查找和替换的顺序很重要,以避免替换已经被替换掉的占位符的一部分
|
||||
// 简单的替换逻辑,更健壮的应使用正则表达式或更复杂的模板引擎
|
||||
std::string s_val = std::to_string(current_value);
|
||||
std::string s_th = std::to_string(rule.threshold);
|
||||
std::string s_level = alarm_level_to_string(rule.level);
|
||||
|
||||
size_t pos = std::string::npos;
|
||||
while ((pos = msg.find("{device_id}")) != std::string::npos) msg.replace(pos, 11, rule.device_id);
|
||||
while ((pos = msg.find("{data_point_name}")) != std::string::npos) msg.replace(pos, 17, rule.data_point_name);
|
||||
while ((pos = msg.find("{value}")) != std::string::npos) msg.replace(pos, 7, s_val);
|
||||
while ((pos = msg.find("{threshold}")) != std::string::npos) msg.replace(pos, 11, s_th);
|
||||
while ((pos = msg.find("{level}")) != std::string::npos) msg.replace(pos, 7, s_level);
|
||||
while ((pos = msg.find("{compare_type}")) != std::string::npos) msg.replace(pos, 14, rule.compare_type);
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
/**
 * @brief Returns the current local time formatted as "YYYY-MM-DDTHH:MM:SS".
 *
 * Uses localtime_r instead of std::localtime: the latter returns a pointer to a
 * shared static buffer and is therefore not safe when other threads convert
 * times concurrently. No UTC offset is appended to the result.
 *
 * @return The formatted timestamp, or an empty string if the conversion fails.
 */
std::string AlarmManager::get_current_timestamp() {
    const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    std::tm local_tm{};
    if (localtime_r(&now, &local_tm) == nullptr) {
        return std::string();
    }

    std::ostringstream out;
    out << std::put_time(&local_tm, "%Y-%m-%dT%H:%M:%S");
    return out.str();
}
|
||||
|
||||
/**
 * @brief Converts an AlarmLevel to its upper-case name.
 *
 * @param level Level to convert.
 * @return "INFO", "WARNING" or "CRITICAL"; "UNKNOWN" for any other value.
 */
std::string AlarmManager::alarm_level_to_string(AlarmLevel level) {
    if (level == AlarmLevel::INFO) {
        return "INFO";
    }
    if (level == AlarmLevel::WARNING) {
        return "WARNING";
    }
    if (level == AlarmLevel::CRITICAL) {
        return "CRITICAL";
    }
    return "UNKNOWN";
}
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
#pragma once
|
||||
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <functional>
|
||||
#include <chrono> // For std::chrono
|
||||
#include <mutex> // For std::mutex
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
/// Severity levels an alarm rule can carry.
enum class AlarmLevel {
    INFO = 0,
    WARNING = 1,
    CRITICAL = 2
};
|
||||
|
||||
// 报警规则结构体
|
||||
struct AlarmRule {
|
||||
std::string rule_id; // 规则唯一ID
|
||||
std::string device_id; // 针对的设备ID
|
||||
std::string data_point_name; // 监控的数据点名称
|
||||
std::string compare_type; // 比较类型: "GT" (>), "LT" (<), "EQ" (=), "GE" (>=), "LE" (<=)
|
||||
double threshold; // 报警阈值
|
||||
AlarmLevel level; // 报警级别
|
||||
std::string message_template; // 报警消息模板,例如 "设备 {device_id} 的 {data_point_name} ({value}) 超过阈值 {threshold}!"
|
||||
std::chrono::seconds debounce_interval; // 报警去抖动/抑制间隔
|
||||
};
|
||||
|
||||
// 报警事件结构体
|
||||
struct AlarmEvent {
|
||||
std::string event_id;
|
||||
std::string rule_id;
|
||||
std::string device_id;
|
||||
std::string data_point_name;
|
||||
double current_value;
|
||||
double threshold;
|
||||
AlarmLevel level;
|
||||
std::string timestamp;
|
||||
std::string message;
|
||||
};
|
||||
|
||||
class AlarmManager {
|
||||
public:
|
||||
using AlarmCallback = std::function<void(const AlarmEvent&)>;
|
||||
|
||||
AlarmManager();
|
||||
|
||||
// 加载报警规则 (例如从配置文件)
|
||||
bool load_rules(const std::string& config_path);
|
||||
|
||||
// 检查数据是否触发报警
|
||||
void check_data_for_alarms(const std::string& device_id, const nlohmann::json& data_json);
|
||||
|
||||
// 注册报警回调函数,当报警触发时调用
|
||||
void register_alarm_callback(AlarmCallback callback);
|
||||
|
||||
// 辅助函数:将报警级别枚举转为字符串 (用于MQTT payload或日志)
|
||||
static std::string alarm_level_to_string(AlarmLevel level);
|
||||
|
||||
private:
|
||||
std::vector<AlarmRule> m_alarm_rules;
|
||||
std::vector<AlarmCallback> m_alarm_callbacks;
|
||||
|
||||
// 记录每个规则上一次触发报警的时间,用于去抖动
|
||||
std::map<std::string, std::chrono::steady_clock::time_point> m_last_alarm_time;
|
||||
// 记录每个规则的当前报警状态,防止重复发送
|
||||
std::map<std::string, bool> m_active_alarms;
|
||||
|
||||
std::mutex m_mutex; // 用于保护 m_last_alarm_time 和 m_active_alarms
|
||||
|
||||
bool evaluate_rule(const std::string& compare_type, double value, double threshold);
|
||||
std::string generate_alarm_message(const AlarmRule& rule, double current_value);
|
||||
std::string get_current_timestamp();
|
||||
};
|
||||
258
src/test.cc
258
src/test.cc
|
|
@ -1,20 +1,20 @@
|
|||
#include "tts/piper_tts_interface.h"
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
// #include "tts/piper_tts_interface.h"
|
||||
// #include <string>
|
||||
// #include <iostream>
|
||||
|
||||
int main() {
|
||||
// 使用默认配置 (piper 和 /app/piper_models/zh_CN-huayan-medium.onnx)
|
||||
PiperTTSInterface tts_speaker;
|
||||
// 如果你的piper可执行文件不在PATH中,或者模型路径不同,你可以这样指定:
|
||||
// PiperTTSInterface tts_speaker("/path/to/your/piper_executable", "/path/to/your/model.onnx");
|
||||
std::string text_to_say = "请执行上述调试步骤,特别是禁用文件删除后手动检查和播放 WAV 文件,这将提供更多线索";
|
||||
if (tts_speaker.say_text_and_play(text_to_say)) {
|
||||
std::cout << "语音播报完成。" << std::endl;
|
||||
} else {
|
||||
std::cerr << "语音播报失败,请检查日志。" << std::endl;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
// int main() {
|
||||
// // 使用默认配置 (piper 和 /app/piper_models/zh_CN-huayan-medium.onnx)
|
||||
// PiperTTSInterface tts_speaker;
|
||||
// // 如果你的piper可执行文件不在PATH中,或者模型路径不同,你可以这样指定:
|
||||
// // PiperTTSInterface tts_speaker("/path/to/your/piper_executable", "/path/to/your/model.onnx");
|
||||
// std::string text_to_say = "请执行上述调试步骤,特别是禁用文件删除后手动检查和播放 WAV 文件,这将提供更多线索";
|
||||
// if (tts_speaker.say_text_and_play(text_to_say)) {
|
||||
// std::cout << "语音播报完成。" << std::endl;
|
||||
// } else {
|
||||
// std::cerr << "语音播报失败,请检查日志。" << std::endl;
|
||||
// }
|
||||
// return 0;
|
||||
// }
|
||||
// #include <iostream>
|
||||
// #include <chrono>
|
||||
// #include <thread>
|
||||
|
|
@ -51,4 +51,228 @@ int main() {
|
|||
// std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
// }
|
||||
// return 0;
|
||||
// }
|
||||
// }
|
||||
// main.cpp
|
||||
|
||||
#include "network/tcp_server.h"
|
||||
#include "mqtt/mqtt_client.h"
|
||||
#include "mqtt/mqtt_router.h"
|
||||
#include "systemMonitor/system_monitor.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "deviceManager/device_manager.h"
|
||||
#include "dataCache/data_cache.h"
|
||||
#include "dataCache/cache_uploader.h"
|
||||
#include "web/web_server.h"
|
||||
#include "dataCache/live_data_cache.h"
|
||||
#include "dataStorage/data_storage.h"
|
||||
#include "alert/alarm_manager.h" // <-- 添加报警管理器头文件
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <csignal>
|
||||
#include <iostream>
|
||||
#include <functional>
|
||||
#include <nlohmann/json.hpp> // 用于解析data_json
|
||||
|
||||
// Global io_context used by the ASIO-based services
|
||||
boost::asio::io_context g_io_context;
|
||||
|
||||
// NOTE: layout of UnifiedData as provided and confirmed for this project.
// timestamp_ms is zero-initialized so a default-constructed record never carries
// an indeterminate timestamp.
struct UnifiedData {
    std::string device_id;      // Unique id of the device that produced the data
    int64_t timestamp_ms = 0;   // Time the data was produced (milliseconds, UTC)
    std::string data_json;      // Payload as a JSON string, for flexibility and extensibility
};
|
||||
|
||||
|
||||
/**
|
||||
* @brief 周期性轮询系统状态并发布到 MQTT
|
||||
*/
|
||||
void poll_system_metrics(
|
||||
boost::asio::steady_timer& timer,
|
||||
SystemMonitor::SystemMonitor& monitor,
|
||||
MqttClient& mqtt_client,
|
||||
AlarmManager& alarm_manager // <-- 传递 alarm_manager
|
||||
) {
|
||||
if (g_io_context.stopped()) return;
|
||||
auto cpu_util = monitor.getCpuUtilization();
|
||||
auto mem_info = monitor.getMemoryInfo();
|
||||
double mem_total_gb = mem_info.total_kb / 1024.0 / 1024.0;
|
||||
double mem_free_gb = mem_info.free_kb / 1024.0 / 1024.0;
|
||||
double mem_usage_percentage = (mem_total_gb - mem_free_gb) / mem_total_gb * 100.0;
|
||||
|
||||
|
||||
std::string topic = "proxy/system_status";
|
||||
nlohmann::json system_status_json;
|
||||
system_status_json["device_id"] = "proxy_system"; // 统一一个设备ID用于报警
|
||||
system_status_json["timestamp_ms"] = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()
|
||||
).count();
|
||||
system_status_json["cpu_usage"] = cpu_util.totalUsagePercentage;
|
||||
system_status_json["mem_total_gb"] = mem_total_gb;
|
||||
system_status_json["mem_usage_percentage"] = mem_usage_percentage;
|
||||
|
||||
std::string payload = system_status_json.dump();
|
||||
mqtt_client.publish(topic, payload);
|
||||
spdlog::debug("System metrics published.");
|
||||
|
||||
// !!! 对系统指标进行报警检查 !!!
|
||||
alarm_manager.check_data_for_alarms("proxy_system", system_status_json);
|
||||
|
||||
|
||||
timer.expires_at(timer.expiry() + std::chrono::seconds(15));
|
||||
timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), std::ref(mqtt_client), std::ref(alarm_manager)));
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
|
||||
try {
|
||||
spdlog::set_level(spdlog::level::debug);
|
||||
spdlog::info("Edge Proxy starting up...");
|
||||
} catch (const spdlog::spdlog_ex& ex) {
|
||||
std::cerr << "Log initialization failed: " << ex.what() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
spdlog::info("Initializing Data Storage...");
|
||||
if (!DataStorage::getInstance().initialize("edge_proxy_data.db")) {
|
||||
spdlog::critical("Failed to initialize DataStorage. Exiting.");
|
||||
return 1;
|
||||
}
|
||||
|
||||
try {
|
||||
DataCache data_cache;
|
||||
LiveDataCache live_data_cache;
|
||||
MqttClient mqtt_client("tcp://localhost:1883", "edge-proxy-main-client");
|
||||
|
||||
// --- 报警管理器初始化 ---
|
||||
AlarmManager alarm_manager;
|
||||
// 加载报警规则
|
||||
if (!alarm_manager.load_rules("../config/alarms.json")) {
|
||||
spdlog::critical("Failed to load alarm rules. Exiting.");
|
||||
return 1;
|
||||
}
|
||||
|
||||
// 注册报警回调函数:当报警触发时,通过 MQTT 发布报警消息
|
||||
alarm_manager.register_alarm_callback([&](const AlarmEvent& event) {
|
||||
std::string alarm_topic = "alerts/device/" + event.device_id + "/" + AlarmManager::alarm_level_to_string(event.level);
|
||||
|
||||
nlohmann::json alarm_payload;
|
||||
alarm_payload["event_id"] = event.event_id;
|
||||
alarm_payload["rule_id"] = event.rule_id;
|
||||
alarm_payload["device_id"] = event.device_id;
|
||||
alarm_payload["data_point_name"] = event.data_point_name;
|
||||
alarm_payload["current_value"] = event.current_value;
|
||||
alarm_payload["threshold"] = event.threshold;
|
||||
alarm_payload["level"] = AlarmManager::alarm_level_to_string(event.level);
|
||||
alarm_payload["timestamp"] = event.timestamp;
|
||||
alarm_payload["message"] = event.message;
|
||||
|
||||
// 使用 g_io_context.post 将 MQTT 发布操作调度到主线程,避免在回调中直接阻塞
|
||||
g_io_context.post([&, alarm_topic, payload_str = alarm_payload.dump()]() {
|
||||
mqtt_client.publish(alarm_topic, payload_str, 1, false); // QoS 1, 不保留
|
||||
});
|
||||
spdlog::info("Published alarm: TOPIC={} PAYLOAD={}", alarm_topic, alarm_payload.dump());
|
||||
});
|
||||
// --- 报警管理器初始化结束 ---
|
||||
|
||||
|
||||
auto report_to_mqtt = [&](const UnifiedData& data) {
|
||||
// 使用 DataStorage 单例存储处理后的 UnifiedData 对象
|
||||
if (DataStorage::getInstance().storeProcessedData(data)) {
|
||||
spdlog::debug("Successfully stored PROCESSED data for device '{}'", data.device_id);
|
||||
} else {
|
||||
spdlog::error("Failed to store PROCESSED data for device '{}'", data.device_id);
|
||||
}
|
||||
|
||||
// 更新实时数据缓存
|
||||
live_data_cache.update_data(data.device_id, data.data_json);
|
||||
|
||||
// !!! 新增:进行数据比较和报警检查 !!!
|
||||
try {
|
||||
// 将 UnifiedData::data_json (string) 解析为 nlohmann::json 对象
|
||||
// 这假设 data.data_json 总是有效的 JSON
|
||||
nlohmann::json parsed_data_json = nlohmann::json::parse(data.data_json);
|
||||
alarm_manager.check_data_for_alarms(data.device_id, parsed_data_json);
|
||||
} catch (const nlohmann::json::exception& e) {
|
||||
spdlog::error("Failed to parse data_json for alarm checking for device '{}': {}", data.device_id, e.what());
|
||||
}
|
||||
|
||||
if (mqtt_client.is_connected()) {
|
||||
// 网络正常,直接上报
|
||||
std::string topic = "devices/" + data.device_id + "/data";
|
||||
g_io_context.post([&, topic, payload = data.data_json]() {
|
||||
mqtt_client.publish(topic, payload, 1, false);
|
||||
});
|
||||
} else {
|
||||
// 网络断开,写入缓存
|
||||
spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id);
|
||||
data_cache.add(data);
|
||||
}
|
||||
};
|
||||
|
||||
DeviceManager device_manager(g_io_context, report_to_mqtt);
|
||||
MqttRouter mqtt_router(mqtt_client, device_manager);
|
||||
std::vector<uint16_t> listen_ports = { 12345 };
|
||||
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
|
||||
SystemMonitor::SystemMonitor monitor;
|
||||
|
||||
if (!data_cache.open("edge_data_cache.db")) {
|
||||
spdlog::critical("Failed to initialize data cache. Exiting.");
|
||||
return 1;
|
||||
}
|
||||
CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache);
|
||||
|
||||
mqtt_client.set_connected_handler([&](const std::string& cause){
|
||||
spdlog::info("MQTT client connected: {}", cause);
|
||||
cache_uploader.start_upload();
|
||||
});
|
||||
|
||||
mqtt_client.connect();
|
||||
mqtt_router.start();
|
||||
|
||||
|
||||
monitor.getCpuUtilization();
|
||||
boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15));
|
||||
// 将 alarm_manager 传递给 poll_system_metrics
|
||||
system_monitor_timer.async_wait(std::bind(poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), std::ref(mqtt_client), std::ref(alarm_manager)));
|
||||
|
||||
|
||||
device_manager.load_and_start("../config/devices.json");
|
||||
|
||||
WebServer web_server(monitor, device_manager, live_data_cache, 8080);
|
||||
web_server.start();
|
||||
|
||||
boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM);
|
||||
signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
|
||||
spdlog::warn("Interrupt signal ({}) received. Shutting down.", signal_number);
|
||||
|
||||
// a. 首先停止所有数据采集线程
|
||||
spdlog::info("[Shutdown] A. Stopping device manager services...");
|
||||
device_manager.stop_all();
|
||||
|
||||
// b. 停止Web服务器线程
|
||||
spdlog::info("[Shutdown] B. Stopping web server...");
|
||||
web_server.stop();
|
||||
|
||||
// c. 断开MQTT连接 (这将释放它对io_context的'劫持')
|
||||
spdlog::info("[Shutdown] C. Disconnecting from MQTT broker...");
|
||||
mqtt_client.disconnect();
|
||||
|
||||
// d. 最后,安全地停止io_context
|
||||
spdlog::info("[Shutdown] D. Stopping main event loop...");
|
||||
g_io_context.stop();
|
||||
});
|
||||
|
||||
spdlog::info("All services are running. Press Ctrl+C to exit.");
|
||||
g_io_context.run();
|
||||
|
||||
|
||||
} catch (const std::exception& e) {
|
||||
spdlog::critical("An unhandled exception occurred: {}", e.what());
|
||||
return 1;
|
||||
}
|
||||
|
||||
spdlog::info("Server has been shut down gracefully. Exiting.");
|
||||
return 0;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue