feat(videoService): 完成视频服务的链路打通,从采集摄像头信号到转发。

This commit is contained in:
GuanYuankai 2025-12-31 16:30:10 +08:00
parent 4302cf6d4f
commit 021133c0b4
12 changed files with 201 additions and 493 deletions

View File

@ -63,10 +63,7 @@ add_library(vehicle_road_lib STATIC
#
src/alarm/alarm_service.cc
#
src/videoService/rtsp_camera_service.cc
src/videoService/rtsp_stream_pusher.cc
src/videoService/algorithm_service.cc
src/videoService/video_pipeline.cc
src/videoService/video_pipeline.cpp
)
target_include_directories(vehicle_road_lib PUBLIC

9
restart.sh Normal file
View File

@ -0,0 +1,9 @@
#!/bin/bash
# 强制移除可能冲突的容器
docker rm -f media-gateway || true
# 启动服务
docker compose up -d
# 查看日志
docker compose logs -f

View File

@ -75,6 +75,9 @@ void poll_system_metrics(boost::asio::steady_timer& timer, SystemMonitor::System
int main(int argc, char* argv[]) {
// TODO: [GYK] DEV#1: 将 URL 放入 config.json 中读取
std::string cam_rtsp_input = "rtsp://admin:123456@192.168.1.57:554/stream0";
// std::string cam_rtsp_input =
// "rtsp://admin:hzx12345@192.168.1.10:554/Streaming/Channels/1901";
std::string algorithm_rtsp_output = "rtsp://127.0.0.1:8554/processed";
const std::string config_path = "/app/config/config.json";
if (!ConfigManager::getInstance().load(config_path)) {
@ -100,7 +103,8 @@ int main(int argc, char* argv[]) {
}
try {
spdlog::info("Initializing Video Service...");
VideoPipeline video_pipeline;
std::string output_stream_url = "rtsp://127.0.0.1:8554/processed";
DataCache data_cache;
LiveDataCache live_data_cache;
@ -108,7 +112,7 @@ int main(int argc, char* argv[]) {
AlarmService alarm_service(g_io_context, mqtt_client);
VideoPipeline video_pipeline;
// VideoPipeline video_pipeline;
if (!alarm_service.load_rules(config.getAlarmRulesPath())) {
spdlog::error("Failed to load alarm rules. Alarms may be disabled.");
@ -176,7 +180,8 @@ int main(int argc, char* argv[]) {
web_server.start();
spdlog::info("Starting Video Pipeline Service...");
video_pipeline.Start(cam_rtsp_input, algorithm_rtsp_output);
video_pipeline.Start(cam_rtsp_input, output_stream_url);
// video_pipeline.Start(cam_rtsp_input, algorithm_rtsp_output);
boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM);
signals.async_wait([&](const boost::system::error_code& error, int signal_number) {

View File

@ -1,45 +0,0 @@
#include "algorithm_service.hpp"
#include <spdlog/spdlog.h>
#include <chrono>
AlgorithmService::AlgorithmService() {}
AlgorithmService::~AlgorithmService() {}
absl::Status AlgorithmService::Init() {
spdlog::info("Initializing Algorithm Model...");
// TODO: 在这里加载你的 AI 模型 (如 TensorRT, OpenVINO 等)
// 模拟加载耗时
// std::this_thread::sleep_for(std::chrono::seconds(1));
return absl::OkStatus();
}
absl::Status AlgorithmService::Process(cv::Mat& frame) {
if (frame.empty())
return absl::InvalidArgumentError("Empty frame");
// --- 算法处理区域 START ---
// 1. 模拟:绘制时间戳
auto now = std::chrono::system_clock::now();
std::time_t now_c = std::chrono::system_clock::to_time_t(now);
std::string timeStr = std::ctime(&now_c);
if (!timeStr.empty())
timeStr.pop_back(); // 去掉换行符
cv::putText(frame, "Live: " + timeStr, cv::Point(30, 50), cv::FONT_HERSHEY_SIMPLEX, 1.0,
cv::Scalar(0, 255, 0), 2);
// 2. 模拟:绘制检测框 (假设检测到了物体)
int centerX = frame.cols / 2;
int centerY = frame.rows / 2;
cv::rectangle(frame, cv::Rect(centerX - 100, centerY - 100, 200, 200), cv::Scalar(0, 0, 255),
3);
cv::putText(frame, "Detected Object", cv::Point(centerX - 100, centerY - 110),
cv::FONT_HERSHEY_SIMPLEX, 0.8, cv::Scalar(0, 0, 255), 2);
// --- 算法处理区域 END ---
return absl::OkStatus();
}

View File

@ -1,21 +0,0 @@
#ifndef ALGORITHM_SERVICE_HPP
#define ALGORITHM_SERVICE_HPP
#include <opencv2/opencv.hpp>
#include "absl/status/status.h"
class AlgorithmService {
public:
AlgorithmService();
~AlgorithmService();
// 初始化算法模型 (例如加载 YOLO 权重)
absl::Status Init();
// 处理每一帧
// frame: 输入输出参数,直接在原图上修改
absl::Status Process(cv::Mat& frame);
};
#endif // ALGORITHM_SERVICE_HPP

View File

@ -1,140 +0,0 @@
#include "rtsp_camera_service.hpp"
// spdlog
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
// Abseil
#include "absl/strings/str_cat.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
static std::shared_ptr<spdlog::logger> getLogger() {
auto logger = spdlog::get("camera_service");
if (!logger) {
try {
auto consoleSink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
auto fileSink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(
"logs/camera_service.log", true);
std::vector<spdlog::sink_ptr> sinks{consoleSink, fileSink};
logger = std::make_shared<spdlog::logger>("camera_service", sinks.begin(), sinks.end());
spdlog::register_logger(logger);
logger->set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%n] [%^%l%$] %v");
logger->set_level(spdlog::level::info);
// 设置 flush 策略(可选):比如遇到 info 级别就立即刷新到文件,防止崩溃时日志丢失
logger->flush_on(spdlog::level::info);
} catch (const spdlog::spdlog_ex& ex) {
spdlog::error("Log init failed: {}", ex.what());
return spdlog::default_logger();
}
}
return logger;
}
RtspCameraService::RtspCameraService() : isRunning(false) {}
RtspCameraService::~RtspCameraService() {
this->Close();
}
absl::Status RtspCameraService::Open(const std::string& rtspUrl) {
this->cameraUrl = rtspUrl;
auto logger = getLogger();
logger->info("Connecting to Camera via GStreamer: {}", rtspUrl);
// --- 构建 GStreamer 读取管道 ---
// 1. rtspsrc: 从 RTSP 拉流
// 2. latency=0: 低延迟设置
// 3. rtph264depay + h264parse: 解析 H264 数据
// 4. mppvideodec: [关键] 使用瑞芯微硬件解码器
// 5. videoconvert: 确保格式转为 BGR 供 OpenCV 使用 (appsink 默认需 BGR)
// 6. appsink: 将数据传给 OpenCV
std::string gst_pipeline = absl::StrCat(
"rtspsrc location=", rtspUrl, " latency=0 protocols=tcp ! ", // 添加 protocols=tcp
"queue ! ", "rtph265depay ! h265parse ! ", "mppvideodec ! ", "videoconvert ! ",
"appsink sync=false");
logger->info("GStreamer Pipeline: {}", gst_pipeline);
this->capture.open(gst_pipeline, cv::CAP_GSTREAMER);
if (!this->capture.isOpened()) {
std::string errMsg = "Failed to open RTSP stream via GStreamer.";
logger->error(errMsg);
return absl::UnavailableError(errMsg);
}
this->isRunning = true;
this->workerThread = std::thread(&RtspCameraService::captureThreadFunc, this);
logger->info("Camera connected successfully.");
return absl::OkStatus();
}
void RtspCameraService::Close() {
if (this->isRunning.exchange(false)) {
getLogger()->info("Stopping camera service...");
// --- 修改开始 ---
if (this->capture.isOpened()) {
this->capture.release();
getLogger()->info("Camera capture released.");
}
if (this->workerThread.joinable()) {
this->workerThread.join();
getLogger()->info("Camera worker thread joined.");
}
getLogger()->info("Camera service stopped.");
}
}
absl::Status RtspCameraService::GetLatestFrame(cv::Mat& outFrame) {
absl::MutexLock lock(&this->frameMutex);
if (this->currentFrame.empty()) {
return absl::NotFoundError("Frame buffer is empty");
}
this->currentFrame.copyTo(outFrame);
return absl::OkStatus();
}
void RtspCameraService::captureThreadFunc() {
auto logger = getLogger();
cv::Mat tmpFrame;
while (this->isRunning.load(std::memory_order_relaxed)) {
bool success = this->capture.read(tmpFrame);
if (!this->isRunning.load(std::memory_order_relaxed)) {
break;
}
if (!this->capture.read(tmpFrame)) {
logger->warn("Dropped connection or empty frame received from RTSP.");
// 可以在此处添加重连逻辑
absl::SleepFor(absl::Seconds(1));
continue;
}
if (tmpFrame.empty()) {
continue;
}
{
// 更新共享的最新帧
absl::MutexLock lock(&this->frameMutex);
tmpFrame.copyTo(this->currentFrame);
}
// 适当微休眠以防 CPU 占用过高(根据实际帧率调整)
// 对于 30fps 的流,可以忽略或设置极短休眠
absl::SleepFor(absl::Milliseconds(1));
}
}

View File

@ -1,38 +0,0 @@
#ifndef RTSP_CAMERA_SERVICE_HPP
#define RTSP_CAMERA_SERVICE_HPP
#include <atomic>
#include <opencv2/opencv.hpp>
#include <string>
#include <thread>
// Abseil 头文件
#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
class RtspCameraService {
public:
RtspCameraService();
~RtspCameraService();
absl::Status Open(const std::string& rtspUrl);
void Close();
absl::Status GetLatestFrame(cv::Mat& outFrame);
private:
void captureThreadFunc();
private:
cv::VideoCapture capture;
absl::Mutex frameMutex;
cv::Mat currentFrame; // GUARDED_BY(frameMutex)
std::thread workerThread;
std::atomic<bool> isRunning;
std::string cameraUrl;
};
#endif // RTSP_CAMERA_SERVICE_HPP

View File

@ -1,88 +0,0 @@
#include "rtsp_stream_pusher.hpp"
#include <spdlog/spdlog.h>
#include "absl/strings/str_format.h"
RtspStreamPusher::RtspStreamPusher() {}
RtspStreamPusher::~RtspStreamPusher() {
this->Close();
}
absl::Status RtspStreamPusher::Open(const std::string& rtspUrl, int width, int height, int fps) {
if (this->isOpened) {
return absl::OkStatus();
}
this->width = width;
this->height = height;
// --- GStreamer 命令构建 ---
// 1. fdsrc: 从文件描述符读取 (这里默认是 stdin)
// 2. videoparse: 解析原始数据格式
// - format=bgr: OpenCV 默认是 BGR
// - width/height/framerate: 必须与输入一致
// 3. videoconvert: 转换颜色空间 (BGR -> I420/YUV) 以便编码
// 4. x264enc: H.264 编码器
// - tune=zerolatency: 低延迟模式
// - speed-preset=ultrafast: 极速编码
// 5. rtspclientsink: 推流到 RTSP 服务器 (MediaMTX)
// - protocols=tcp: 强制使用 TCP通常网络穿透性更好
std::string cmd = absl::StrFormat(
"gst-launch-1.0 -v fdsrc ! "
"videoparse width=%d height=%d framerate=%d/1 format=bgr ! "
"queue max-size-buffers=2 leaky=downstream ! "
"videoconvert ! "
"video/x-raw,format=NV12 ! "
"mpph264enc gop=25 rc-mode=fixqp qp-init=26 ! "
"h264parse ! "
"rtspclientsink location=%s latency=0 protocols=tcp",
width, height, fps, rtspUrl);
spdlog::info("Starting GStreamer pipe: {}", cmd);
// 打开管道
// 这里的 "w" 表示我们可以向这个进程写入数据 (Write)
this->gstPipe = popen(cmd.c_str(), "w");
if (!this->gstPipe) {
return absl::InternalError("Failed to open GStreamer pipe");
}
this->isOpened = true;
return absl::OkStatus();
}
void RtspStreamPusher::Close() {
if (this->gstPipe) {
pclose(this->gstPipe);
this->gstPipe = nullptr;
}
this->isOpened = false;
}
absl::Status RtspStreamPusher::PushFrame(const cv::Mat& frame) {
if (!this->isOpened || !this->gstPipe) {
return absl::FailedPreconditionError("Pusher is not open");
}
if (frame.empty()) {
return absl::InvalidArgumentError("Frame is empty");
}
// 再次确认尺寸GStreamer 的 videoparse 对尺寸非常敏感,
// 如果写入的数据量不对,管道会立即报错断开。
if (frame.cols != this->width || frame.rows != this->height) {
return absl::InvalidArgumentError("Frame size mismatch");
}
// 写入原始数据
size_t written = fwrite(frame.data, 1, frame.total() * frame.elemSize(), this->gstPipe);
if (written != frame.total() * frame.elemSize()) {
spdlog::error("Failed to write frame to GStreamer pipe");
return absl::DataLossError("Failed to write full frame");
}
return absl::OkStatus();
}

View File

@ -1,34 +0,0 @@
#ifndef RTSP_STREAM_PUSHER_HPP
#define RTSP_STREAM_PUSHER_HPP
#include <cstdio> // for popen
#include <opencv2/opencv.hpp>
#include <string>
// Abseil
#include "absl/status/status.h"
class RtspStreamPusher {
public:
RtspStreamPusher();
~RtspStreamPusher();
// 初始化推流器
// rtspUrl: 目标推流地址 (例如 rtsp://127.0.0.1:8554/processed)
// width, height, fps: 输出视频的参数
absl::Status Open(const std::string& rtspUrl, int width, int height, int fps);
// 关闭推流
void Close();
// 推送一帧图像
absl::Status PushFrame(const cv::Mat& frame);
private:
FILE* gstPipe = nullptr;
int width = 0;
int height = 0;
bool isOpened = false;
};
#endif // RTSP_STREAM_PUSHER_HPP

View File

@ -1,100 +0,0 @@
#include "video_pipeline.hpp"
#include <spdlog/spdlog.h>
VideoPipeline::VideoPipeline() : isRunning(false) {}
VideoPipeline::~VideoPipeline() {
Stop();
}
void VideoPipeline::Start(const std::string& inputUrl, const std::string& outputUrl) {
if (isRunning)
return;
this->inputUrl = inputUrl;
this->outputUrl = outputUrl;
this->isRunning = true;
// 启动工作线程
this->workThread = std::thread(&VideoPipeline::pipelineLoop, this);
spdlog::info("Video Pipeline Service Started.");
}
void VideoPipeline::Stop() {
if (isRunning.exchange(false)) {
spdlog::info("Stopping Video Pipeline...");
if (workThread.joinable()) {
workThread.join();
}
// 显式关闭各个组件
camera.Close();
pusher.Close();
spdlog::info("Video Pipeline Stopped.");
}
}
void VideoPipeline::pipelineLoop() {
// 1. 初始化算法
if (!algo.Init().ok()) {
spdlog::error("Algorithm init failed.");
return;
}
// 2. 打开摄像头
// 简单的重试机制
while (isRunning && !camera.Open(inputUrl).ok()) {
spdlog::warn("Retrying camera connection in 2s...");
std::this_thread::sleep_for(std::chrono::seconds(2));
}
// 3. 等待获取第一帧以确定分辨率
cv::Mat frame;
while (isRunning) {
if (camera.GetLatestFrame(frame).ok() && !frame.empty()) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (!isRunning)
return; // 如果在等待期间被关闭
// 4. 初始化推流器
// 假设输出 25fps分辨率与输入一致
if (!pusher.Open(outputUrl, frame.cols, frame.rows, 25).ok()) {
spdlog::error("Failed to open RTSP pusher.");
return;
}
spdlog::info("Pipeline loop running: Capture -> Algo -> Push");
// 5. 主处理循环
while (isRunning) {
auto start = std::chrono::steady_clock::now();
if (!camera.GetLatestFrame(frame).ok()) {
// 获取失败 (可能是流断了),稍微休眠
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
absl::Status algoStatus = algo.Process(frame);
if (!algoStatus.ok()) {
spdlog::warn("Algorithm processing failed: {}", algoStatus.ToString());
}
absl::Status pushStatus = pusher.PushFrame(frame);
if (!pushStatus.ok()) {
spdlog::warn("Push frame failed: {}", pushStatus.ToString());
}
auto end = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
// 目标 25fps = 40ms/frame
if (elapsed < 40) {
std::this_thread::sleep_for(std::chrono::milliseconds(40 - elapsed));
}
}
}

View File

@ -0,0 +1,160 @@
#include "video_pipeline.hpp"
#include <chrono>
VideoPipeline::VideoPipeline() : running_(false) {}
VideoPipeline::~VideoPipeline() {
Stop();
}
void VideoPipeline::Start(const std::string& inputUrl, const std::string& outputUrl) {
if (running_)
return;
running_ = true;
spdlog::info("Starting VideoPipeline with Input: {}", inputUrl);
processingThread_ = std::thread(&VideoPipeline::processLoop, this, inputUrl, outputUrl);
}
void VideoPipeline::Stop() {
if (!running_)
return;
running_ = false;
if (processingThread_.joinable()) {
processingThread_.join();
}
spdlog::info("VideoPipeline Stopped.");
}
std::vector<DetectionResult> VideoPipeline::mockInference(const cv::Mat& frame) {
std::vector<DetectionResult> results;
static int dummyX = 100;
static int direction = 5;
// 简单的移动逻辑,模拟每帧的变化
dummyX += direction;
if (dummyX > frame.cols - 200 || dummyX < 0)
direction *= -1;
DetectionResult res;
res.x = dummyX;
res.y = 200;
res.width = 150;
res.height = 300;
res.label = "EV CAR";
res.confidence = 0.95f;
results.push_back(res);
return results;
}
void VideoPipeline::drawOverlay(cv::Mat& frame, const std::vector<DetectionResult>& results) {
for (const auto& res : results) {
cv::rectangle(frame, cv::Rect(res.x, res.y, res.width, res.height), cv::Scalar(0, 255, 0),
2);
std::string text = res.label + " " + std::to_string(res.confidence).substr(0, 4);
cv::putText(frame, text, cv::Point(res.x, res.y - 5), cv::FONT_HERSHEY_SIMPLEX, 0.6,
cv::Scalar(0, 255, 0), 2);
}
cv::putText(frame, "RK3588 H.264 @ 20FPS", cv::Point(20, 50), cv::FONT_HERSHEY_SIMPLEX, 1.0,
cv::Scalar(0, 0, 255), 2);
}
void VideoPipeline::processLoop(std::string inputUrl, std::string outputUrl) {
cv::VideoCapture cap;
// [MOD] 尝试设置 FFmpeg 后端参数以减少延迟(可选,依赖于 OpenCV 版本)
// os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp" //
// C++中通常通过API设置或环境变量
cap.open(inputUrl);
if (!cap.isOpened()) {
spdlog::error("Failed to open input RTSP stream: {}", inputUrl);
running_ = false;
return;
}
// [MOD] 强制指定 20 FPS
// 虽然 cap.get 可能读取到 20但为了稳健性我们在输出端强制使用 20
const double TARGET_FPS = 20.0;
int width = cap.get(cv::CAP_PROP_FRAME_WIDTH);
int height = cap.get(cv::CAP_PROP_FRAME_HEIGHT);
spdlog::info("Video Source: {}x{} | Input FPS: {} | Target Output FPS: {}", width, height,
cap.get(cv::CAP_PROP_FPS), TARGET_FPS);
// === [MOD] 优化后的 GStreamer H.264 推流管道 ===
// 1. appsrc: OpenCV 数据源
// 2. videoconvert: 像素格式转换
// 3. capsfilter: 强制转换为 NV12 (RK3588 编码器首选格式) 并 锁定 20/1 帧率
// 4. mpph264enc: Rockchip 硬件 H.264 编码器
// 5. h264parse: 解析 NALU对 RTSP 传输至关重要
// 6. rtspclientsink: 推流到 MediaMTX
std::stringstream pipeline;
pipeline << "appsrc ! "
<< "videoconvert ! "
<< "video/x-raw,format=NV12,width=" << width << ",height=" << height
<< ",framerate=20/1 ! "
<< "mpph264enc ! "
<< "h264parse ! "
<< "rtspclientsink location=" << outputUrl
<< " protocols=tcp"; // [MOD] 使用 TCP 协议推流更稳定
spdlog::debug("GStreamer Pipeline: {}", pipeline.str());
cv::VideoWriter writer;
// [MOD] 在 open 时传入 TARGET_FPS (20.0)
writer.open(pipeline.str(), cv::CAP_GSTREAMER, 0, TARGET_FPS, cv::Size(width, height), true);
if (!writer.isOpened()) {
spdlog::error("Failed to initialize VideoWriter. Check GStreamer plugins.");
}
cv::Mat frame;
// [MOD] 帧率控制辅助
// 如果摄像头实际输出稍微快于或慢于20帧OpenCV的阻塞读取会自动对齐
// 但如果源断流,我们需要处理
while (running_) {
// [MOD] 记录时间以监测实际处理耗时
auto start = std::chrono::steady_clock::now();
if (!cap.read(frame)) {
spdlog::warn("Frame read failed. Reconnecting...");
std::this_thread::sleep_for(std::chrono::seconds(1));
cap.release();
cap.open(inputUrl);
continue;
}
if (frame.empty())
continue;
// 1. 算法处理
auto results = mockInference(frame);
// 2. 绘制叠加
drawOverlay(frame, results);
// 3. 硬件编码推流
if (writer.isOpened()) {
writer.write(frame);
}
// 简单监控处理延迟
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double, std::milli> elapsed = end - start;
// 如果处理太快(例如只是简单的画框,几毫秒就完了),
// 这里的 cap.read 会自动阻塞等待下一帧,所以不需要手动 sleep。
// 只要输入是 20fps这个循环就会被输入流“带”着以 20fps 运行。
}
cap.release();
writer.release();
}

View File

@ -1,40 +1,43 @@
#ifndef VIDEO_PIPELINE_HPP
#define VIDEO_PIPELINE_HPP
#pragma once
#include <atomic>
#include <opencv2/opencv.hpp>
#include <string>
#include <thread>
#include <vector>
#include "algorithm_service.hpp"
#include "rtsp_camera_service.hpp"
#include "rtsp_stream_pusher.hpp"
#include "spdlog/spdlog.h"
// 模拟的检测结果结构体(对应未来的 YOLO 结果)
struct DetectionResult {
int x;
int y;
int width;
int height;
std::string label;
float confidence;
};
class VideoPipeline {
public:
VideoPipeline();
~VideoPipeline();
// 启动流水线
// inputUrl: 拉流地址 (摄像头)
// outputUrl: 推流地址 (MediaMTX)
// 启动视频流处理
void Start(const std::string& inputUrl, const std::string& outputUrl);
// 停止流水线
// 停止处理
void Stop();
private:
void pipelineLoop();
void processLoop(std::string inputUrl, std::string outputUrl);
private:
RtspCameraService camera;
RtspStreamPusher pusher;
AlgorithmService algo;
// 占位算法函数:模拟推理
std::vector<DetectionResult> mockInference(const cv::Mat& frame);
std::string inputUrl;
std::string outputUrl;
// 绘图函数
void drawOverlay(cv::Mat& frame, const std::vector<DetectionResult>& results);
std::thread workThread;
std::atomic<bool> isRunning;
std::atomic<bool> running_;
std::thread processingThread_;
};
#endif // VIDEO_PIPELINE_HPP