From d2cbab34c7c4052f0be83f614dbabaa3f6fb982b Mon Sep 17 00:00:00 2001 From: GuanYuankai Date: Thu, 8 Jan 2026 13:47:49 +0800 Subject: [PATCH] =?UTF-8?q?feat(video,network):=20=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E5=92=8C=E5=89=8D=E7=AB=AF=E7=9A=84=E8=A7=86=E9=A2=91=E7=BB=8A?= =?UTF-8?q?=E7=BA=BF=E6=8E=A5=E5=8F=97=E8=B0=83=E6=95=B4=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.json | 8 +-- src/config/config_manager.cc | 64 +++++++++++++++------ src/mysqlManager/ResourceFileDao.cpp | 12 ++-- src/mysqlManager/mysql_manager.cpp | 29 ++++++++-- src/videoService/video_pipeline.cpp | 86 +++++++++++++++++----------- src/web/web_server.cc | 25 ++++++-- 6 files changed, 152 insertions(+), 72 deletions(-) diff --git a/config/config.json b/config/config.json index 5eac0d5..74e8d57 100644 --- a/config/config.json +++ b/config/config.json @@ -20,12 +20,12 @@ "enable": true, "line": { "p1": { - "x": 0.0, - "y": 0.8 + "x": 0.39809998869895935, + "y": 0.8816999793052673 }, "p2": { - "x": 1.0, - "y": 0.8 + "x": 0.8337000012397766, + "y": 0.5864999890327454 } }, "name": "Main_Gate_Line" diff --git a/src/config/config_manager.cc b/src/config/config_manager.cc index 473e19f..eb4d727 100644 --- a/src/config/config_manager.cc +++ b/src/config/config_manager.cc @@ -268,27 +268,55 @@ TripwireConfig ConfigManager::getTripwireConfig() { return config; } bool ConfigManager::updateTripwireLine(float p1_x, float p1_y, float p2_x, float p2_y) { - std::unique_lock lock(m_mutex); // 获取写锁,确保线程安全 + json new_tripwire_value; // 用于存储更新后的片段 - // 1. 确保 tripwire 对象存在 - if (!m_config_json.contains("tripwire") || !m_config_json["tripwire"].is_object()) { - m_config_json["tripwire"] = json::object(); - } + // 1. 更新数据并保存 (使用作用域限制锁的范围) + { + std::unique_lock lock(m_mutex); // 获取写锁 - // 2. 确保 line 对象存在 - if (!m_config_json["tripwire"].contains("line") || - !m_config_json["tripwire"]["line"].is_object()) { - m_config_json["tripwire"]["line"] = json::object(); - } + if (!m_config_json.contains("tripwire") || !m_config_json["tripwire"].is_object()) { + m_config_json["tripwire"] = json::object(); + } + + if (!m_config_json["tripwire"].contains("line") || !m_config_json["tripwire"]["line"].is_object()) { + m_config_json["tripwire"]["line"] = json::object(); + } - // 3. 更新坐标值 - // 注意:JSON 结构根据 config.json 文件构建 - m_config_json["tripwire"]["line"]["p1"] = {{"x", p1_x}, {"y", p1_y}}; - m_config_json["tripwire"]["line"]["p2"] = {{"x", p2_x}, {"y", p2_y}}; + m_config_json["tripwire"]["line"]["p1"] = { {"x", p1_x}, {"y", p1_y} }; + m_config_json["tripwire"]["line"]["p2"] = { {"x", p2_x}, {"y", p2_y} }; - spdlog::info("Updating Tripwire: P1({:.2f}, {:.2f}), P2({:.2f}, {:.2f})", p1_x, p1_y, p2_x, - p2_y); + // 捕获最新的 tripwire 值,用于发送给回调 + new_tripwire_value = m_config_json["tripwire"]; - // 4. 保存到文件 (save_unlocked 内部不加锁,适合在这里调用) - return save_unlocked(); + spdlog::info("Updating Tripwire: P1({:.2f}, {:.2f}), P2({:.2f}, {:.2f})", + p1_x, p1_y, p2_x, p2_y); + + if (!save_unlocked()) { + return false; + } + } + + // 2. === [修复核心] 手动触发观察者回调 === + std::vector callbacks; + { + std::lock_guard cb_lock(m_callbackMutex); + if (m_key_callbacks.count("tripwire")) { + callbacks = m_key_callbacks["tripwire"]; + } + } + + // 执行回调 + int trigger_count = 0; + for (const auto& cb : callbacks) { + try { + cb(new_tripwire_value); // 这里的 new_tripwire_value 就是传给 VideoPipeline 的新配置 + trigger_count++; + } catch (const std::exception& e) { + spdlog::error("Exception inside manual config update callback: {}", e.what()); + } + } + + spdlog::info("Manual update triggered {} callbacks.", trigger_count); + + return true; } \ No newline at end of file diff --git a/src/mysqlManager/ResourceFileDao.cpp b/src/mysqlManager/ResourceFileDao.cpp index 1174da8..4178598 100644 --- a/src/mysqlManager/ResourceFileDao.cpp +++ b/src/mysqlManager/ResourceFileDao.cpp @@ -3,6 +3,7 @@ #include #include "mysql_manager.h" +#include "spdlog/spdlog.h" // --- 静态辅助函数 --- static std::string GetFileTypeStr(FileType type) { @@ -23,12 +24,11 @@ bool ResourceFileDao::Insert(const ResourceFile& file) { std::string sql = "INSERT INTO sys_resource_file " "(source_table, file_path, source_file_name, suffix_name, " - "file_type, business_id, business_type, create_user_id) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; + "file_type, business_id, create_user_id) " + "VALUES (?, ?, ?, ?, ?, ?, ?)"; auto pstmt = MysqlManager::GetInstance()->GetPreparedStatement(sql); if (!pstmt) { - return false; } try { @@ -38,14 +38,14 @@ bool ResourceFileDao::Insert(const ResourceFile& file) { pstmt->setString(4, file.suffixName); pstmt->setString(5, file.fileType); pstmt->setInt64(6, file.businessId); - pstmt->setString(7, file.businessType); // create_user_id 暂时给0,如果有登录系统可传入 - pstmt->setInt64(8, file.createUserId); + pstmt->setInt64(7, file.createUserId); pstmt->executeUpdate(); return true; } catch (sql::SQLException& e) { - // 建议 log: e.what() + spdlog::error("MySQL Error in ResourceFileDao: Code={}, Message={}", e.getErrorCode(), + e.what()); return false; } } diff --git a/src/mysqlManager/mysql_manager.cpp b/src/mysqlManager/mysql_manager.cpp index 76abf9c..51c1836 100644 --- a/src/mysqlManager/mysql_manager.cpp +++ b/src/mysqlManager/mysql_manager.cpp @@ -1,5 +1,7 @@ #include "mysql_manager.h" +#include "spdlog/spdlog.h" + MysqlManager* MysqlManager::instance = nullptr; std::mutex MysqlManager::mtx; @@ -35,13 +37,30 @@ bool MysqlManager::Initialize(const std::string& h, const std::string& u, const } void MysqlManager::CheckConnection() { - if (!connection || connection->isClosed()) { - // 尝试重连 + bool valid = false; + + // 1. 初步检查对象是否存在 + if (connection && !connection->isClosed()) { + try { + // 2. 主动执行一个轻量级查询 (Ping) + std::unique_ptr stmt(connection->createStatement()); + std::unique_ptr res(stmt->executeQuery("SELECT 1")); + valid = true; + } catch (sql::SQLException& e) { + spdlog::warn("MySQL Connection is dead (Ping failed): {}", e.what()); + valid = false; + } + } + + // 3. 如果无效,则重连 + if (!valid) { + spdlog::info("Attempting to reconnect to MySQL..."); try { connection.reset(driver->connect(host, user, pass)); connection->setSchema(dbName); - } catch (...) { - // 重连失败,实际项目中应记录日志 + spdlog::info("MySQL Reconnected successfully."); + } catch (sql::SQLException& e) { + spdlog::error("Failed to reconnect to MySQL: {}", e.what()); } } } @@ -53,6 +72,8 @@ std::unique_ptr MysqlManager::GetPreparedStatement(const try { return std::unique_ptr(connection->prepareStatement(sql)); } catch (sql::SQLException& e) { + spdlog::error("MySQL Error in GetPreparedStatement: Code={}, Message={}", e.getErrorCode(), + e.what()); return nullptr; } } \ No newline at end of file diff --git a/src/videoService/video_pipeline.cpp b/src/videoService/video_pipeline.cpp index 3cda195..fcc1825 100644 --- a/src/videoService/video_pipeline.cpp +++ b/src/videoService/video_pipeline.cpp @@ -4,6 +4,7 @@ #include #include // C++17 #include // for std::put_time +#include // [新增] 引入互斥锁头文件 #include #include "config/config_manager.h" @@ -16,6 +17,10 @@ namespace fs = std::filesystem; const int YoloDetector::NPU_CORE_CNT; +// === [新增] 数据库写入互斥锁 === +// 防止多个后台线程同时操作同一个 MySQL 连接导致 "Lost connection" 或协议错误 +static std::mutex g_db_mtx; + // === 静态辅助函数 === // 获取矩形底部的中心点 (作为车辆的"脚") @@ -173,7 +178,7 @@ void VideoPipeline::processCrossing(const TrackedVehicle& vehicle, const cv::Mat const std::string& locationName) { // 启动分离线程,避免阻塞主视频流 std::thread([this, vehicle, frame, locationName]() { - // === 1. 准备目录与文件名 === + // === 1. 准备目录与文件名 (IO操作,无需加锁) === std::string saveDir = "../captures"; try { if (!fs::exists(saveDir)) { @@ -194,7 +199,7 @@ void VideoPipeline::processCrossing(const TrackedVehicle& vehicle, const cv::Mat std::string fileName = fmt::format("{}_id{}.jpg", timeStr, vehicle.id); std::string fullPath = fmt::format("{}/{}", saveDir, fileName); - // === 2. 安全截图 === + // === 2. 安全截图 (内存操作,无需加数据库锁) === cv::Rect safeBox = vehicle.box; // 边界钳制,防止 crash if (safeBox.x < 0) @@ -212,53 +217,64 @@ void VideoPipeline::processCrossing(const TrackedVehicle& vehicle, const cv::Mat } try { - // Clone 数据,因为主线程可能会复用 frame 内存 (虽然当前架构中 frame - // 是独立的,但为了健壮性) + // Clone 数据,因为主线程可能会复用 frame 内存 cv::Mat snapshot = frame(safeBox).clone(); if (cv::imwrite(fullPath, snapshot)) { spdlog::info("Snapshot saved: {}", fullPath); } else { spdlog::error("Failed to write image: {}", fullPath); - return; + return; // 图片保存失败通常不继续写库 } } catch (const std::exception& e) { spdlog::error("Save image exception: {}", e.what()); return; } - // === 3. 数据库入库 === - DeviceIdentificationDao deviceDao; - ResourceFileDao fileDao; + // === 3. 数据库入库 (关键修改:加锁) === + // 使用大括号限制锁的作用域 + { + // [修复] 此处加锁,确保同一时刻只有一个线程使用 MySQL 连接 + std::lock_guard db_lock(g_db_mtx); - // 3.1 准备枚举数据 - // class_id: 1 -> EV(Green), 0 -> Fuel(Blue) - CarType cType = (vehicle.last_class_id == 1) ? CarType::ELECTRIC : CarType::GASOLINE; - CarColor cColor = (vehicle.last_class_id == 1) ? CarColor::GREEN : CarColor::BLUE; + try { + DeviceIdentificationDao deviceDao; + ResourceFileDao fileDao; - // 3.2 插入业务主表 - // 假设 SystemID 为 1,实际项目可能需配置 - int64_t systemId = 1; - std::string location = locationName.empty() ? "Unkown_Line" : locationName; + // 3.1 准备枚举数据 + // class_id: 1 -> EV(Green), 0 -> Fuel(Blue) + CarType cType = + (vehicle.last_class_id == 1) ? CarType::ELECTRIC : CarType::GASOLINE; + CarColor cColor = (vehicle.last_class_id == 1) ? CarColor::GREEN : CarColor::BLUE; - int64_t dataId = deviceDao.ReportIdentification(systemId, location, cColor, cType); + // 3.2 插入业务主表 + // 假设 SystemID 为 1,实际项目可能需配置 + int64_t systemId = 1; + std::string location = locationName.empty() ? "Unkown_Line" : locationName; - if (dataId > 0) { - // 3.3 插入文件关联表 - bool fileSaved = fileDao.SaveFile("tb_device_identification_data", // source_table - dataId, // business_id - fullPath, // file_path - fileName, // source_file_name - FileType::ORIGINAL // file_type - ); + int64_t dataId = deviceDao.ReportIdentification(systemId, location, cColor, cType); - if (fileSaved) { - spdlog::debug("DB Transaction Complete. DataID: {}", dataId); - } else { - spdlog::error("Failed to save file record for DataID: {}", dataId); + if (dataId > 0) { + // 3.3 插入文件关联表 + bool fileSaved = + fileDao.SaveFile("tb_device_identification_data", // source_table + dataId, // business_id + fullPath, // file_path + fileName, // source_file_name + FileType::ORIGINAL // file_type + ); + + if (fileSaved) { + spdlog::debug("DB Transaction Complete. DataID: {}", dataId); + } else { + spdlog::error("Failed to save file record for DataID: {}", dataId); + } + } else { + spdlog::error("Failed to insert device identification data."); + } + } catch (const std::exception& e) { + spdlog::error("Database Exception in thread: {}", e.what()); } - } else { - spdlog::error("Failed to insert device identification data."); - } + } // 锁在这里自动释放 }).detach(); // 让线程后台运行 } @@ -482,9 +498,9 @@ void VideoPipeline::processLoop(std::string inputUrl, std::string outputUrl, boo writer.write(current_data.original_frame); } - if (write_frame_idx % 60 == 0) { - spdlog::info("Processed Frame ID: {}", write_frame_idx); - } + // if (write_frame_idx % 60 == 0) { + // spdlog::info("Processed Frame ID: {}", write_frame_idx); + // } } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } diff --git a/src/web/web_server.cc b/src/web/web_server.cc index 96cc5f9..c7999de 100644 --- a/src/web/web_server.cc +++ b/src/web/web_server.cc @@ -60,7 +60,10 @@ void WebServer::setup_routes() { // 2. 校验 JSON 格式是否合法 if (!json_body) { - return crow::response(400, "Invalid JSON format"); + json response_json = { + {"code", 400}, {"status", "fail"}, {"message", "Invalid JSON format"}}; + + return crow::response(400, response_json.dump()); } // 3. 校验必要字段是否存在 @@ -80,7 +83,11 @@ void WebServer::setup_routes() { // 5. 简单的范围校验 (可选,防止异常数据) if (x1_pct < 0 || x1_pct > 100 || y1_pct < 0 || y1_pct > 100 || x2_pct < 0 || x2_pct > 100 || y2_pct < 0 || y2_pct > 100) { - return crow::response(400, "Values must be between 0 and 100"); + json response_json = {{"code", 400}, + {"status", "fail"}, + {"message", "Values must be between 0 and 100"}}; + + return crow::response(400, response_json.dump()); } // 6. 归一化:除以 100 转换为 0.0 - 1.0 @@ -94,16 +101,24 @@ void WebServer::setup_routes() { ConfigManager::getInstance().updateTripwireLine(p1_x, p1_y, p2_x, p2_y); if (success) { - json response_json = {{"status", "success"}, + json response_json = {{"code", 200}, + {"status", "success"}, {"message", "Tripwire updated successfully"}}; return crow::response(200, response_json.dump()); } else { - return crow::response(500, "Failed to save configuration"); + json response_json = {{"code", 500}, + {"status", "fail"}, + {"message", "Failed to save configuration"}}; + + return crow::response(500, response_json.dump()); } } catch (const std::exception& e) { spdlog::error("Error parsing tripwire coordinates: {}", e.what()); - return crow::response(400, "Invalid data types"); + json response_json = { + {"code", 400}, {"status", "fail"}, {"message", "Invalid data types"}}; + + return crow::response(400, response_json.dump()); } }); } \ No newline at end of file