diff --git a/src/alarm/alarm_service.cc b/src/alarm/alarm_service.cc index 6a8d018..8538d05 100644 --- a/src/alarm/alarm_service.cc +++ b/src/alarm/alarm_service.cc @@ -1,359 +1,433 @@ // alarm/alarm_service.cpp #include "alarm_service.h" -#include + #include +#include AlarmService::AlarmService(boost::asio::io_context& io, - PiperTTSInterface& tts_service, - MqttClient& mqtt_client, - DataStorage& data_storage) - : m_io_context(io), + PiperTTSInterface& tts_service, + MqttClient& mqtt_client, DataStorage& data_storage) + : m_io_context(io), m_tts_service(tts_service), m_mqtt_client(mqtt_client), m_data_storage(data_storage), m_tts_running(true) { - - m_tts_thread = std::thread(&AlarmService::tts_worker, this); - spdlog::info("AlarmService created and TTS worker thread started."); + m_tts_thread = std::thread(&AlarmService::tts_worker, this); + spdlog::info("AlarmService created and TTS worker thread started."); } AlarmService::~AlarmService() { - if (m_tts_running) { - stop(); - } - if (m_tts_thread.joinable()) { - m_tts_thread.join(); - } + if (m_tts_running) { + stop(); + } + if (m_tts_thread.joinable()) { + m_tts_thread.join(); + } } void AlarmService::stop() { - spdlog::info("Stopping AlarmService TTS worker..."); - { - std::lock_guard lock(m_tts_queue_mutex); - m_tts_running = false; - } - m_tts_cv.notify_one(); // 唤醒工作线程使其退出 + spdlog::info("Stopping AlarmService TTS worker..."); + { + std::lock_guard lock(m_tts_queue_mutex); + m_tts_running = false; + } + m_tts_cv.notify_one(); // 唤醒工作线程使其退出 } bool AlarmService::load_rules(const std::string& config_path) { - std::ifstream ifs(config_path); - if (!ifs.is_open()) { - spdlog::error("Failed to open alarm rules file: {}", config_path); - return false; - } + m_rules_config_path = config_path; - try { - json j_rules = json::parse(ifs); - for (const auto& j_rule : j_rules) { - AlarmRule rule; - rule.rule_id = j_rule.at("rule_id"); - rule.device_id = j_rule.at("device_id"); - rule.data_point_name = j_rule.at("data_point_name"); - rule.compare_type = AlarmRule::stringToCompareType(j_rule.at("compare_type")); - rule.threshold = j_rule.at("threshold"); - rule.level = AlarmRule::stringToLevel(j_rule.at("level")); - rule.message_template = j_rule.at("message_template"); - - // 可选字段 - rule.debounce_seconds = j_rule.value("debounce_seconds", 0); - rule.alarm_mqtt_topic = j_rule.value("alarm_mqtt_topic", ""); - rule.clear_message_template = j_rule.value("clear_message_template", ""); + std::vector new_rules; + std::map new_states; - if (rule.compare_type == CompareType::UNKNOWN) { - spdlog::warn("Skipping rule '{}': Unknown compare_type.", rule.rule_id); - continue; - } - - m_rules.push_back(std::move(rule)); - - // 为每条规则初始化一个状态机 - m_alarm_states.emplace( - std::piecewise_construct, - std::forward_as_tuple(rule.rule_id), - std::forward_as_tuple(m_io_context) - ); - } - } catch (const json::exception& e) { - spdlog::error("Failed to parse alarm rules file '{}': {}", config_path, e.what()); - return false; - } + if (!parse_rules_from_file(config_path, new_rules, new_states)) { + return false; + } - spdlog::info("Successfully loaded {} alarm rules from '{}'.", m_rules.size(), config_path); - return true; + { + std::lock_guard lock(m_rules_mutex); + m_rules = std::move(new_rules); + m_alarm_states = std::move(new_states); + } + + spdlog::info("Successfully loaded {} alarm rules from '{}'.", m_rules.size(), + config_path); + return true; } -void AlarmService::process_device_data(const std::string& device_id, const std::string& data_json) { - try { - json j_data = json::parse(data_json); - if (!j_data.is_object()) return; +bool AlarmService::reload_rules() { + if (m_rules_config_path.empty()) { + spdlog::error( + "Cannot reload rules: config path not set (load_rules was never " + "called?)."); + return false; + } - // 遍历所有规则 - for (auto& rule : m_rules) { - // 匹配 device_id (支持通配符 * 或 "proxy_system") - if (rule.device_id == device_id || rule.device_id == "*") { - // 检查规则关心的数据点是否存在 - if (j_data.contains(rule.data_point_name)) { - if (j_data[rule.data_point_name].is_number()) { - double value = j_data[rule.data_point_name].get(); - // 调用核心逻辑 - check_rule_against_value(rule, value, device_id); - } - } - } + spdlog::info("Attempting to reload alarm rules from '{}'...", + m_rules_config_path); + + std::vector new_rules; + std::map new_states; + + if (!parse_rules_from_file(m_rules_config_path, new_rules, new_states)) { + spdlog::error( + "Failed to parse new rules file, aborting reload. Service continues " + "with old rules."); + return false; + } + + { + std::lock_guard lock(m_rules_mutex); + + m_rules = std::move(new_rules); + m_alarm_states = std::move(new_states); + } + + spdlog::info("Successfully reloaded {} alarm rules.", m_rules.size()); + return true; +} + +void AlarmService::process_device_data(const std::string& device_id, + const std::string& data_json) { + std::lock_guard lock(m_rules_mutex); + try { + json j_data = json::parse(data_json); + if (!j_data.is_object()) return; + + for (auto& rule : m_rules) { + if (rule.device_id == device_id || rule.device_id == "*") { + if (j_data.contains(rule.data_point_name)) { + if (j_data[rule.data_point_name].is_number()) { + double value = j_data[rule.data_point_name].get(); + check_rule_against_value(rule, value, device_id); + } } - } catch (const json::exception& e) { - spdlog::error("Failed to parse device data for alarm: {}", e.what()); + } } + } catch (const json::exception& e) { + spdlog::error("Failed to parse device data for alarm: {}", e.what()); + } } void AlarmService::process_system_data(const std::string& system_data_json) { - try { - json j_data = json::parse(system_data_json); - if (!j_data.is_object()) return; - - const std::string device_id = "proxy_system"; // 系统数据的固定ID + std::lock_guard lock(m_rules_mutex); + try { + json j_data = json::parse(system_data_json); + if (!j_data.is_object()) return; - for (auto& rule : m_rules) { - // 只匹配 "proxy_system" - if (rule.device_id == device_id) { - if (j_data.contains(rule.data_point_name)) { - if (j_data[rule.data_point_name].is_number()) { - double value = j_data[rule.data_point_name].get(); - check_rule_against_value(rule, value, device_id); - } - } + const std::string device_id = "proxy_system"; // 系统数据的固定ID + + for (auto& rule : m_rules) { + if (rule.device_id == device_id) { + if (j_data.contains(rule.data_point_name)) { + if (j_data[rule.data_point_name].is_number()) { + double value = j_data[rule.data_point_name].get(); + check_rule_against_value(rule, value, device_id); + } + } + } + } + } catch (const json::exception& e) { + spdlog::error("Failed to parse system data for alarm: {}", e.what()); + } +} + +void AlarmService::check_rule_against_value( + AlarmRule& rule, double value, const std::string& actual_device_id) { + bool condition_met = false; + switch (rule.compare_type) { + case CompareType::GT: + condition_met = (value > rule.threshold); + break; + case CompareType::LT: + condition_met = (value < rule.threshold); + break; + case CompareType::EQ: + condition_met = (std::abs(value - rule.threshold) < 1e-9); + break; + default: + break; + } + + auto it = m_alarm_states.find(rule.rule_id); + + if (it == m_alarm_states.end()) { + spdlog::warn( + "AlarmState for rule '{}' was missing. Creating it on-the-fly.", + rule.rule_id); + + auto result = m_alarm_states.emplace(std::piecewise_construct, + std::forward_as_tuple(rule.rule_id), + std::forward_as_tuple(m_io_context)); + it = result.first; + } + + AlarmState& state = it->second; + + if (condition_met) { + if (state.current_state == AlarmStateType::NORMAL) { + state.current_state = AlarmStateType::PENDING; + + if (rule.debounce_seconds <= 0) { + state.current_state = AlarmStateType::ACTIVE; + trigger_alarm_action(rule, value, actual_device_id); + } else { + spdlog::debug("Alarm PENDING: {}", rule.rule_id); + state.debounce_timer.expires_after( + std::chrono::seconds(rule.debounce_seconds)); + + std::string rule_id = rule.rule_id; + + state.debounce_timer.async_wait([this, rule_id, value, + actual_device_id]( + const boost::system::error_code& + ec) { + std::lock_guard lock(m_rules_mutex); + + if (ec == boost::asio::error::operation_aborted) { + spdlog::debug("Alarm PENDING cancelled (or reloaded): {}", rule_id); + return; + } + auto it = m_alarm_states.find(rule_id); + if (it == m_alarm_states.end()) { + spdlog::debug( + "Alarm timer fired, but rule '{}' no longer exists (reloaded).", + rule_id); + return; + } + AlarmState& current_state = it->second; + + AlarmRule* current_rule = nullptr; + for (auto& r : m_rules) { + if (r.rule_id == rule_id) { + current_rule = &r; + break; } - } - } catch (const json::exception& e) { - spdlog::error("Failed to parse system data for alarm: {}", e.what()); + } + + if (current_rule && + current_state.current_state == AlarmStateType::PENDING) { + current_state.current_state = AlarmStateType::ACTIVE; + trigger_alarm_action(*current_rule, value, actual_device_id); + } + }); + } } + + } else { + if (state.current_state == AlarmStateType::PENDING) { + state.debounce_timer.cancel(); + state.current_state = AlarmStateType::NORMAL; + } else if (state.current_state == AlarmStateType::ACTIVE) { + state.current_state = AlarmStateType::NORMAL; + clear_alarm(rule, value, actual_device_id); + } + } } -void AlarmService::check_rule_against_value(AlarmRule& rule, double value, const std::string& actual_device_id) { - bool condition_met = false; - switch (rule.compare_type) { - case CompareType::GT: condition_met = (value > rule.threshold); break; - case CompareType::LT: condition_met = (value < rule.threshold); break; - case CompareType::EQ: condition_met = (std::abs(value - rule.threshold) < 1e-9); break; - default: break; - } +std::string AlarmService::format_message(const AlarmRule& rule, + const std::string& template_str, + double value, + const std::string& actual_device_id) { + std::string msg = template_str; + size_t pos = msg.find("{device_id}"); + if (pos != std::string::npos) { + msg.replace(pos, 11, actual_device_id); + } -auto it = m_alarm_states.find(rule.rule_id); + pos = msg.find("{value}"); + if (pos != std::string::npos) { + char buffer[32]; + std::snprintf(buffer, sizeof(buffer), "%.2f", value); + msg.replace(pos, 7, buffer); + } -if (it == m_alarm_states.end()) { - spdlog::warn("AlarmState for rule '{}' was missing. Creating it on-the-fly.", rule.rule_id); - - auto result = m_alarm_states.emplace( - std::piecewise_construct, - std::forward_as_tuple(rule.rule_id), - std::forward_as_tuple(m_io_context) - ); - it = result.first; // 'it' 现在指向新创建的元素 + pos = msg.find("{threshold}"); + if (pos != std::string::npos) { + char buffer[32]; + std::snprintf(buffer, sizeof(buffer), "%.2f", rule.threshold); + msg.replace(pos, 11, buffer); + } + return msg; } -AlarmState& state = it->second; +void AlarmService::trigger_alarm_action(AlarmRule& rule, double value, + const std::string& actual_device_id) { + std::string message = + format_message(rule, rule.message_template, value, actual_device_id); + spdlog::warn("[ALARM ACTIVE] (Rule: {}) {}", rule.rule_id, message); - if (condition_met) { - // 条件满足 - if (state.current_state == AlarmStateType::NORMAL) { - state.current_state = AlarmStateType::PENDING; - - if (rule.debounce_seconds <= 0) { - // 立即触发 - state.current_state = AlarmStateType::ACTIVE; - trigger_alarm_action(rule, value, actual_device_id); - } else { - // 启动防抖计时器 - spdlog::debug("Alarm PENDING: {}", rule.rule_id); - state.debounce_timer.expires_after(std::chrono::seconds(rule.debounce_seconds)); - - std::string rule_id = rule.rule_id; // 捕获 rule_id - - state.debounce_timer.async_wait( - [this, rule_id, value, actual_device_id](const boost::system::error_code& ec) { - - if (ec == boost::asio::error::operation_aborted) { - // Timer被取消了 (因为数据恢复正常) - spdlog::debug("Alarm PENDING cancelled: {}", rule_id); - return; - } - - // 计时器正常结束,告警激活 - AlarmState& current_state = m_alarm_states.at(rule_id); - - // 查找原始规则 (必须重新查找,因为 &rule 引用可能已失效) - AlarmRule* current_rule = nullptr; - for(auto& r : m_rules) { if (r.rule_id == rule_id) { current_rule = &r; break; } } + AlarmEvent event; + event.rule_id = rule.rule_id; + event.device_id = actual_device_id; + event.status = AlarmEventStatus::ACTIVE; + event.level = AlarmRule::levelToString(rule.level); + event.message = message; + event.trigger_value = value; + event.timestamp_ms = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); - if (current_rule && current_state.current_state == AlarmStateType::PENDING) { - current_state.current_state = AlarmStateType::ACTIVE; - trigger_alarm_action(*current_rule, value, actual_device_id); - } - }); - } - } - // else: 如果已经是 PENDING 或 ACTIVE,保持不变 (防止重复触发) - - } else { - // 条件不满足 (数据恢复正常) - if (state.current_state == AlarmStateType::PENDING) { - // 状态:PENDING -> NORMAL - // 在防抖期间恢复了,取消计时器 - state.debounce_timer.cancel(); - state.current_state = AlarmStateType::NORMAL; - } else if (state.current_state == AlarmStateType::ACTIVE) { - // 状态:ACTIVE -> NORMAL - // 告警已激活,现在恢复了 - state.current_state = AlarmStateType::NORMAL; - clear_alarm(rule, value, actual_device_id); - } - // else: 已经是 NORMAL,保持不变 - } + // 1. 语音播报 (加入队列) + schedule_tts(message); + + // 2. 发布到MQTT + std::string topic = + rule.alarm_mqtt_topic.empty() ? "proxy/alarms" : rule.alarm_mqtt_topic; + m_mqtt_client.publish(topic, AlarmEvent::toJson(event).dump(), 1, false); + + // 3. 写入数据库 + if (!m_data_storage.storeAlarmEvent(event)) { + spdlog::error("Failed to store ACTIVE alarm event for rule: {}", + rule.rule_id); + } } -std::string AlarmService::format_message(const AlarmRule& rule, const std::string& template_str, double value, const std::string& actual_device_id) { - std::string msg = template_str; - - // 替换 {device_id} - size_t pos = msg.find("{device_id}"); - if (pos != std::string::npos) { - msg.replace(pos, 11, actual_device_id); - } - - // 替换 {value} - pos = msg.find("{value}"); - if (pos != std::string::npos) { - // 格式化浮点数,保留2位小数 - char buffer[32]; - std::snprintf(buffer, sizeof(buffer), "%.2f", value); - msg.replace(pos, 7, buffer); - } +void AlarmService::clear_alarm(AlarmRule& rule, double value, + const std::string& actual_device_id) { + spdlog::info("[ALARM CLEARED] (Rule: {})", rule.rule_id); - // 替换 {threshold} - pos = msg.find("{threshold}"); - if (pos != std::string::npos) { - char buffer[32]; - std::snprintf(buffer, sizeof(buffer), "%.2f", rule.threshold); - msg.replace(pos, 11, buffer); - } - return msg; -} + // 检查是否有解除消息模板 + if (rule.clear_message_template.empty()) { + return; // 没有模板,安静地解除 + } -void AlarmService::trigger_alarm_action(AlarmRule& rule, double value, const std::string& actual_device_id) { - std::string message = format_message(rule, rule.message_template, value, actual_device_id); - spdlog::warn("[ALARM ACTIVE] (Rule: {}) {}", rule.rule_id, message); + std::string message = format_message(rule, rule.clear_message_template, value, + actual_device_id); - AlarmEvent event; - event.rule_id = rule.rule_id; - event.device_id = actual_device_id; - event.status = AlarmEventStatus::ACTIVE; - event.level = AlarmRule::levelToString(rule.level); - event.message = message; - event.trigger_value = value; - event.timestamp_ms = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch() - ).count(); + AlarmEvent event; + event.rule_id = rule.rule_id; + event.device_id = actual_device_id; + event.status = AlarmEventStatus::CLEARED; + event.level = AlarmRule::levelToString(rule.level); + event.message = message; + event.trigger_value = value; + event.timestamp_ms = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); - // 1. 语音播报 (加入队列) - schedule_tts(message); + // 1. 语音播报 (加入队列) + schedule_tts(message); - // 2. 发布到MQTT - std::string topic = rule.alarm_mqtt_topic.empty() ? "proxy/alarms" : rule.alarm_mqtt_topic; - m_mqtt_client.publish(topic, AlarmEvent::toJson(event).dump(), 1, false); + // 2. 发布到MQTT + std::string topic = + rule.alarm_mqtt_topic.empty() ? "proxy/alarms" : rule.alarm_mqtt_topic; + m_mqtt_client.publish(topic, AlarmEvent::toJson(event).dump(), 1, false); - // 3. 写入数据库 - if (!m_data_storage.storeAlarmEvent(event)) { - spdlog::error("Failed to store ACTIVE alarm event for rule: {}", rule.rule_id); - } -} - -void AlarmService::clear_alarm(AlarmRule& rule, double value, const std::string& actual_device_id) { - spdlog::info("[ALARM CLEARED] (Rule: {})", rule.rule_id); - - // 检查是否有解除消息模板 - if (rule.clear_message_template.empty()) { - return; // 没有模板,安静地解除 - } - - std::string message = format_message(rule, rule.clear_message_template, value, actual_device_id); - - AlarmEvent event; - event.rule_id = rule.rule_id; - event.device_id = actual_device_id; - event.status = AlarmEventStatus::CLEARED; - event.level = AlarmRule::levelToString(rule.level); - event.message = message; - event.trigger_value = value; - event.timestamp_ms = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch() - ).count(); - - // 1. 语音播报 (加入队列) - schedule_tts(message); - - // 2. 发布到MQTT - std::string topic = rule.alarm_mqtt_topic.empty() ? "proxy/alarms" : rule.alarm_mqtt_topic; - m_mqtt_client.publish(topic, AlarmEvent::toJson(event).dump(), 1, false); - - // 3. 写入数据库 - if (!m_data_storage.storeAlarmEvent(event)) { - spdlog::error("Failed to store CLEARED alarm event for rule: {}", rule.rule_id); - } + // 3. 写入数据库 + if (!m_data_storage.storeAlarmEvent(event)) { + spdlog::error("Failed to store CLEARED alarm event for rule: {}", + rule.rule_id); + } } // --- TTS 队列和 Web API 实现 --- void AlarmService::tts_worker() { - while (m_tts_running) { - std::string message_to_play; - { - std::unique_lock lock(m_tts_queue_mutex); - // 等待队列中有消息 或 线程被通知停止 - m_tts_cv.wait(lock, [this]{ return !m_tts_queue.empty() || !m_tts_running; }); - - if (!m_tts_running && m_tts_queue.empty()) { - break; // 退出 - } + while (m_tts_running) { + std::string message_to_play; + { + std::unique_lock lock(m_tts_queue_mutex); + // 等待队列中有消息 或 线程被通知停止 + m_tts_cv.wait(lock, + [this] { return !m_tts_queue.empty() || !m_tts_running; }); - if (!m_tts_queue.empty()) { - message_to_play = m_tts_queue.front(); - m_tts_queue.pop(); - } - } // 释放锁 + if (!m_tts_running && m_tts_queue.empty()) { + break; // 退出 + } - if (!message_to_play.empty()) { - spdlog::debug("TTS worker playing: {}", message_to_play); - // 这是阻塞调用,但在独立线程中,不会影响主 io_context - m_tts_service.say_text_and_play(message_to_play); - } + if (!m_tts_queue.empty()) { + message_to_play = m_tts_queue.front(); + m_tts_queue.pop(); + } + } // 释放锁 + + if (!message_to_play.empty()) { + spdlog::debug("TTS worker playing: {}", message_to_play); + // 这是阻塞调用,但在独立线程中,不会影响主 io_context + m_tts_service.say_text_and_play(message_to_play); } - spdlog::info("TTS worker thread stopped."); + } + spdlog::info("TTS worker thread stopped."); } void AlarmService::schedule_tts(const std::string& text) { - if (text.empty() || !m_tts_running) return; - { - std::lock_guard lock(m_tts_queue_mutex); - m_tts_queue.push(text); - } - m_tts_cv.notify_one(); + if (text.empty() || !m_tts_running) return; + { + std::lock_guard lock(m_tts_queue_mutex); + m_tts_queue.push(text); + } + m_tts_cv.notify_one(); } nlohmann::json AlarmService::getActiveAlarmsJson() { - auto alarms = m_data_storage.getActiveAlarms(); - json j_alarms = json::array(); - for(const auto& alarm : alarms) { - j_alarms.push_back(AlarmEvent::toJson(alarm)); - } - return j_alarms; + auto alarms = m_data_storage.getActiveAlarms(); + json j_alarms = json::array(); + for (const auto& alarm : alarms) { + j_alarms.push_back(AlarmEvent::toJson(alarm)); + } + return j_alarms; } nlohmann::json AlarmService::getAlarmHistoryJson(int limit) { - auto alarms = m_data_storage.getAlarmHistory(limit); - json j_alarms = json::array(); - for(const auto& alarm : alarms) { - j_alarms.push_back(AlarmEvent::toJson(alarm)); + auto alarms = m_data_storage.getAlarmHistory(limit); + json j_alarms = json::array(); + for (const auto& alarm : alarms) { + j_alarms.push_back(AlarmEvent::toJson(alarm)); + } + return j_alarms; +} + +bool AlarmService::parse_rules_from_file( + const std::string& config_path, std::vector& out_rules, + std::map& out_states) { + std::ifstream ifs(config_path); + if (!ifs.is_open()) { + spdlog::error("Failed to open alarm rules file: {}", config_path); + return false; + } + + try { + json j_rules = json::parse(ifs); + + // 清空临时容器,准备接收新规则 + out_rules.clear(); + out_states.clear(); + + for (const auto& j_rule : j_rules) { + AlarmRule rule; + rule.rule_id = j_rule.at("rule_id"); + rule.device_id = j_rule.at("device_id"); + rule.data_point_name = j_rule.at("data_point_name"); + rule.compare_type = + AlarmRule::stringToCompareType(j_rule.at("compare_type")); + rule.threshold = j_rule.at("threshold"); + rule.level = AlarmRule::stringToLevel(j_rule.at("level")); + rule.message_template = j_rule.at("message_template"); + rule.debounce_seconds = j_rule.value("debounce_seconds", 0); + rule.alarm_mqtt_topic = j_rule.value("alarm_mqtt_topic", ""); + rule.clear_message_template = j_rule.value("clear_message_template", ""); + + if (rule.compare_type == CompareType::UNKNOWN) { + spdlog::warn("Skipping rule '{}': Unknown compare_type.", rule.rule_id); + continue; + } + + out_rules.push_back(std::move(rule)); + + // 为每条规则准备一个状态机 + out_states.emplace(std::piecewise_construct, + std::forward_as_tuple(rule.rule_id), + std::forward_as_tuple( + m_io_context) // m_io_context 是类成员,可以访问 + ); } - return j_alarms; + } catch (const json::exception& e) { + spdlog::error("Failed to parse alarm rules file '{}': {}", config_path, + e.what()); + return false; + } + + return true; } \ No newline at end of file diff --git a/src/alarm/alarm_service.h b/src/alarm/alarm_service.h index 1befd1b..8a0f40c 100644 --- a/src/alarm/alarm_service.h +++ b/src/alarm/alarm_service.h @@ -1,69 +1,76 @@ // alarm/alarm_service.h #pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include "alarm_defs.h" #include "alarm_event.h" -#include "tts/piper_tts_interface.h" // 你的 TTS 接口 #include "dataStorage/data_storage.h" #include "mqtt/mqtt_client.h" #include "spdlog/spdlog.h" -#include -#include - -#include -#include -#include -#include -#include -#include -#include +#include "tts/piper_tts_interface.h" // 你的 TTS 接口 using json = nlohmann::json; class AlarmService { -public: - AlarmService(boost::asio::io_context& io, - PiperTTSInterface& tts_service, - MqttClient& mqtt_client, - DataStorage& data_storage); - - ~AlarmService(); + public: + AlarmService(boost::asio::io_context& io, PiperTTSInterface& tts_service, + MqttClient& mqtt_client, DataStorage& data_storage); - void stop(); + ~AlarmService(); - bool load_rules(const std::string& config_path); + void stop(); - void process_device_data(const std::string& device_id, const std::string& data_json); - - void process_system_data(const std::string& system_data_json); + bool load_rules(const std::string& config_path); + bool reload_rules(); - nlohmann::json getActiveAlarmsJson(); - nlohmann::json getAlarmHistoryJson(int limit); + void process_device_data(const std::string& device_id, + const std::string& data_json); -private: - void check_rule_against_value(AlarmRule& rule, double value, const std::string& actual_device_id); + void process_system_data(const std::string& system_data_json); - std::string format_message(const AlarmRule& rule, const std::string& template_str, double value, const std::string& actual_device_id); + nlohmann::json getActiveAlarmsJson(); + nlohmann::json getAlarmHistoryJson(int limit); - void trigger_alarm_action(AlarmRule& rule, double value, const std::string& actual_device_id); - - void clear_alarm(AlarmRule& rule, double value, const std::string& actual_device_id); + private: + void check_rule_against_value(AlarmRule& rule, double value, + const std::string& actual_device_id); + std::string format_message(const AlarmRule& rule, + const std::string& template_str, double value, + const std::string& actual_device_id); + void trigger_alarm_action(AlarmRule& rule, double value, + const std::string& actual_device_id); + void clear_alarm(AlarmRule& rule, double value, + const std::string& actual_device_id); + void tts_worker(); + void schedule_tts(const std::string& text); + bool parse_rules_from_file(const std::string& config_path, + std::vector& out_rules, + std::map& out_states); - void tts_worker(); - void schedule_tts(const std::string& text); + boost::asio::io_context& m_io_context; + PiperTTSInterface& m_tts_service; + MqttClient& m_mqtt_client; + DataStorage& m_data_storage; - boost::asio::io_context& m_io_context; - PiperTTSInterface& m_tts_service; - MqttClient& m_mqtt_client; - DataStorage& m_data_storage; - - std::vector m_rules; - std::map m_alarm_states; + std::vector m_rules; + std::map m_alarm_states; - // TTS 播报队列 - std::mutex m_tts_queue_mutex; - std::condition_variable m_tts_cv; - std::queue m_tts_queue; - std::thread m_tts_thread; - bool m_tts_running; + std::string m_rules_config_path; // <--- 新增:存储配置文件路径 + std::mutex m_rules_mutex; // <--- 新增:保护 m_rules 和 m_alarm_states + + // TTS 播报队列 + std::mutex m_tts_queue_mutex; + std::condition_variable m_tts_cv; + std::queue m_tts_queue; + std::thread m_tts_thread; + bool m_tts_running; }; \ No newline at end of file