增加告警热重载功能
This commit is contained in:
parent
4e8cb2e4a0
commit
59436351cd
|
|
@ -1,359 +1,433 @@
|
|||
// alarm/alarm_service.cpp
|
||||
#include "alarm_service.h"
|
||||
#include <fstream>
|
||||
|
||||
#include <chrono>
|
||||
#include <fstream>
|
||||
|
||||
AlarmService::AlarmService(boost::asio::io_context& io,
|
||||
PiperTTSInterface& tts_service,
|
||||
MqttClient& mqtt_client,
|
||||
DataStorage& data_storage)
|
||||
: m_io_context(io),
|
||||
PiperTTSInterface& tts_service,
|
||||
MqttClient& mqtt_client, DataStorage& data_storage)
|
||||
: m_io_context(io),
|
||||
m_tts_service(tts_service),
|
||||
m_mqtt_client(mqtt_client),
|
||||
m_data_storage(data_storage),
|
||||
m_tts_running(true) {
|
||||
|
||||
m_tts_thread = std::thread(&AlarmService::tts_worker, this);
|
||||
spdlog::info("AlarmService created and TTS worker thread started.");
|
||||
m_tts_thread = std::thread(&AlarmService::tts_worker, this);
|
||||
spdlog::info("AlarmService created and TTS worker thread started.");
|
||||
}
|
||||
|
||||
AlarmService::~AlarmService() {
|
||||
if (m_tts_running) {
|
||||
stop();
|
||||
}
|
||||
if (m_tts_thread.joinable()) {
|
||||
m_tts_thread.join();
|
||||
}
|
||||
if (m_tts_running) {
|
||||
stop();
|
||||
}
|
||||
if (m_tts_thread.joinable()) {
|
||||
m_tts_thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
void AlarmService::stop() {
|
||||
spdlog::info("Stopping AlarmService TTS worker...");
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_tts_queue_mutex);
|
||||
m_tts_running = false;
|
||||
}
|
||||
m_tts_cv.notify_one(); // 唤醒工作线程使其退出
|
||||
spdlog::info("Stopping AlarmService TTS worker...");
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_tts_queue_mutex);
|
||||
m_tts_running = false;
|
||||
}
|
||||
m_tts_cv.notify_one(); // 唤醒工作线程使其退出
|
||||
}
|
||||
|
||||
bool AlarmService::load_rules(const std::string& config_path) {
|
||||
std::ifstream ifs(config_path);
|
||||
if (!ifs.is_open()) {
|
||||
spdlog::error("Failed to open alarm rules file: {}", config_path);
|
||||
return false;
|
||||
}
|
||||
m_rules_config_path = config_path;
|
||||
|
||||
try {
|
||||
json j_rules = json::parse(ifs);
|
||||
for (const auto& j_rule : j_rules) {
|
||||
AlarmRule rule;
|
||||
rule.rule_id = j_rule.at("rule_id");
|
||||
rule.device_id = j_rule.at("device_id");
|
||||
rule.data_point_name = j_rule.at("data_point_name");
|
||||
rule.compare_type = AlarmRule::stringToCompareType(j_rule.at("compare_type"));
|
||||
rule.threshold = j_rule.at("threshold");
|
||||
rule.level = AlarmRule::stringToLevel(j_rule.at("level"));
|
||||
rule.message_template = j_rule.at("message_template");
|
||||
|
||||
// 可选字段
|
||||
rule.debounce_seconds = j_rule.value("debounce_seconds", 0);
|
||||
rule.alarm_mqtt_topic = j_rule.value("alarm_mqtt_topic", "");
|
||||
rule.clear_message_template = j_rule.value("clear_message_template", "");
|
||||
std::vector<AlarmRule> new_rules;
|
||||
std::map<std::string, AlarmState> new_states;
|
||||
|
||||
if (rule.compare_type == CompareType::UNKNOWN) {
|
||||
spdlog::warn("Skipping rule '{}': Unknown compare_type.", rule.rule_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
m_rules.push_back(std::move(rule));
|
||||
|
||||
// 为每条规则初始化一个状态机
|
||||
m_alarm_states.emplace(
|
||||
std::piecewise_construct,
|
||||
std::forward_as_tuple(rule.rule_id),
|
||||
std::forward_as_tuple(m_io_context)
|
||||
);
|
||||
}
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::error("Failed to parse alarm rules file '{}': {}", config_path, e.what());
|
||||
return false;
|
||||
}
|
||||
if (!parse_rules_from_file(config_path, new_rules, new_states)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
spdlog::info("Successfully loaded {} alarm rules from '{}'.", m_rules.size(), config_path);
|
||||
return true;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_rules_mutex);
|
||||
m_rules = std::move(new_rules);
|
||||
m_alarm_states = std::move(new_states);
|
||||
}
|
||||
|
||||
spdlog::info("Successfully loaded {} alarm rules from '{}'.", m_rules.size(),
|
||||
config_path);
|
||||
return true;
|
||||
}
|
||||
|
||||
void AlarmService::process_device_data(const std::string& device_id, const std::string& data_json) {
|
||||
try {
|
||||
json j_data = json::parse(data_json);
|
||||
if (!j_data.is_object()) return;
|
||||
bool AlarmService::reload_rules() {
|
||||
if (m_rules_config_path.empty()) {
|
||||
spdlog::error(
|
||||
"Cannot reload rules: config path not set (load_rules was never "
|
||||
"called?).");
|
||||
return false;
|
||||
}
|
||||
|
||||
// 遍历所有规则
|
||||
for (auto& rule : m_rules) {
|
||||
// 匹配 device_id (支持通配符 * 或 "proxy_system")
|
||||
if (rule.device_id == device_id || rule.device_id == "*") {
|
||||
// 检查规则关心的数据点是否存在
|
||||
if (j_data.contains(rule.data_point_name)) {
|
||||
if (j_data[rule.data_point_name].is_number()) {
|
||||
double value = j_data[rule.data_point_name].get<double>();
|
||||
// 调用核心逻辑
|
||||
check_rule_against_value(rule, value, device_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
spdlog::info("Attempting to reload alarm rules from '{}'...",
|
||||
m_rules_config_path);
|
||||
|
||||
std::vector<AlarmRule> new_rules;
|
||||
std::map<std::string, AlarmState> new_states;
|
||||
|
||||
if (!parse_rules_from_file(m_rules_config_path, new_rules, new_states)) {
|
||||
spdlog::error(
|
||||
"Failed to parse new rules file, aborting reload. Service continues "
|
||||
"with old rules.");
|
||||
return false;
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_rules_mutex);
|
||||
|
||||
m_rules = std::move(new_rules);
|
||||
m_alarm_states = std::move(new_states);
|
||||
}
|
||||
|
||||
spdlog::info("Successfully reloaded {} alarm rules.", m_rules.size());
|
||||
return true;
|
||||
}
|
||||
|
||||
void AlarmService::process_device_data(const std::string& device_id,
|
||||
const std::string& data_json) {
|
||||
std::lock_guard<std::mutex> lock(m_rules_mutex);
|
||||
try {
|
||||
json j_data = json::parse(data_json);
|
||||
if (!j_data.is_object()) return;
|
||||
|
||||
for (auto& rule : m_rules) {
|
||||
if (rule.device_id == device_id || rule.device_id == "*") {
|
||||
if (j_data.contains(rule.data_point_name)) {
|
||||
if (j_data[rule.data_point_name].is_number()) {
|
||||
double value = j_data[rule.data_point_name].get<double>();
|
||||
check_rule_against_value(rule, value, device_id);
|
||||
}
|
||||
}
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::error("Failed to parse device data for alarm: {}", e.what());
|
||||
}
|
||||
}
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::error("Failed to parse device data for alarm: {}", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void AlarmService::process_system_data(const std::string& system_data_json) {
|
||||
try {
|
||||
json j_data = json::parse(system_data_json);
|
||||
if (!j_data.is_object()) return;
|
||||
|
||||
const std::string device_id = "proxy_system"; // 系统数据的固定ID
|
||||
std::lock_guard<std::mutex> lock(m_rules_mutex);
|
||||
try {
|
||||
json j_data = json::parse(system_data_json);
|
||||
if (!j_data.is_object()) return;
|
||||
|
||||
for (auto& rule : m_rules) {
|
||||
// 只匹配 "proxy_system"
|
||||
if (rule.device_id == device_id) {
|
||||
if (j_data.contains(rule.data_point_name)) {
|
||||
if (j_data[rule.data_point_name].is_number()) {
|
||||
double value = j_data[rule.data_point_name].get<double>();
|
||||
check_rule_against_value(rule, value, device_id);
|
||||
}
|
||||
}
|
||||
const std::string device_id = "proxy_system"; // 系统数据的固定ID
|
||||
|
||||
for (auto& rule : m_rules) {
|
||||
if (rule.device_id == device_id) {
|
||||
if (j_data.contains(rule.data_point_name)) {
|
||||
if (j_data[rule.data_point_name].is_number()) {
|
||||
double value = j_data[rule.data_point_name].get<double>();
|
||||
check_rule_against_value(rule, value, device_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::error("Failed to parse system data for alarm: {}", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void AlarmService::check_rule_against_value(
|
||||
AlarmRule& rule, double value, const std::string& actual_device_id) {
|
||||
bool condition_met = false;
|
||||
switch (rule.compare_type) {
|
||||
case CompareType::GT:
|
||||
condition_met = (value > rule.threshold);
|
||||
break;
|
||||
case CompareType::LT:
|
||||
condition_met = (value < rule.threshold);
|
||||
break;
|
||||
case CompareType::EQ:
|
||||
condition_met = (std::abs(value - rule.threshold) < 1e-9);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
auto it = m_alarm_states.find(rule.rule_id);
|
||||
|
||||
if (it == m_alarm_states.end()) {
|
||||
spdlog::warn(
|
||||
"AlarmState for rule '{}' was missing. Creating it on-the-fly.",
|
||||
rule.rule_id);
|
||||
|
||||
auto result = m_alarm_states.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(rule.rule_id),
|
||||
std::forward_as_tuple(m_io_context));
|
||||
it = result.first;
|
||||
}
|
||||
|
||||
AlarmState& state = it->second;
|
||||
|
||||
if (condition_met) {
|
||||
if (state.current_state == AlarmStateType::NORMAL) {
|
||||
state.current_state = AlarmStateType::PENDING;
|
||||
|
||||
if (rule.debounce_seconds <= 0) {
|
||||
state.current_state = AlarmStateType::ACTIVE;
|
||||
trigger_alarm_action(rule, value, actual_device_id);
|
||||
} else {
|
||||
spdlog::debug("Alarm PENDING: {}", rule.rule_id);
|
||||
state.debounce_timer.expires_after(
|
||||
std::chrono::seconds(rule.debounce_seconds));
|
||||
|
||||
std::string rule_id = rule.rule_id;
|
||||
|
||||
state.debounce_timer.async_wait([this, rule_id, value,
|
||||
actual_device_id](
|
||||
const boost::system::error_code&
|
||||
ec) {
|
||||
std::lock_guard<std::mutex> lock(m_rules_mutex);
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted) {
|
||||
spdlog::debug("Alarm PENDING cancelled (or reloaded): {}", rule_id);
|
||||
return;
|
||||
}
|
||||
auto it = m_alarm_states.find(rule_id);
|
||||
if (it == m_alarm_states.end()) {
|
||||
spdlog::debug(
|
||||
"Alarm timer fired, but rule '{}' no longer exists (reloaded).",
|
||||
rule_id);
|
||||
return;
|
||||
}
|
||||
AlarmState& current_state = it->second;
|
||||
|
||||
AlarmRule* current_rule = nullptr;
|
||||
for (auto& r : m_rules) {
|
||||
if (r.rule_id == rule_id) {
|
||||
current_rule = &r;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::error("Failed to parse system data for alarm: {}", e.what());
|
||||
}
|
||||
|
||||
if (current_rule &&
|
||||
current_state.current_state == AlarmStateType::PENDING) {
|
||||
current_state.current_state = AlarmStateType::ACTIVE;
|
||||
trigger_alarm_action(*current_rule, value, actual_device_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
if (state.current_state == AlarmStateType::PENDING) {
|
||||
state.debounce_timer.cancel();
|
||||
state.current_state = AlarmStateType::NORMAL;
|
||||
} else if (state.current_state == AlarmStateType::ACTIVE) {
|
||||
state.current_state = AlarmStateType::NORMAL;
|
||||
clear_alarm(rule, value, actual_device_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void AlarmService::check_rule_against_value(AlarmRule& rule, double value, const std::string& actual_device_id) {
|
||||
bool condition_met = false;
|
||||
switch (rule.compare_type) {
|
||||
case CompareType::GT: condition_met = (value > rule.threshold); break;
|
||||
case CompareType::LT: condition_met = (value < rule.threshold); break;
|
||||
case CompareType::EQ: condition_met = (std::abs(value - rule.threshold) < 1e-9); break;
|
||||
default: break;
|
||||
}
|
||||
std::string AlarmService::format_message(const AlarmRule& rule,
|
||||
const std::string& template_str,
|
||||
double value,
|
||||
const std::string& actual_device_id) {
|
||||
std::string msg = template_str;
|
||||
|
||||
size_t pos = msg.find("{device_id}");
|
||||
if (pos != std::string::npos) {
|
||||
msg.replace(pos, 11, actual_device_id);
|
||||
}
|
||||
|
||||
auto it = m_alarm_states.find(rule.rule_id);
|
||||
pos = msg.find("{value}");
|
||||
if (pos != std::string::npos) {
|
||||
char buffer[32];
|
||||
std::snprintf(buffer, sizeof(buffer), "%.2f", value);
|
||||
msg.replace(pos, 7, buffer);
|
||||
}
|
||||
|
||||
if (it == m_alarm_states.end()) {
|
||||
spdlog::warn("AlarmState for rule '{}' was missing. Creating it on-the-fly.", rule.rule_id);
|
||||
|
||||
auto result = m_alarm_states.emplace(
|
||||
std::piecewise_construct,
|
||||
std::forward_as_tuple(rule.rule_id),
|
||||
std::forward_as_tuple(m_io_context)
|
||||
);
|
||||
it = result.first; // 'it' 现在指向新创建的元素
|
||||
pos = msg.find("{threshold}");
|
||||
if (pos != std::string::npos) {
|
||||
char buffer[32];
|
||||
std::snprintf(buffer, sizeof(buffer), "%.2f", rule.threshold);
|
||||
msg.replace(pos, 11, buffer);
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
|
||||
AlarmState& state = it->second;
|
||||
void AlarmService::trigger_alarm_action(AlarmRule& rule, double value,
|
||||
const std::string& actual_device_id) {
|
||||
std::string message =
|
||||
format_message(rule, rule.message_template, value, actual_device_id);
|
||||
spdlog::warn("[ALARM ACTIVE] (Rule: {}) {}", rule.rule_id, message);
|
||||
|
||||
if (condition_met) {
|
||||
// 条件满足
|
||||
if (state.current_state == AlarmStateType::NORMAL) {
|
||||
state.current_state = AlarmStateType::PENDING;
|
||||
|
||||
if (rule.debounce_seconds <= 0) {
|
||||
// 立即触发
|
||||
state.current_state = AlarmStateType::ACTIVE;
|
||||
trigger_alarm_action(rule, value, actual_device_id);
|
||||
} else {
|
||||
// 启动防抖计时器
|
||||
spdlog::debug("Alarm PENDING: {}", rule.rule_id);
|
||||
state.debounce_timer.expires_after(std::chrono::seconds(rule.debounce_seconds));
|
||||
|
||||
std::string rule_id = rule.rule_id; // 捕获 rule_id
|
||||
|
||||
state.debounce_timer.async_wait(
|
||||
[this, rule_id, value, actual_device_id](const boost::system::error_code& ec) {
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted) {
|
||||
// Timer被取消了 (因为数据恢复正常)
|
||||
spdlog::debug("Alarm PENDING cancelled: {}", rule_id);
|
||||
return;
|
||||
}
|
||||
|
||||
// 计时器正常结束,告警激活
|
||||
AlarmState& current_state = m_alarm_states.at(rule_id);
|
||||
|
||||
// 查找原始规则 (必须重新查找,因为 &rule 引用可能已失效)
|
||||
AlarmRule* current_rule = nullptr;
|
||||
for(auto& r : m_rules) { if (r.rule_id == rule_id) { current_rule = &r; break; } }
|
||||
AlarmEvent event;
|
||||
event.rule_id = rule.rule_id;
|
||||
event.device_id = actual_device_id;
|
||||
event.status = AlarmEventStatus::ACTIVE;
|
||||
event.level = AlarmRule::levelToString(rule.level);
|
||||
event.message = message;
|
||||
event.trigger_value = value;
|
||||
event.timestamp_ms = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
|
||||
if (current_rule && current_state.current_state == AlarmStateType::PENDING) {
|
||||
current_state.current_state = AlarmStateType::ACTIVE;
|
||||
trigger_alarm_action(*current_rule, value, actual_device_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
// else: 如果已经是 PENDING 或 ACTIVE,保持不变 (防止重复触发)
|
||||
|
||||
} else {
|
||||
// 条件不满足 (数据恢复正常)
|
||||
if (state.current_state == AlarmStateType::PENDING) {
|
||||
// 状态:PENDING -> NORMAL
|
||||
// 在防抖期间恢复了,取消计时器
|
||||
state.debounce_timer.cancel();
|
||||
state.current_state = AlarmStateType::NORMAL;
|
||||
} else if (state.current_state == AlarmStateType::ACTIVE) {
|
||||
// 状态:ACTIVE -> NORMAL
|
||||
// 告警已激活,现在恢复了
|
||||
state.current_state = AlarmStateType::NORMAL;
|
||||
clear_alarm(rule, value, actual_device_id);
|
||||
}
|
||||
// else: 已经是 NORMAL,保持不变
|
||||
}
|
||||
// 1. 语音播报 (加入队列)
|
||||
schedule_tts(message);
|
||||
|
||||
// 2. 发布到MQTT
|
||||
std::string topic =
|
||||
rule.alarm_mqtt_topic.empty() ? "proxy/alarms" : rule.alarm_mqtt_topic;
|
||||
m_mqtt_client.publish(topic, AlarmEvent::toJson(event).dump(), 1, false);
|
||||
|
||||
// 3. 写入数据库
|
||||
if (!m_data_storage.storeAlarmEvent(event)) {
|
||||
spdlog::error("Failed to store ACTIVE alarm event for rule: {}",
|
||||
rule.rule_id);
|
||||
}
|
||||
}
|
||||
|
||||
std::string AlarmService::format_message(const AlarmRule& rule, const std::string& template_str, double value, const std::string& actual_device_id) {
|
||||
std::string msg = template_str;
|
||||
|
||||
// 替换 {device_id}
|
||||
size_t pos = msg.find("{device_id}");
|
||||
if (pos != std::string::npos) {
|
||||
msg.replace(pos, 11, actual_device_id);
|
||||
}
|
||||
|
||||
// 替换 {value}
|
||||
pos = msg.find("{value}");
|
||||
if (pos != std::string::npos) {
|
||||
// 格式化浮点数,保留2位小数
|
||||
char buffer[32];
|
||||
std::snprintf(buffer, sizeof(buffer), "%.2f", value);
|
||||
msg.replace(pos, 7, buffer);
|
||||
}
|
||||
void AlarmService::clear_alarm(AlarmRule& rule, double value,
|
||||
const std::string& actual_device_id) {
|
||||
spdlog::info("[ALARM CLEARED] (Rule: {})", rule.rule_id);
|
||||
|
||||
// 替换 {threshold}
|
||||
pos = msg.find("{threshold}");
|
||||
if (pos != std::string::npos) {
|
||||
char buffer[32];
|
||||
std::snprintf(buffer, sizeof(buffer), "%.2f", rule.threshold);
|
||||
msg.replace(pos, 11, buffer);
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
// 检查是否有解除消息模板
|
||||
if (rule.clear_message_template.empty()) {
|
||||
return; // 没有模板,安静地解除
|
||||
}
|
||||
|
||||
void AlarmService::trigger_alarm_action(AlarmRule& rule, double value, const std::string& actual_device_id) {
|
||||
std::string message = format_message(rule, rule.message_template, value, actual_device_id);
|
||||
spdlog::warn("[ALARM ACTIVE] (Rule: {}) {}", rule.rule_id, message);
|
||||
std::string message = format_message(rule, rule.clear_message_template, value,
|
||||
actual_device_id);
|
||||
|
||||
AlarmEvent event;
|
||||
event.rule_id = rule.rule_id;
|
||||
event.device_id = actual_device_id;
|
||||
event.status = AlarmEventStatus::ACTIVE;
|
||||
event.level = AlarmRule::levelToString(rule.level);
|
||||
event.message = message;
|
||||
event.trigger_value = value;
|
||||
event.timestamp_ms = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()
|
||||
).count();
|
||||
AlarmEvent event;
|
||||
event.rule_id = rule.rule_id;
|
||||
event.device_id = actual_device_id;
|
||||
event.status = AlarmEventStatus::CLEARED;
|
||||
event.level = AlarmRule::levelToString(rule.level);
|
||||
event.message = message;
|
||||
event.trigger_value = value;
|
||||
event.timestamp_ms = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
|
||||
// 1. 语音播报 (加入队列)
|
||||
schedule_tts(message);
|
||||
// 1. 语音播报 (加入队列)
|
||||
schedule_tts(message);
|
||||
|
||||
// 2. 发布到MQTT
|
||||
std::string topic = rule.alarm_mqtt_topic.empty() ? "proxy/alarms" : rule.alarm_mqtt_topic;
|
||||
m_mqtt_client.publish(topic, AlarmEvent::toJson(event).dump(), 1, false);
|
||||
// 2. 发布到MQTT
|
||||
std::string topic =
|
||||
rule.alarm_mqtt_topic.empty() ? "proxy/alarms" : rule.alarm_mqtt_topic;
|
||||
m_mqtt_client.publish(topic, AlarmEvent::toJson(event).dump(), 1, false);
|
||||
|
||||
// 3. 写入数据库
|
||||
if (!m_data_storage.storeAlarmEvent(event)) {
|
||||
spdlog::error("Failed to store ACTIVE alarm event for rule: {}", rule.rule_id);
|
||||
}
|
||||
}
|
||||
|
||||
void AlarmService::clear_alarm(AlarmRule& rule, double value, const std::string& actual_device_id) {
|
||||
spdlog::info("[ALARM CLEARED] (Rule: {})", rule.rule_id);
|
||||
|
||||
// 检查是否有解除消息模板
|
||||
if (rule.clear_message_template.empty()) {
|
||||
return; // 没有模板,安静地解除
|
||||
}
|
||||
|
||||
std::string message = format_message(rule, rule.clear_message_template, value, actual_device_id);
|
||||
|
||||
AlarmEvent event;
|
||||
event.rule_id = rule.rule_id;
|
||||
event.device_id = actual_device_id;
|
||||
event.status = AlarmEventStatus::CLEARED;
|
||||
event.level = AlarmRule::levelToString(rule.level);
|
||||
event.message = message;
|
||||
event.trigger_value = value;
|
||||
event.timestamp_ms = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()
|
||||
).count();
|
||||
|
||||
// 1. 语音播报 (加入队列)
|
||||
schedule_tts(message);
|
||||
|
||||
// 2. 发布到MQTT
|
||||
std::string topic = rule.alarm_mqtt_topic.empty() ? "proxy/alarms" : rule.alarm_mqtt_topic;
|
||||
m_mqtt_client.publish(topic, AlarmEvent::toJson(event).dump(), 1, false);
|
||||
|
||||
// 3. 写入数据库
|
||||
if (!m_data_storage.storeAlarmEvent(event)) {
|
||||
spdlog::error("Failed to store CLEARED alarm event for rule: {}", rule.rule_id);
|
||||
}
|
||||
// 3. 写入数据库
|
||||
if (!m_data_storage.storeAlarmEvent(event)) {
|
||||
spdlog::error("Failed to store CLEARED alarm event for rule: {}",
|
||||
rule.rule_id);
|
||||
}
|
||||
}
|
||||
|
||||
// --- TTS 队列和 Web API 实现 ---
|
||||
|
||||
void AlarmService::tts_worker() {
|
||||
while (m_tts_running) {
|
||||
std::string message_to_play;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_tts_queue_mutex);
|
||||
// 等待队列中有消息 或 线程被通知停止
|
||||
m_tts_cv.wait(lock, [this]{ return !m_tts_queue.empty() || !m_tts_running; });
|
||||
|
||||
if (!m_tts_running && m_tts_queue.empty()) {
|
||||
break; // 退出
|
||||
}
|
||||
while (m_tts_running) {
|
||||
std::string message_to_play;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_tts_queue_mutex);
|
||||
// 等待队列中有消息 或 线程被通知停止
|
||||
m_tts_cv.wait(lock,
|
||||
[this] { return !m_tts_queue.empty() || !m_tts_running; });
|
||||
|
||||
if (!m_tts_queue.empty()) {
|
||||
message_to_play = m_tts_queue.front();
|
||||
m_tts_queue.pop();
|
||||
}
|
||||
} // 释放锁
|
||||
if (!m_tts_running && m_tts_queue.empty()) {
|
||||
break; // 退出
|
||||
}
|
||||
|
||||
if (!message_to_play.empty()) {
|
||||
spdlog::debug("TTS worker playing: {}", message_to_play);
|
||||
// 这是阻塞调用,但在独立线程中,不会影响主 io_context
|
||||
m_tts_service.say_text_and_play(message_to_play);
|
||||
}
|
||||
if (!m_tts_queue.empty()) {
|
||||
message_to_play = m_tts_queue.front();
|
||||
m_tts_queue.pop();
|
||||
}
|
||||
} // 释放锁
|
||||
|
||||
if (!message_to_play.empty()) {
|
||||
spdlog::debug("TTS worker playing: {}", message_to_play);
|
||||
// 这是阻塞调用,但在独立线程中,不会影响主 io_context
|
||||
m_tts_service.say_text_and_play(message_to_play);
|
||||
}
|
||||
spdlog::info("TTS worker thread stopped.");
|
||||
}
|
||||
spdlog::info("TTS worker thread stopped.");
|
||||
}
|
||||
|
||||
void AlarmService::schedule_tts(const std::string& text) {
|
||||
if (text.empty() || !m_tts_running) return;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_tts_queue_mutex);
|
||||
m_tts_queue.push(text);
|
||||
}
|
||||
m_tts_cv.notify_one();
|
||||
if (text.empty() || !m_tts_running) return;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_tts_queue_mutex);
|
||||
m_tts_queue.push(text);
|
||||
}
|
||||
m_tts_cv.notify_one();
|
||||
}
|
||||
|
||||
nlohmann::json AlarmService::getActiveAlarmsJson() {
|
||||
auto alarms = m_data_storage.getActiveAlarms();
|
||||
json j_alarms = json::array();
|
||||
for(const auto& alarm : alarms) {
|
||||
j_alarms.push_back(AlarmEvent::toJson(alarm));
|
||||
}
|
||||
return j_alarms;
|
||||
auto alarms = m_data_storage.getActiveAlarms();
|
||||
json j_alarms = json::array();
|
||||
for (const auto& alarm : alarms) {
|
||||
j_alarms.push_back(AlarmEvent::toJson(alarm));
|
||||
}
|
||||
return j_alarms;
|
||||
}
|
||||
|
||||
nlohmann::json AlarmService::getAlarmHistoryJson(int limit) {
|
||||
auto alarms = m_data_storage.getAlarmHistory(limit);
|
||||
json j_alarms = json::array();
|
||||
for(const auto& alarm : alarms) {
|
||||
j_alarms.push_back(AlarmEvent::toJson(alarm));
|
||||
auto alarms = m_data_storage.getAlarmHistory(limit);
|
||||
json j_alarms = json::array();
|
||||
for (const auto& alarm : alarms) {
|
||||
j_alarms.push_back(AlarmEvent::toJson(alarm));
|
||||
}
|
||||
return j_alarms;
|
||||
}
|
||||
|
||||
bool AlarmService::parse_rules_from_file(
|
||||
const std::string& config_path, std::vector<AlarmRule>& out_rules,
|
||||
std::map<std::string, AlarmState>& out_states) {
|
||||
std::ifstream ifs(config_path);
|
||||
if (!ifs.is_open()) {
|
||||
spdlog::error("Failed to open alarm rules file: {}", config_path);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
json j_rules = json::parse(ifs);
|
||||
|
||||
// 清空临时容器,准备接收新规则
|
||||
out_rules.clear();
|
||||
out_states.clear();
|
||||
|
||||
for (const auto& j_rule : j_rules) {
|
||||
AlarmRule rule;
|
||||
rule.rule_id = j_rule.at("rule_id");
|
||||
rule.device_id = j_rule.at("device_id");
|
||||
rule.data_point_name = j_rule.at("data_point_name");
|
||||
rule.compare_type =
|
||||
AlarmRule::stringToCompareType(j_rule.at("compare_type"));
|
||||
rule.threshold = j_rule.at("threshold");
|
||||
rule.level = AlarmRule::stringToLevel(j_rule.at("level"));
|
||||
rule.message_template = j_rule.at("message_template");
|
||||
rule.debounce_seconds = j_rule.value("debounce_seconds", 0);
|
||||
rule.alarm_mqtt_topic = j_rule.value("alarm_mqtt_topic", "");
|
||||
rule.clear_message_template = j_rule.value("clear_message_template", "");
|
||||
|
||||
if (rule.compare_type == CompareType::UNKNOWN) {
|
||||
spdlog::warn("Skipping rule '{}': Unknown compare_type.", rule.rule_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
out_rules.push_back(std::move(rule));
|
||||
|
||||
// 为每条规则准备一个状态机
|
||||
out_states.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(rule.rule_id),
|
||||
std::forward_as_tuple(
|
||||
m_io_context) // m_io_context 是类成员,可以访问
|
||||
);
|
||||
}
|
||||
return j_alarms;
|
||||
} catch (const json::exception& e) {
|
||||
spdlog::error("Failed to parse alarm rules file '{}': {}", config_path,
|
||||
e.what());
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
@ -1,69 +1,76 @@
|
|||
// alarm/alarm_service.h
|
||||
#pragma once
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "alarm_defs.h"
|
||||
#include "alarm_event.h"
|
||||
#include "tts/piper_tts_interface.h" // 你的 TTS 接口
|
||||
#include "dataStorage/data_storage.h"
|
||||
#include "mqtt/mqtt_client.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
#include <vector>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <queue>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include "tts/piper_tts_interface.h" // 你的 TTS 接口
|
||||
|
||||
using json = nlohmann::json;
|
||||
|
||||
class AlarmService {
|
||||
public:
|
||||
AlarmService(boost::asio::io_context& io,
|
||||
PiperTTSInterface& tts_service,
|
||||
MqttClient& mqtt_client,
|
||||
DataStorage& data_storage);
|
||||
|
||||
~AlarmService();
|
||||
public:
|
||||
AlarmService(boost::asio::io_context& io, PiperTTSInterface& tts_service,
|
||||
MqttClient& mqtt_client, DataStorage& data_storage);
|
||||
|
||||
void stop();
|
||||
~AlarmService();
|
||||
|
||||
bool load_rules(const std::string& config_path);
|
||||
void stop();
|
||||
|
||||
void process_device_data(const std::string& device_id, const std::string& data_json);
|
||||
|
||||
void process_system_data(const std::string& system_data_json);
|
||||
bool load_rules(const std::string& config_path);
|
||||
bool reload_rules();
|
||||
|
||||
nlohmann::json getActiveAlarmsJson();
|
||||
nlohmann::json getAlarmHistoryJson(int limit);
|
||||
void process_device_data(const std::string& device_id,
|
||||
const std::string& data_json);
|
||||
|
||||
private:
|
||||
void check_rule_against_value(AlarmRule& rule, double value, const std::string& actual_device_id);
|
||||
void process_system_data(const std::string& system_data_json);
|
||||
|
||||
std::string format_message(const AlarmRule& rule, const std::string& template_str, double value, const std::string& actual_device_id);
|
||||
nlohmann::json getActiveAlarmsJson();
|
||||
nlohmann::json getAlarmHistoryJson(int limit);
|
||||
|
||||
void trigger_alarm_action(AlarmRule& rule, double value, const std::string& actual_device_id);
|
||||
|
||||
void clear_alarm(AlarmRule& rule, double value, const std::string& actual_device_id);
|
||||
private:
|
||||
void check_rule_against_value(AlarmRule& rule, double value,
|
||||
const std::string& actual_device_id);
|
||||
std::string format_message(const AlarmRule& rule,
|
||||
const std::string& template_str, double value,
|
||||
const std::string& actual_device_id);
|
||||
void trigger_alarm_action(AlarmRule& rule, double value,
|
||||
const std::string& actual_device_id);
|
||||
void clear_alarm(AlarmRule& rule, double value,
|
||||
const std::string& actual_device_id);
|
||||
void tts_worker();
|
||||
void schedule_tts(const std::string& text);
|
||||
bool parse_rules_from_file(const std::string& config_path,
|
||||
std::vector<AlarmRule>& out_rules,
|
||||
std::map<std::string, AlarmState>& out_states);
|
||||
|
||||
void tts_worker();
|
||||
void schedule_tts(const std::string& text);
|
||||
boost::asio::io_context& m_io_context;
|
||||
PiperTTSInterface& m_tts_service;
|
||||
MqttClient& m_mqtt_client;
|
||||
DataStorage& m_data_storage;
|
||||
|
||||
boost::asio::io_context& m_io_context;
|
||||
PiperTTSInterface& m_tts_service;
|
||||
MqttClient& m_mqtt_client;
|
||||
DataStorage& m_data_storage;
|
||||
|
||||
std::vector<AlarmRule> m_rules;
|
||||
std::map<std::string, AlarmState> m_alarm_states;
|
||||
std::vector<AlarmRule> m_rules;
|
||||
std::map<std::string, AlarmState> m_alarm_states;
|
||||
|
||||
// TTS 播报队列
|
||||
std::mutex m_tts_queue_mutex;
|
||||
std::condition_variable m_tts_cv;
|
||||
std::queue<std::string> m_tts_queue;
|
||||
std::thread m_tts_thread;
|
||||
bool m_tts_running;
|
||||
std::string m_rules_config_path; // <--- 新增:存储配置文件路径
|
||||
std::mutex m_rules_mutex; // <--- 新增:保护 m_rules 和 m_alarm_states
|
||||
|
||||
// TTS 播报队列
|
||||
std::mutex m_tts_queue_mutex;
|
||||
std::condition_variable m_tts_cv;
|
||||
std::queue<std::string> m_tts_queue;
|
||||
std::thread m_tts_thread;
|
||||
bool m_tts_running;
|
||||
};
|
||||
Loading…
Reference in New Issue