// 文件名: src/modbus/modbus_rtu_poller_service.cc #include "modbus_rtu_poller_service.h" #include "generic_modbus_parser.h" // 使用通用解析器 #include "spdlog/spdlog.h" #include #include // for std::minmax_element ModbusRtuPollerService::ModbusRtuPollerService(ModbusRtuDeviceConfig config, ReportDataCallback report_cb) : m_config(std::move(config)), m_report_callback(std::move(report_cb)) {} ModbusRtuPollerService::~ModbusRtuPollerService() { stop(); } void ModbusRtuPollerService::start() { if (m_thread.joinable()) { spdlog::warn("[Modbus RTU Service] Poller for device '{}' is already running.", m_config.device_id); return; } m_stop_flag = false; // 启动一个新线程,并将 run() 方法作为入口点 // 'this' 指针被传递,以便新线程可以调用类的成员函数 m_thread = std::thread(&ModbusRtuPollerService::run, this); spdlog::info("[Modbus RTU Service] Poller for device '{}' started in a background thread.", m_config.device_id); } void ModbusRtuPollerService::stop() { m_stop_flag = true; if (m_thread.joinable()) { m_thread.join(); // 等待线程安全退出 spdlog::info("[Modbus RTU Service] Poller for device '{}' has been stopped.", m_config.device_id); } } const ModbusRtuDeviceConfig& ModbusRtuPollerService::get_config() const { return m_config; } bool ModbusRtuPollerService::is_running() const { return !m_stop_flag && m_thread.joinable(); } void ModbusRtuPollerService::write_single_register(uint16_t address, uint16_t value) { std::lock_guard lock(m_client_mutex); try { spdlog::debug("[Modbus RTU] Device '{}': Writing value {} to address {}.", m_config.device_id, value, address); m_client.writeSingleRegister(m_config.slave_id, address, value); spdlog::info("[Modbus RTU] Device '{}': Successfully wrote value {} to address {}.", m_config.device_id, value, address); } catch (const std::exception& e) { spdlog::error("[Modbus RTU] Device '{}': Failed to write to address {}: {}", m_config.device_id, address, e.what()); } } void ModbusRtuPollerService::run() { if (m_config.data_points.empty()) { spdlog::warn("[Modbus RTU] Device '{}' has no data points configured. Thread will not run.", m_config.device_id); return; } if (!m_client.setPortSettings(m_config.port_path, m_config.baud_rate)) { spdlog::error("[Modbus RTU] Failed to set up serial port '{}' for device '{}'. Thread exiting.", m_config.port_path, m_config.device_id); return; } auto [min_it, max_it] = std::minmax_element(m_config.data_points.begin(), m_config.data_points.end(), [](const DataPointConfig& a, const DataPointConfig& b) { return a.address < b.address; }); uint16_t start_address = min_it->address; uint16_t last_address = max_it->address; if (max_it->type == ModbusDataType::UINT32 || max_it->type == ModbusDataType::INT32 || max_it->type == ModbusDataType::FLOAT32) { last_address += 1; } uint16_t quantity = last_address - start_address + 1; spdlog::info("[Modbus RTU] Device '{}' will poll {} registers starting from address {}.", m_config.device_id, quantity, start_address); while (!m_stop_flag) { { // <--- 创建一个新的作用域来控制锁的生命周期 std::lock_guard lock(m_client_mutex); try { std::vector raw_registers = m_client.readHoldingRegisters(m_config.slave_id, start_address, quantity); std::map registers_map; for (uint16_t i = 0; i < raw_registers.size(); ++i) { registers_map[start_address + i] = raw_registers[i]; } nlohmann::json data_j = GenericModbusParser::parse(registers_map, m_config.data_points); UnifiedData report_data; report_data.device_id = m_config.device_id; report_data.timestamp_ms = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); report_data.data_json = data_j.dump(); if (m_report_callback) { m_report_callback(report_data); } } catch (const std::exception& e) { spdlog::error("[Modbus RTU] Error during communication with device '{}': {}", m_config.device_id, e.what()); } } // <--- 锁在这里被释放 auto wake_up_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(m_config.poll_interval_ms); while (std::chrono::steady_clock::now() < wake_up_time) { if (m_stop_flag) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } m_client.closePort(); spdlog::info("[RTU Poller {}] Run loop finished. Thread exiting.", m_config.device_id); }