diff --git a/CMakeLists.txt b/CMakeLists.txt index c184950..f469140 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,6 +14,8 @@ find_package(PkgConfig REQUIRED) pkg_check_modules(GST REQUIRED gstreamer-1.0 gstreamer-app-1.0) find_package(OpenCV REQUIRED) +find_package(absl REQUIRED) + add_subdirectory(src/vendor/crow) add_library(nlohmann_json INTERFACE) @@ -60,11 +62,17 @@ add_library(vehicle_road_lib STATIC src/config/config_manager.cc #告警模块 src/alarm/alarm_service.cc + #新视频模块 + src/videoService/rtsp_camera_service.cc + src/videoService/rtsp_stream_pusher.cc + src/videoService/algorithm_service.cc + src/videoService/video_pipeline.cc ) target_include_directories(vehicle_road_lib PUBLIC /usr/include/opencv4 ${CMAKE_CURRENT_SOURCE_DIR}/src + ${CMAKE_CURRENT_SOURCE_DIR}/src/videoService ${GST_INCLUDE_DIRS} /usr/include/rga ) @@ -82,6 +90,12 @@ target_link_libraries(vehicle_road_lib PRIVATE rga ${OpenCV_LIBS} ${GST_LIBRARIES} + # --- 新增 Abseil 库 --- + absl::strings + absl::time + absl::synchronization + absl::status + absl::statusor ) diff --git a/TODO.md b/TODO.md index 7fac051..f077bf2 100644 --- a/TODO.md +++ b/TODO.md @@ -3,7 +3,8 @@ ## 当前任务 (Doing) - [ ] 验证 OpenCV 在 RK3588 容器内的调用 (/dev/video0) -- [ ] 接入摄像头设备 +- [ ] 接入摄像头设备 rtsp://admin:123456@192.168.1.57:554/stream0 +- [ ] 接入 RKNN 模型 ## 待办 (To Do) diff --git a/config/devices.json b/config/devices.json index b650012..c0e3d35 100644 --- a/config/devices.json +++ b/config/devices.json @@ -1,98 +1,98 @@ { - "modbus_rtu_devices": [ - { - "enabled": true, - "device_id": "rtu_temp_sensor_lab", - "port_path": "/dev/ttyS7", - "baud_rate": 9600, - "slave_id": 1, - "poll_interval_ms": 5000, - "data_points": [ - { - "name": "temperature", - "address": 0, - "type": "INT16", - "scale": 0.1 - }, - { - "name": "humidity", - "address": 1, - "type": "UINT16", - "scale": 0.1 - } - ] - }, - { - "enabled": false, - "device_id": "rotary encoder", - "port_path": "/dev/ttyS7", - "baud_rate": 9600, - "slave_id": 111, - "poll_interval_ms": 5000, - "data_points": [ - { - "name": "count", - "address": 1, - "type": "INT16", - "scale": 1 - }, - { - "name": "total_count", - "address": 2, - "type": "INT16", - "scale": 1 - } - ] - }, - { - "enabled": false, - "device_id": "backup_counter", - "port_path": "/dev/ttyS7", - "baud_rate": 9600, - "slave_id": 10, - "poll_interval_ms": 1000, - "data_points": [ - { - "name": "count", - "address": 32, - "type": "UINT32" - } - ] - } - ], - "modbus_tcp_devices": [ - { - "enabled": false, - "device_id": "plc_workshop1", - "ip_address": "192.168.1.120", - "port": 502, - "slave_id": 1, - "poll_interval_ms": 1000, - "data_points": [ - { - "name": "motor_speed", - "address": 100, - "type": "UINT16", - "scale": 1 - }, - { - "name": "pressure", - "address": 102, - "type": "FLOAT32", - "scale": 0.01 - }, - { - "name": "valve_status", - "address": 104, - "type": "UINT16", - "scale": 1 - } - ] - } - ], - "modbus_rtu_bus_configs": { - "/dev/ttyS7": { - "inter_device_delay_ms": 150 - } - } -} \ No newline at end of file + "modbus_rtu_devices": [ + { + "enabled": false, + "device_id": "rtu_temp_sensor_lab", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 1, + "poll_interval_ms": 5000, + "data_points": [ + { + "name": "temperature", + "address": 0, + "type": "INT16", + "scale": 0.1 + }, + { + "name": "humidity", + "address": 1, + "type": "UINT16", + "scale": 0.1 + } + ] + }, + { + "enabled": false, + "device_id": "rotary encoder", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 111, + "poll_interval_ms": 5000, + "data_points": [ + { + "name": "count", + "address": 1, + "type": "INT16", + "scale": 1 + }, + { + "name": "total_count", + "address": 2, + "type": "INT16", + "scale": 1 + } + ] + }, + { + "enabled": false, + "device_id": "backup_counter", + "port_path": "/dev/ttyS7", + "baud_rate": 9600, + "slave_id": 10, + "poll_interval_ms": 1000, + "data_points": [ + { + "name": "count", + "address": 32, + "type": "UINT32" + } + ] + } + ], + "modbus_tcp_devices": [ + { + "enabled": false, + "device_id": "plc_workshop1", + "ip_address": "192.168.1.120", + "port": 502, + "slave_id": 1, + "poll_interval_ms": 1000, + "data_points": [ + { + "name": "motor_speed", + "address": 100, + "type": "UINT16", + "scale": 1 + }, + { + "name": "pressure", + "address": 102, + "type": "FLOAT32", + "scale": 0.01 + }, + { + "name": "valve_status", + "address": 104, + "type": "UINT16", + "scale": 1 + } + ] + } + ], + "modbus_rtu_bus_configs": { + "/dev/ttyS7": { + "inter_device_delay_ms": 150 + } + } +} diff --git a/src/main.cpp b/src/main.cpp index a8cd6f0..52babb1 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -19,14 +19,13 @@ #include "nlohmann/json.hpp" #include "spdlog/spdlog.h" #include "systemMonitor/system_monitor.h" +#include "videoService/video_pipeline.hpp" #include "web/web_server.h" boost::asio::io_context g_io_context; -void poll_system_metrics(boost::asio::steady_timer &timer, - SystemMonitor::SystemMonitor &monitor, - MqttClient &mqtt_client, AlarmService &alarm_service) -{ +void poll_system_metrics(boost::asio::steady_timer& timer, SystemMonitor::SystemMonitor& monitor, + MqttClient& mqtt_client, AlarmService& alarm_service) { if (g_io_context.stopped()) return; auto cpu_util = monitor.getCpuUtilization(); @@ -35,8 +34,7 @@ void poll_system_metrics(boost::asio::steady_timer &timer, double mem_usage_percentage = (mem_info.total_kb > 0) - ? (100.0 * (mem_info.total_kb - mem_info.available_kb) / - mem_info.total_kb) + ? (100.0 * (mem_info.total_kb - mem_info.available_kb) / mem_info.total_kb) : 0.0; auto thermalInfoString = monitor.getChipTemperature(); @@ -44,8 +42,7 @@ void poll_system_metrics(boost::asio::steady_timer &timer, std::string topic = "proxy/system_status"; std::string payload; - try - { + try { nlohmann::json payload_json; payload_json["cpu_usage"] = cpu_util.totalUsagePercentage; payload_json["mem_total_gb"] = mem_total_gb; @@ -53,11 +50,8 @@ void poll_system_metrics(boost::asio::steady_timer &timer, payload_json["thermal_info"] = nlohmann::json::parse(thermalInfoString); payload = payload_json.dump(); - } - catch (const nlohmann::json::parse_error &e) - { - spdlog::error("Failed to parse thermalInfo JSON: {}. Sending partial data.", - e.what()); + } catch (const nlohmann::json::parse_error& e) { + spdlog::error("Failed to parse thermalInfo JSON: {}. Sending partial data.", e.what()); nlohmann::json fallback_json; fallback_json["cpu_usage"] = cpu_util.totalUsagePercentage; fallback_json["mem_total_gb"] = mem_total_gb; @@ -74,44 +68,38 @@ void poll_system_metrics(boost::asio::steady_timer &timer, spdlog::debug("System metrics published."); timer.expires_at(timer.expiry() + std::chrono::seconds(15)); - timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), - std::ref(monitor), std::ref(mqtt_client), - std::ref(alarm_service))); + timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor), + std::ref(mqtt_client), std::ref(alarm_service))); } -int main(int argc, char *argv[]) -{ +int main(int argc, char* argv[]) { + // TODO: [GYK] DEV#1: 将 URL 放入 config.json 中读取 + std::string cam_rtsp_input = "rtsp://admin:123456@192.168.1.57:554/stream0"; + std::string algorithm_rtsp_output = "rtsp://127.0.0.1:8554/processed"; const std::string config_path = "/app/config/config.json"; - if (!ConfigManager::getInstance().load(config_path)) - { + if (!ConfigManager::getInstance().load(config_path)) { std::cerr << "Failed to load configuration from " << config_path - << ". Running with defaults, but this may cause issues." - << std::endl; + << ". Running with defaults, but this may cause issues." << std::endl; } - auto &config = ConfigManager::getInstance(); + auto& config = ConfigManager::getInstance(); - try - { + try { spdlog::set_level(spdlog::level::from_str(config.getLogLevel())); spdlog::info("Edge Proxy starting up..."); - } - catch (const spdlog::spdlog_ex &ex) - { + } catch (const spdlog::spdlog_ex& ex) { std::cerr << "Log initialization failed: " << ex.what() << std::endl; return 1; } spdlog::info("Initializing Data Storage..."); - auto &data_storage = DataStorage::getInstance(); - if (!data_storage.initialize(config.getDataStorageDbPath())) - { + auto& data_storage = DataStorage::getInstance(); + if (!data_storage.initialize(config.getDataStorageDbPath())) { spdlog::critical("Failed to initialize DataStorage. Exiting."); return 1; } - try - { + try { spdlog::info("Initializing Video Service..."); DataCache data_cache; @@ -120,38 +108,30 @@ int main(int argc, char *argv[]) AlarmService alarm_service(g_io_context, mqtt_client); - if (!alarm_service.load_rules(config.getAlarmRulesPath())) - { + VideoPipeline video_pipeline; + + if (!alarm_service.load_rules(config.getAlarmRulesPath())) { spdlog::error("Failed to load alarm rules. Alarms may be disabled."); } - auto report_to_mqtt = [&](const UnifiedData &data) - { - if (data_storage.storeProcessedData(data)) - { - spdlog::debug("Successfully stored PROCESSED data for device '{}'", - data.device_id); - } - else - { - spdlog::error("Failed to store PROCESSED data for device '{}'", - data.device_id); + auto report_to_mqtt = [&](const UnifiedData& data) { + if (data_storage.storeProcessedData(data)) { + spdlog::debug("Successfully stored PROCESSED data for device '{}'", data.device_id); + } else { + spdlog::error("Failed to store PROCESSED data for device '{}'", data.device_id); } live_data_cache.update_data(data.device_id, data.data_json); alarm_service.process_device_data(data.device_id, data.data_json); - if (mqtt_client.is_connected()) - { + if (mqtt_client.is_connected()) { std::string topic = "devices/" + data.device_id + "/data"; - g_io_context.post([&, topic, payload = data.data_json]() - { mqtt_client.publish(topic, payload, 1, false); }); - } - else - { - spdlog::warn("MQTT disconnected. Caching data for device '{}'.", - data.device_id); + g_io_context.post([&, topic, payload = data.data_json]() { + mqtt_client.publish(topic, payload, 1, false); + }); + } else { + spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id); data_cache.add(data); } }; @@ -163,68 +143,67 @@ int main(int argc, char *argv[]) TCPServer tcp_server(g_io_context, listen_ports, mqtt_client); SystemMonitor::SystemMonitor monitor; - if (!data_cache.open(config.getDataCacheDbPath())) - { + if (!data_cache.open(config.getDataCacheDbPath())) { spdlog::critical("Failed to initialize data cache at '{}'. Exiting.", config.getDataCacheDbPath()); return 1; } CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache); - mqtt_client.set_connected_handler([&](const std::string &cause) - { - spdlog::info("MQTT client connected: {}", cause); - cache_uploader.start_upload(); }); + mqtt_client.set_connected_handler([&](const std::string& cause) { + spdlog::info("MQTT client connected: {}", cause); + cache_uploader.start_upload(); + }); mqtt_client.connect(); mqtt_router.start(); monitor.getCpuUtilization(); - boost::asio::steady_timer system_monitor_timer(g_io_context, - std::chrono::seconds(15)); + boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15)); - system_monitor_timer.async_wait(std::bind( - poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor), - std::ref(mqtt_client), std::ref(alarm_service))); + system_monitor_timer.async_wait(std::bind(poll_system_metrics, + std::ref(system_monitor_timer), std::ref(monitor), + std::ref(mqtt_client), std::ref(alarm_service))); device_manager.load_and_start(config.getDevicesConfigPath()); - WebServer web_server(monitor, device_manager, live_data_cache, - alarm_service, config.getWebServerPort()); - web_server.set_shutdown_handler([&]() - { - spdlog::warn("Received shutdown command from Web API. Shutting down."); - g_io_context.stop(); }); + WebServer web_server(monitor, device_manager, live_data_cache, alarm_service, + config.getWebServerPort()); + web_server.set_shutdown_handler([&]() { + spdlog::warn("Received shutdown command from Web API. Shutting down."); + g_io_context.stop(); + }); web_server.start(); + spdlog::info("Starting Video Pipeline Service..."); + video_pipeline.Start(cam_rtsp_input, algorithm_rtsp_output); + boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM); - signals.async_wait( - [&](const boost::system::error_code &error, int signal_number) - { - spdlog::warn("Interrupt signal ({}) received. Shutting down.", - signal_number); + signals.async_wait([&](const boost::system::error_code& error, int signal_number) { + spdlog::warn("Interrupt signal ({}) received. Shutting down.", signal_number); - spdlog::info("[Shutdown] A. Stopping device manager services..."); - device_manager.stop_all(); + spdlog::info("[Shutdown] A. Stopping device manager services..."); + device_manager.stop_all(); - spdlog::info("[Shutdown] B. Stopping web server..."); - web_server.stop(); + spdlog::info("[Shutdown] B. Stopping web server..."); + web_server.stop(); - spdlog::info("[Shutdown] C. Stopping alarm service..."); - alarm_service.stop(); + spdlog::info("[Shutdown] B2. Stopping video pipeline..."); + video_pipeline.Stop(); - spdlog::info("[Shutdown] D. Disconnecting from MQTT broker..."); - mqtt_client.disconnect(); + spdlog::info("[Shutdown] C. Stopping alarm service..."); + alarm_service.stop(); - spdlog::info("[Shutdown] E. Stopping main event loop..."); - g_io_context.stop(); - }); + spdlog::info("[Shutdown] D. Disconnecting from MQTT broker..."); + mqtt_client.disconnect(); + + spdlog::info("[Shutdown] E. Stopping main event loop..."); + g_io_context.stop(); + }); spdlog::info("All services are running. Press Ctrl+C to exit."); g_io_context.run(); - } - catch (const std::exception &e) - { + } catch (const std::exception& e) { spdlog::critical("An unhandled exception occurred: {}", e.what()); return 1; } diff --git a/src/videoService/algorithm_service.cc b/src/videoService/algorithm_service.cc new file mode 100644 index 0000000..22a81b9 --- /dev/null +++ b/src/videoService/algorithm_service.cc @@ -0,0 +1,45 @@ +#include "algorithm_service.hpp" + +#include + +#include + +AlgorithmService::AlgorithmService() {} +AlgorithmService::~AlgorithmService() {} + +absl::Status AlgorithmService::Init() { + spdlog::info("Initializing Algorithm Model..."); + // TODO: 在这里加载你的 AI 模型 (如 TensorRT, OpenVINO 等) + // 模拟加载耗时 + // std::this_thread::sleep_for(std::chrono::seconds(1)); + return absl::OkStatus(); +} + +absl::Status AlgorithmService::Process(cv::Mat& frame) { + if (frame.empty()) + return absl::InvalidArgumentError("Empty frame"); + + // --- 算法处理区域 START --- + + // 1. 模拟:绘制时间戳 + auto now = std::chrono::system_clock::now(); + std::time_t now_c = std::chrono::system_clock::to_time_t(now); + std::string timeStr = std::ctime(&now_c); + if (!timeStr.empty()) + timeStr.pop_back(); // 去掉换行符 + + cv::putText(frame, "Live: " + timeStr, cv::Point(30, 50), cv::FONT_HERSHEY_SIMPLEX, 1.0, + cv::Scalar(0, 255, 0), 2); + + // 2. 模拟:绘制检测框 (假设检测到了物体) + int centerX = frame.cols / 2; + int centerY = frame.rows / 2; + cv::rectangle(frame, cv::Rect(centerX - 100, centerY - 100, 200, 200), cv::Scalar(0, 0, 255), + 3); + cv::putText(frame, "Detected Object", cv::Point(centerX - 100, centerY - 110), + cv::FONT_HERSHEY_SIMPLEX, 0.8, cv::Scalar(0, 0, 255), 2); + + // --- 算法处理区域 END --- + + return absl::OkStatus(); +} diff --git a/src/videoService/algorithm_service.hpp b/src/videoService/algorithm_service.hpp new file mode 100644 index 0000000..ae788d7 --- /dev/null +++ b/src/videoService/algorithm_service.hpp @@ -0,0 +1,21 @@ +#ifndef ALGORITHM_SERVICE_HPP +#define ALGORITHM_SERVICE_HPP + +#include + +#include "absl/status/status.h" + +class AlgorithmService { +public: + AlgorithmService(); + ~AlgorithmService(); + + // 初始化算法模型 (例如加载 YOLO 权重) + absl::Status Init(); + + // 处理每一帧 + // frame: 输入输出参数,直接在原图上修改 + absl::Status Process(cv::Mat& frame); +}; + +#endif // ALGORITHM_SERVICE_HPP diff --git a/src/videoService/rtsp_camera_service.cc b/src/videoService/rtsp_camera_service.cc new file mode 100644 index 0000000..636bc98 --- /dev/null +++ b/src/videoService/rtsp_camera_service.cc @@ -0,0 +1,140 @@ +#include "rtsp_camera_service.hpp" + +// spdlog +#include +#include +#include + +// Abseil +#include "absl/strings/str_cat.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" + +static std::shared_ptr getLogger() { + auto logger = spdlog::get("camera_service"); + if (!logger) { + try { + auto consoleSink = std::make_shared(); + + auto fileSink = std::make_shared( + "logs/camera_service.log", true); + + std::vector sinks{consoleSink, fileSink}; + logger = std::make_shared("camera_service", sinks.begin(), sinks.end()); + spdlog::register_logger(logger); + + logger->set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%n] [%^%l%$] %v"); + logger->set_level(spdlog::level::info); + + // 设置 flush 策略(可选):比如遇到 info 级别就立即刷新到文件,防止崩溃时日志丢失 + logger->flush_on(spdlog::level::info); + + } catch (const spdlog::spdlog_ex& ex) { + spdlog::error("Log init failed: {}", ex.what()); + return spdlog::default_logger(); + } + } + return logger; +} + +RtspCameraService::RtspCameraService() : isRunning(false) {} + +RtspCameraService::~RtspCameraService() { + this->Close(); +} + +absl::Status RtspCameraService::Open(const std::string& rtspUrl) { + this->cameraUrl = rtspUrl; + auto logger = getLogger(); + logger->info("Connecting to Camera via GStreamer: {}", rtspUrl); + + // --- 构建 GStreamer 读取管道 --- + // 1. rtspsrc: 从 RTSP 拉流 + // 2. latency=0: 低延迟设置 + // 3. rtph264depay + h264parse: 解析 H264 数据 + // 4. mppvideodec: [关键] 使用瑞芯微硬件解码器 + // 5. videoconvert: 确保格式转为 BGR 供 OpenCV 使用 (appsink 默认需 BGR) + // 6. appsink: 将数据传给 OpenCV + std::string gst_pipeline = absl::StrCat( + "rtspsrc location=", rtspUrl, " latency=0 protocols=tcp ! ", // 添加 protocols=tcp + "queue ! ", "rtph265depay ! h265parse ! ", "mppvideodec ! ", "videoconvert ! ", + "appsink sync=false"); + + logger->info("GStreamer Pipeline: {}", gst_pipeline); + + this->capture.open(gst_pipeline, cv::CAP_GSTREAMER); + + if (!this->capture.isOpened()) { + std::string errMsg = "Failed to open RTSP stream via GStreamer."; + logger->error(errMsg); + return absl::UnavailableError(errMsg); + } + + this->isRunning = true; + this->workerThread = std::thread(&RtspCameraService::captureThreadFunc, this); + + logger->info("Camera connected successfully."); + return absl::OkStatus(); +} + +void RtspCameraService::Close() { + if (this->isRunning.exchange(false)) { + getLogger()->info("Stopping camera service..."); + + // --- 修改开始 --- + if (this->capture.isOpened()) { + this->capture.release(); + getLogger()->info("Camera capture released."); + } + + if (this->workerThread.joinable()) { + this->workerThread.join(); + getLogger()->info("Camera worker thread joined."); + } + + getLogger()->info("Camera service stopped."); + } +} +absl::Status RtspCameraService::GetLatestFrame(cv::Mat& outFrame) { + absl::MutexLock lock(&this->frameMutex); + + if (this->currentFrame.empty()) { + return absl::NotFoundError("Frame buffer is empty"); + } + + this->currentFrame.copyTo(outFrame); + return absl::OkStatus(); +} + +void RtspCameraService::captureThreadFunc() { + auto logger = getLogger(); + cv::Mat tmpFrame; + + while (this->isRunning.load(std::memory_order_relaxed)) { + bool success = this->capture.read(tmpFrame); + if (!this->isRunning.load(std::memory_order_relaxed)) { + break; + } + if (!this->capture.read(tmpFrame)) { + logger->warn("Dropped connection or empty frame received from RTSP."); + + // 可以在此处添加重连逻辑 + absl::SleepFor(absl::Seconds(1)); + continue; + } + + if (tmpFrame.empty()) { + continue; + } + + { + // 更新共享的最新帧 + absl::MutexLock lock(&this->frameMutex); + tmpFrame.copyTo(this->currentFrame); + } + + // 适当微休眠以防 CPU 占用过高(根据实际帧率调整) + // 对于 30fps 的流,可以忽略或设置极短休眠 + absl::SleepFor(absl::Milliseconds(1)); + } +} diff --git a/src/videoService/rtsp_camera_service.hpp b/src/videoService/rtsp_camera_service.hpp new file mode 100644 index 0000000..6d674e2 --- /dev/null +++ b/src/videoService/rtsp_camera_service.hpp @@ -0,0 +1,38 @@ +#ifndef RTSP_CAMERA_SERVICE_HPP +#define RTSP_CAMERA_SERVICE_HPP + +#include +#include +#include +#include + +// Abseil 头文件 +#include "absl/status/status.h" +#include "absl/synchronization/mutex.h" + +class RtspCameraService { +public: + RtspCameraService(); + ~RtspCameraService(); + + absl::Status Open(const std::string& rtspUrl); + void Close(); + + absl::Status GetLatestFrame(cv::Mat& outFrame); + +private: + void captureThreadFunc(); + +private: + cv::VideoCapture capture; + + absl::Mutex frameMutex; + + cv::Mat currentFrame; // GUARDED_BY(frameMutex) + + std::thread workerThread; + std::atomic isRunning; + std::string cameraUrl; +}; + +#endif // RTSP_CAMERA_SERVICE_HPP diff --git a/src/videoService/rtsp_stream_pusher.cc b/src/videoService/rtsp_stream_pusher.cc new file mode 100644 index 0000000..1c3689e --- /dev/null +++ b/src/videoService/rtsp_stream_pusher.cc @@ -0,0 +1,88 @@ +#include "rtsp_stream_pusher.hpp" + +#include + +#include "absl/strings/str_format.h" + +RtspStreamPusher::RtspStreamPusher() {} + +RtspStreamPusher::~RtspStreamPusher() { + this->Close(); +} + +absl::Status RtspStreamPusher::Open(const std::string& rtspUrl, int width, int height, int fps) { + if (this->isOpened) { + return absl::OkStatus(); + } + + this->width = width; + this->height = height; + + // --- GStreamer 命令构建 --- + // 1. fdsrc: 从文件描述符读取 (这里默认是 stdin) + // 2. videoparse: 解析原始数据格式 + // - format=bgr: OpenCV 默认是 BGR + // - width/height/framerate: 必须与输入一致 + // 3. videoconvert: 转换颜色空间 (BGR -> I420/YUV) 以便编码 + // 4. x264enc: H.264 编码器 + // - tune=zerolatency: 低延迟模式 + // - speed-preset=ultrafast: 极速编码 + // 5. rtspclientsink: 推流到 RTSP 服务器 (MediaMTX) + // - protocols=tcp: 强制使用 TCP,通常网络穿透性更好 + std::string cmd = absl::StrFormat( + "gst-launch-1.0 -v fdsrc ! " + "videoparse width=%d height=%d framerate=%d/1 format=bgr ! " + "queue max-size-buffers=2 leaky=downstream ! " + "videoconvert ! " + "video/x-raw,format=NV12 ! " + "mpph264enc gop=25 rc-mode=fixqp qp-init=26 ! " + "h264parse ! " + "rtspclientsink location=%s latency=0 protocols=tcp", + width, height, fps, rtspUrl); + + spdlog::info("Starting GStreamer pipe: {}", cmd); + + // 打开管道 + // 这里的 "w" 表示我们可以向这个进程写入数据 (Write) + this->gstPipe = popen(cmd.c_str(), "w"); + + if (!this->gstPipe) { + return absl::InternalError("Failed to open GStreamer pipe"); + } + + this->isOpened = true; + return absl::OkStatus(); +} + +void RtspStreamPusher::Close() { + if (this->gstPipe) { + pclose(this->gstPipe); + this->gstPipe = nullptr; + } + this->isOpened = false; +} + +absl::Status RtspStreamPusher::PushFrame(const cv::Mat& frame) { + if (!this->isOpened || !this->gstPipe) { + return absl::FailedPreconditionError("Pusher is not open"); + } + + if (frame.empty()) { + return absl::InvalidArgumentError("Frame is empty"); + } + + // 再次确认尺寸,GStreamer 的 videoparse 对尺寸非常敏感, + // 如果写入的数据量不对,管道会立即报错断开。 + if (frame.cols != this->width || frame.rows != this->height) { + return absl::InvalidArgumentError("Frame size mismatch"); + } + + // 写入原始数据 + size_t written = fwrite(frame.data, 1, frame.total() * frame.elemSize(), this->gstPipe); + if (written != frame.total() * frame.elemSize()) { + spdlog::error("Failed to write frame to GStreamer pipe"); + return absl::DataLossError("Failed to write full frame"); + } + + return absl::OkStatus(); +} diff --git a/src/videoService/rtsp_stream_pusher.hpp b/src/videoService/rtsp_stream_pusher.hpp new file mode 100644 index 0000000..d4a65d5 --- /dev/null +++ b/src/videoService/rtsp_stream_pusher.hpp @@ -0,0 +1,34 @@ +#ifndef RTSP_STREAM_PUSHER_HPP +#define RTSP_STREAM_PUSHER_HPP + +#include // for popen +#include +#include + +// Abseil +#include "absl/status/status.h" + +class RtspStreamPusher { +public: + RtspStreamPusher(); + ~RtspStreamPusher(); + + // 初始化推流器 + // rtspUrl: 目标推流地址 (例如 rtsp://127.0.0.1:8554/processed) + // width, height, fps: 输出视频的参数 + absl::Status Open(const std::string& rtspUrl, int width, int height, int fps); + + // 关闭推流 + void Close(); + + // 推送一帧图像 + absl::Status PushFrame(const cv::Mat& frame); + +private: + FILE* gstPipe = nullptr; + int width = 0; + int height = 0; + bool isOpened = false; +}; + +#endif // RTSP_STREAM_PUSHER_HPP diff --git a/src/videoService/video_pipeline.cc b/src/videoService/video_pipeline.cc new file mode 100644 index 0000000..e21493d --- /dev/null +++ b/src/videoService/video_pipeline.cc @@ -0,0 +1,100 @@ +#include "video_pipeline.hpp" + +#include + +VideoPipeline::VideoPipeline() : isRunning(false) {} + +VideoPipeline::~VideoPipeline() { + Stop(); +} + +void VideoPipeline::Start(const std::string& inputUrl, const std::string& outputUrl) { + if (isRunning) + return; + + this->inputUrl = inputUrl; + this->outputUrl = outputUrl; + this->isRunning = true; + + // 启动工作线程 + this->workThread = std::thread(&VideoPipeline::pipelineLoop, this); + spdlog::info("Video Pipeline Service Started."); +} + +void VideoPipeline::Stop() { + if (isRunning.exchange(false)) { + spdlog::info("Stopping Video Pipeline..."); + if (workThread.joinable()) { + workThread.join(); + } + + // 显式关闭各个组件 + camera.Close(); + pusher.Close(); + spdlog::info("Video Pipeline Stopped."); + } +} + +void VideoPipeline::pipelineLoop() { + // 1. 初始化算法 + if (!algo.Init().ok()) { + spdlog::error("Algorithm init failed."); + return; + } + + // 2. 打开摄像头 + // 简单的重试机制 + while (isRunning && !camera.Open(inputUrl).ok()) { + spdlog::warn("Retrying camera connection in 2s..."); + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + + // 3. 等待获取第一帧以确定分辨率 + cv::Mat frame; + while (isRunning) { + if (camera.GetLatestFrame(frame).ok() && !frame.empty()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + if (!isRunning) + return; // 如果在等待期间被关闭 + + // 4. 初始化推流器 + // 假设输出 25fps,分辨率与输入一致 + if (!pusher.Open(outputUrl, frame.cols, frame.rows, 25).ok()) { + spdlog::error("Failed to open RTSP pusher."); + return; + } + + spdlog::info("Pipeline loop running: Capture -> Algo -> Push"); + + // 5. 主处理循环 + while (isRunning) { + auto start = std::chrono::steady_clock::now(); + + if (!camera.GetLatestFrame(frame).ok()) { + // 获取失败 (可能是流断了),稍微休眠 + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + absl::Status algoStatus = algo.Process(frame); + if (!algoStatus.ok()) { + spdlog::warn("Algorithm processing failed: {}", algoStatus.ToString()); + } + + absl::Status pushStatus = pusher.PushFrame(frame); + if (!pushStatus.ok()) { + spdlog::warn("Push frame failed: {}", pushStatus.ToString()); + } + + auto end = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(end - start).count(); + // 目标 25fps = 40ms/frame + if (elapsed < 40) { + std::this_thread::sleep_for(std::chrono::milliseconds(40 - elapsed)); + } + } +} diff --git a/src/videoService/video_pipeline.hpp b/src/videoService/video_pipeline.hpp new file mode 100644 index 0000000..cb78bc1 --- /dev/null +++ b/src/videoService/video_pipeline.hpp @@ -0,0 +1,40 @@ +#ifndef VIDEO_PIPELINE_HPP +#define VIDEO_PIPELINE_HPP + +#include +#include +#include + +#include "algorithm_service.hpp" +#include "rtsp_camera_service.hpp" +#include "rtsp_stream_pusher.hpp" + +class VideoPipeline { +public: + VideoPipeline(); + ~VideoPipeline(); + + // 启动流水线 + // inputUrl: 拉流地址 (摄像头) + // outputUrl: 推流地址 (MediaMTX) + void Start(const std::string& inputUrl, const std::string& outputUrl); + + // 停止流水线 + void Stop(); + +private: + void pipelineLoop(); + +private: + RtspCameraService camera; + RtspStreamPusher pusher; + AlgorithmService algo; + + std::string inputUrl; + std::string outputUrl; + + std::thread workThread; + std::atomic isRunning; +}; + +#endif // VIDEO_PIPELINE_HPP