// video_service.cc (最终稳定版: NV12 输入 + CPU 转换) #include "video_service.h" #include "opencv2/imgproc/imgproc.hpp" #include "spdlog/spdlog.h" #include #include #include // posix_memalign, free VideoService::VideoService(std::unique_ptr module, std::string input_url, std::string output_rtsp_url, nlohmann::json module_config) : module_(std::move(module)), input_url_(input_url), output_rtsp_url_(output_rtsp_url), module_config_(std::move(module_config)), running_(false) { gst_init(nullptr, nullptr); log_prefix_ = "[VideoService: " + input_url + "]"; spdlog::info("{} Created. Input: {}, Output: {}", log_prefix_, input_url_.c_str(), output_rtsp_url_.c_str()); } VideoService::~VideoService() { if (running_) { stop(); } } // 核心辅助函数:BGR -> NV12 (用于推流) void VideoService::bgr_to_nv12(const cv::Mat &src, std::vector &dst) { int w = src.cols; int h = src.rows; int y_size = w * h; int uv_size = y_size / 2; dst.resize(y_size + uv_size); cv::Mat i420_mat; cv::cvtColor(src, i420_mat, cv::COLOR_BGR2YUV_I420); memcpy(dst.data(), i420_mat.data, y_size); const uint8_t *u_src = i420_mat.data + y_size; const uint8_t *v_src = i420_mat.data + y_size + (y_size / 4); uint8_t *uv_dst = dst.data() + y_size; for (int i = 0; i < y_size / 4; ++i) { uv_dst[2 * i] = u_src[i]; // U uv_dst[2 * i + 1] = v_src[i]; // V } } // 辅助函数:创建 4K 对齐的 Mat cv::Mat VideoService::create_aligned_mat(int width, int height, int type) { size_t elem_size = cv::Mat(1, 1, type).elemSize(); size_t total_size = width * height * elem_size; void *ptr = nullptr; int ret = posix_memalign(&ptr, 4096, total_size); if (ret != 0 || !ptr) { spdlog::error("Fatal: Failed to allocate aligned memory!"); return cv::Mat(); } return cv::Mat(height, width, type, ptr); } bool VideoService::start() { if (!module_ || !module_->init(module_config_)) { spdlog::error("{} Failed to initialize analysis module!", log_prefix_); return false; } // ------------------------------------------------------------------------- // [关键修改] 更改输入 Pipeline 为 NV12 // 移除了 'videoconvert' 和 'format=BGR',消除了 GStreamer 内部 RGA 的竞争 // ------------------------------------------------------------------------- std::string gst_input_pipeline = "rtspsrc location=" + input_url_ + " latency=0 protocols=tcp ! " "rtph265depay ! " "h265parse ! " "mppvideodec ! " // mpp 解码默认输出 NV12 "video/x-raw,format=NV12 ! " "appsink"; spdlog::info("Try to Open RTSP Stream (NV12 Mode)"); capture_.open(gst_input_pipeline, cv::CAP_GSTREAMER); if (!capture_.isOpened()) { printf("Error: Could not open RTSP stream: %s\n", input_url_.c_str()); return false; } // 注意:在 NV12 模式下,capture_.get 可能返回包含 padding 的尺寸 // 或者 OpenCV 会将 NV12 读取为 height * 1.5 的单通道图像 frame_width_ = static_cast(capture_.get(cv::CAP_PROP_FRAME_WIDTH)); frame_height_ = static_cast(capture_.get(cv::CAP_PROP_FRAME_HEIGHT)); frame_fps_ = capture_.get(cv::CAP_PROP_FPS); if (frame_fps_ <= 0) frame_fps_ = 25.0; // 读取一帧以确认真实的图像尺寸 cv::Mat test_frame; if (capture_.read(test_frame)) { // NV12 判定:如果是单通道且高度是宽度的 1.5 倍 (或接近) if (test_frame.type() == CV_8UC1) { // 修正 frame_height (去除 UV 部分的高度) frame_height_ = (test_frame.rows * 2) / 3; frame_width_ = test_frame.cols; } else { frame_width_ = test_frame.cols; frame_height_ = test_frame.rows; } std::lock_guard lock(frame_mutex_); latest_frame_ = test_frame; new_frame_available_ = true; } else { return false; } printf("RTSP stream opened! Real Res: %dx%d @ %.2f FPS (Mode: %s)\n", frame_width_, frame_height_, frame_fps_, (latest_frame_.type() == CV_8UC1 ? "NV12" : "BGR")); // --- 输出部分保持不变 --- std::string gst_out_pipeline = "appsrc name=mysource is-live=true format=3 ! " "queue max-size-buffers=2 leaky=downstream ! " "video/x-raw,format=NV12,width=" + std::to_string(frame_width_) + ",height=" + std::to_string(frame_height_) + ",framerate=" + std::to_string((int)frame_fps_) + "/1 ! " "mpph264enc gop=25 rc-mode=fixqp qp-init=26 ! " "h264parse ! " "rtspclientsink location=" + output_rtsp_url_ + " latency=0 protocols=tcp"; GError *error = nullptr; gst_pipeline_ = gst_parse_launch(gst_out_pipeline.c_str(), &error); if (error) { spdlog::error("Failed to parse output pipeline: {}", error->message); g_error_free(error); return false; } gst_appsrc_ = gst_bin_get_by_name(GST_BIN(gst_pipeline_), "mysource"); if (!gst_appsrc_) { spdlog::error("Failed to get 'mysource' from pipeline"); return false; } gst_element_set_state(gst_pipeline_, GST_STATE_PLAYING); printf("GStreamer Output Pipeline started manually.\n"); running_ = true; reading_thread_ = std::thread(&VideoService::reading_loop, this); processing_thread_ = std::thread(&VideoService::processing_loop, this); return true; } void VideoService::stop() { printf("Stopping VideoService...\n"); running_ = false; frame_cv_.notify_all(); if (reading_thread_.joinable()) reading_thread_.join(); if (processing_thread_.joinable()) processing_thread_.join(); if (capture_.isOpened()) capture_.release(); if (gst_pipeline_) { gst_element_set_state(gst_pipeline_, GST_STATE_NULL); gst_object_unref(gst_pipeline_); gst_pipeline_ = nullptr; } if (gst_appsrc_) { gst_object_unref(gst_appsrc_); gst_appsrc_ = nullptr; } module_->stop(); module_.reset(); printf("VideoService stopped.\n"); } void VideoService::reading_loop() { cv::Mat frame; while (running_) { if (!capture_.read(frame)) { running_ = false; break; } if (frame.empty()) continue; { std::lock_guard lock(frame_mutex_); latest_frame_ = frame; new_frame_available_ = true; } frame_cv_.notify_one(); } frame_cv_.notify_all(); } void VideoService::processing_loop() { cv::Mat raw_frame; // RGA 专用 4K 对齐内存 (用于传给 AI 模块) // 依然保留,因为 module_->process 可能需要稳定的 BGR/RGBA 输入 cv::Mat frame_rgba = create_aligned_mat(frame_width_, frame_height_, CV_8UC4); // 临时 BGR 帧 (CPU 转换用) cv::Mat frame_bgr; if (frame_rgba.empty() || frame_rgba.data == nullptr) { spdlog::error("Fatal: Failed to allocate aligned buffer for RGA!"); return; } std::vector nv12_buffer; spdlog::info("Processing thread ready. (CPU NV12->BGR enabled)"); while (running_) { { std::unique_lock lock(frame_mutex_); frame_cv_.wait(lock, [&] { return new_frame_available_ || !running_; }); if (!running_) break; raw_frame = latest_frame_.clone(); new_frame_available_ = false; } if (raw_frame.empty()) continue; // --------------------------------------------------------------- // [关键修正] CPU 格式转换 (NV12 -> BGR) // --------------------------------------------------------------- if (raw_frame.type() == CV_8UC1 && raw_frame.rows == frame_height_ * 3 / 2) { // 输入是 NV12,使用 CPU 转换为 BGR // 这避免了使用不稳定的 GStreamer RGA 插件 cv::cvtColor(raw_frame, frame_bgr, cv::COLOR_YUV2BGR_NV12); } else { // 如果已经是 BGR (fallback) frame_bgr = raw_frame; } // --------------------------------------------------------------- // [准备 AI 输入] BGR -> RGBA (写入对齐内存) // --------------------------------------------------------------- // rkYolov8 内部已经加锁,这里传递 4K 对齐内存也是安全的 cv::cvtColor(frame_bgr, frame_rgba, cv::COLOR_BGR2RGBA); // --------------------------------------------------------------- // [调用 AI 模块] // --------------------------------------------------------------- // 模块会在 frame_rgba 上进行检测,并在 frame_rgba (或者我们需要传 BGR?) // 等等,module_->process 接收的是引用并绘制结果。 // HumanDetectionModule 的 draw_results 是用 opencv 绘图的。 // 为了让绘图结果能推流出去,我们应该让 module 处理 frame_bgr。 // 修正:rkYolov8::infer 内部只读,不修改。 // HumanDetectionModule::process 会调用 draw_results 修改图像。 // 我们传入 frame_bgr 给 AI 模块 (它内部会转 RGBA 传给 NPU,这没问题)。 // 这里的 frame_bgr 是 CPU 内存,不是 4K 对齐的,但 rkYolov8 现在有锁且能处理非对齐。 // 或者,为了极致性能和匹配之前的逻辑,我们还是传 frame_rgba 进去? // HumanDetectionModule::process 接收 Mat&。 // 如果我们传 frame_rgba,它画框也是画在 RGBA 上。 if (!module_->process(frame_rgba)) { // process fail } // --------------------------------------------------------------- // [推流部分] RGBA -> NV12 // --------------------------------------------------------------- if (gst_appsrc_) { // 将画好框的 RGBA 转回 NV12 推流 // 注意:bgr_to_nv12 原本是 BGR->NV12。我们需要适配一下,或者转回 BGR。 // 简单的做法:RGBA -> BGR -> NV12 (虽然多了一步,但逻辑简单) cv::Mat temp_bgr; cv::cvtColor(frame_rgba, temp_bgr, cv::COLOR_RGBA2BGR); bgr_to_nv12(temp_bgr, nv12_buffer); guint size = nv12_buffer.size(); GstBuffer *buffer = gst_buffer_new_allocate(NULL, size, NULL); GstMapInfo map; gst_buffer_map(buffer, &map, GST_MAP_WRITE); memcpy(map.data, nv12_buffer.data(), size); gst_buffer_unmap(buffer, &map); GstFlowReturn ret; g_signal_emit_by_name(gst_appsrc_, "push-buffer", buffer, &ret); gst_buffer_unref(buffer); } } // 释放手动分配的内存 if (frame_rgba.data) free(frame_rgba.data); spdlog::info("VideoService: Processing loop finished."); }