feat(video): Add video service.

Adds video capture and stream-push services; the processed stream is delivered to the frontend via HLS. An algorithm-processing hook is reserved.
GuanYuankai 2025-12-24 16:14:25 +08:00
parent 3e5fa75135
commit e91238d114
12 changed files with 689 additions and 189 deletions
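
For context, a minimal sketch of how the new pieces are wired end to end, based on the code added in this commit. The camera URL and the local push target are the hard-coded values from main.cc; the assumption that MediaMTX re-serves the pushed stream to the frontend as HLS is not spelled out in the diff itself:

#include <string>
#include "videoService/video_pipeline.hpp"

int main() {
    // Values currently hard-coded in main.cc; moving them into config.json is still a TODO.
    const std::string cam_rtsp_input = "rtsp://admin:123456@192.168.1.57:554/stream0";
    const std::string algorithm_rtsp_output = "rtsp://127.0.0.1:8554/processed";

    VideoPipeline video_pipeline;
    // Capture -> AlgorithmService::Process -> push to the local RTSP server;
    // the frontend is assumed to play whatever HLS URL that server exposes for "processed".
    video_pipeline.Start(cam_rtsp_input, algorithm_rtsp_output);

    // ... the rest of the application runs here ...

    video_pipeline.Stop();
    return 0;
}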

View File

@ -14,6 +14,8 @@ find_package(PkgConfig REQUIRED)
pkg_check_modules(GST REQUIRED gstreamer-1.0 gstreamer-app-1.0)
find_package(OpenCV REQUIRED)
find_package(absl REQUIRED)
add_subdirectory(src/vendor/crow)
add_library(nlohmann_json INTERFACE)
@ -60,11 +62,17 @@ add_library(vehicle_road_lib STATIC
src/config/config_manager.cc
#
src/alarm/alarm_service.cc
#
src/videoService/rtsp_camera_service.cc
src/videoService/rtsp_stream_pusher.cc
src/videoService/algorithm_service.cc
src/videoService/video_pipeline.cc
)
target_include_directories(vehicle_road_lib PUBLIC
/usr/include/opencv4
${CMAKE_CURRENT_SOURCE_DIR}/src
${CMAKE_CURRENT_SOURCE_DIR}/src/videoService
${GST_INCLUDE_DIRS}
/usr/include/rga
)
@ -82,6 +90,12 @@ target_link_libraries(vehicle_road_lib PRIVATE
rga
${OpenCV_LIBS}
${GST_LIBRARIES}
# --- Abseil ---
absl::strings
absl::time
absl::synchronization
absl::status
absl::statusor
)

View File

@ -3,7 +3,8 @@
## Current Tasks (Doing)
- [ ] Verify OpenCV access inside the RK3588 container (/dev/video0)
- [ ] Connect the camera device
- [ ] Connect the camera device rtsp://admin:123456@192.168.1.57:554/stream0
- [ ] Integrate the RKNN model
## Backlog (To Do)

View File

@ -1,7 +1,7 @@
{
"modbus_rtu_devices": [
{
"enabled": true,
"enabled": false,
"device_id": "rtu_temp_sensor_lab",
"port_path": "/dev/ttyS7",
"baud_rate": 9600,

View File

@ -19,14 +19,13 @@
#include "nlohmann/json.hpp"
#include "spdlog/spdlog.h"
#include "systemMonitor/system_monitor.h"
#include "videoService/video_pipeline.hpp"
#include "web/web_server.h"
boost::asio::io_context g_io_context;
void poll_system_metrics(boost::asio::steady_timer &timer,
SystemMonitor::SystemMonitor &monitor,
MqttClient &mqtt_client, AlarmService &alarm_service)
{
void poll_system_metrics(boost::asio::steady_timer& timer, SystemMonitor::SystemMonitor& monitor,
MqttClient& mqtt_client, AlarmService& alarm_service) {
if (g_io_context.stopped())
return;
auto cpu_util = monitor.getCpuUtilization();
@ -35,8 +34,7 @@ void poll_system_metrics(boost::asio::steady_timer &timer,
double mem_usage_percentage =
(mem_info.total_kb > 0)
? (100.0 * (mem_info.total_kb - mem_info.available_kb) /
mem_info.total_kb)
? (100.0 * (mem_info.total_kb - mem_info.available_kb) / mem_info.total_kb)
: 0.0;
auto thermalInfoString = monitor.getChipTemperature();
@ -44,8 +42,7 @@ void poll_system_metrics(boost::asio::steady_timer &timer,
std::string topic = "proxy/system_status";
std::string payload;
try
{
try {
nlohmann::json payload_json;
payload_json["cpu_usage"] = cpu_util.totalUsagePercentage;
payload_json["mem_total_gb"] = mem_total_gb;
@ -53,11 +50,8 @@ void poll_system_metrics(boost::asio::steady_timer &timer,
payload_json["thermal_info"] = nlohmann::json::parse(thermalInfoString);
payload = payload_json.dump();
}
catch (const nlohmann::json::parse_error &e)
{
spdlog::error("Failed to parse thermalInfo JSON: {}. Sending partial data.",
e.what());
} catch (const nlohmann::json::parse_error& e) {
spdlog::error("Failed to parse thermalInfo JSON: {}. Sending partial data.", e.what());
nlohmann::json fallback_json;
fallback_json["cpu_usage"] = cpu_util.totalUsagePercentage;
fallback_json["mem_total_gb"] = mem_total_gb;
@ -74,44 +68,38 @@ void poll_system_metrics(boost::asio::steady_timer &timer,
spdlog::debug("System metrics published.");
timer.expires_at(timer.expiry() + std::chrono::seconds(15));
timer.async_wait(std::bind(poll_system_metrics, std::ref(timer),
std::ref(monitor), std::ref(mqtt_client),
std::ref(alarm_service)));
timer.async_wait(std::bind(poll_system_metrics, std::ref(timer), std::ref(monitor),
std::ref(mqtt_client), std::ref(alarm_service)));
}
int main(int argc, char *argv[])
{
int main(int argc, char* argv[]) {
// TODO: [GYK] DEV#1: read these URLs from config.json (see the config-reading sketch after this file's diff)
std::string cam_rtsp_input = "rtsp://admin:123456@192.168.1.57:554/stream0";
std::string algorithm_rtsp_output = "rtsp://127.0.0.1:8554/processed";
const std::string config_path = "/app/config/config.json";
if (!ConfigManager::getInstance().load(config_path))
{
if (!ConfigManager::getInstance().load(config_path)) {
std::cerr << "Failed to load configuration from " << config_path
<< ". Running with defaults, but this may cause issues."
<< std::endl;
<< ". Running with defaults, but this may cause issues." << std::endl;
}
auto& config = ConfigManager::getInstance();
try
{
try {
spdlog::set_level(spdlog::level::from_str(config.getLogLevel()));
spdlog::info("Edge Proxy starting up...");
}
catch (const spdlog::spdlog_ex &ex)
{
} catch (const spdlog::spdlog_ex& ex) {
std::cerr << "Log initialization failed: " << ex.what() << std::endl;
return 1;
}
spdlog::info("Initializing Data Storage...");
auto& data_storage = DataStorage::getInstance();
if (!data_storage.initialize(config.getDataStorageDbPath()))
{
if (!data_storage.initialize(config.getDataStorageDbPath())) {
spdlog::critical("Failed to initialize DataStorage. Exiting.");
return 1;
}
try
{
try {
spdlog::info("Initializing Video Service...");
DataCache data_cache;
@ -120,38 +108,30 @@ int main(int argc, char *argv[])
AlarmService alarm_service(g_io_context, mqtt_client);
if (!alarm_service.load_rules(config.getAlarmRulesPath()))
{
VideoPipeline video_pipeline;
if (!alarm_service.load_rules(config.getAlarmRulesPath())) {
spdlog::error("Failed to load alarm rules. Alarms may be disabled.");
}
auto report_to_mqtt = [&](const UnifiedData &data)
{
if (data_storage.storeProcessedData(data))
{
spdlog::debug("Successfully stored PROCESSED data for device '{}'",
data.device_id);
}
else
{
spdlog::error("Failed to store PROCESSED data for device '{}'",
data.device_id);
auto report_to_mqtt = [&](const UnifiedData& data) {
if (data_storage.storeProcessedData(data)) {
spdlog::debug("Successfully stored PROCESSED data for device '{}'", data.device_id);
} else {
spdlog::error("Failed to store PROCESSED data for device '{}'", data.device_id);
}
live_data_cache.update_data(data.device_id, data.data_json);
alarm_service.process_device_data(data.device_id, data.data_json);
if (mqtt_client.is_connected())
{
if (mqtt_client.is_connected()) {
std::string topic = "devices/" + data.device_id + "/data";
g_io_context.post([&, topic, payload = data.data_json]()
{ mqtt_client.publish(topic, payload, 1, false); });
}
else
{
spdlog::warn("MQTT disconnected. Caching data for device '{}'.",
data.device_id);
g_io_context.post([&, topic, payload = data.data_json]() {
mqtt_client.publish(topic, payload, 1, false);
});
} else {
spdlog::warn("MQTT disconnected. Caching data for device '{}'.", data.device_id);
data_cache.add(data);
}
};
@ -163,46 +143,44 @@ int main(int argc, char *argv[])
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
SystemMonitor::SystemMonitor monitor;
if (!data_cache.open(config.getDataCacheDbPath()))
{
if (!data_cache.open(config.getDataCacheDbPath())) {
spdlog::critical("Failed to initialize data cache at '{}'. Exiting.",
config.getDataCacheDbPath());
return 1;
}
CacheUploader cache_uploader(g_io_context, mqtt_client, data_cache);
mqtt_client.set_connected_handler([&](const std::string &cause)
{
mqtt_client.set_connected_handler([&](const std::string& cause) {
spdlog::info("MQTT client connected: {}", cause);
cache_uploader.start_upload(); });
cache_uploader.start_upload();
});
mqtt_client.connect();
mqtt_router.start();
monitor.getCpuUtilization();
boost::asio::steady_timer system_monitor_timer(g_io_context,
std::chrono::seconds(15));
boost::asio::steady_timer system_monitor_timer(g_io_context, std::chrono::seconds(15));
system_monitor_timer.async_wait(std::bind(
poll_system_metrics, std::ref(system_monitor_timer), std::ref(monitor),
system_monitor_timer.async_wait(std::bind(poll_system_metrics,
std::ref(system_monitor_timer), std::ref(monitor),
std::ref(mqtt_client), std::ref(alarm_service)));
device_manager.load_and_start(config.getDevicesConfigPath());
WebServer web_server(monitor, device_manager, live_data_cache,
alarm_service, config.getWebServerPort());
web_server.set_shutdown_handler([&]()
{
WebServer web_server(monitor, device_manager, live_data_cache, alarm_service,
config.getWebServerPort());
web_server.set_shutdown_handler([&]() {
spdlog::warn("Received shutdown command from Web API. Shutting down.");
g_io_context.stop(); });
g_io_context.stop();
});
web_server.start();
spdlog::info("Starting Video Pipeline Service...");
video_pipeline.Start(cam_rtsp_input, algorithm_rtsp_output);
boost::asio::signal_set signals(g_io_context, SIGINT, SIGTERM);
signals.async_wait(
[&](const boost::system::error_code &error, int signal_number)
{
spdlog::warn("Interrupt signal ({}) received. Shutting down.",
signal_number);
signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
spdlog::warn("Interrupt signal ({}) received. Shutting down.", signal_number);
spdlog::info("[Shutdown] A. Stopping device manager services...");
device_manager.stop_all();
@ -210,6 +188,9 @@ int main(int argc, char *argv[])
spdlog::info("[Shutdown] B. Stopping web server...");
web_server.stop();
spdlog::info("[Shutdown] B2. Stopping video pipeline...");
video_pipeline.Stop();
spdlog::info("[Shutdown] C. Stopping alarm service...");
alarm_service.stop();
@ -222,9 +203,7 @@ int main(int argc, char *argv[])
spdlog::info("All services are running. Press Ctrl+C to exit.");
g_io_context.run();
}
catch (const std::exception &e)
{
} catch (const std::exception& e) {
spdlog::critical("An unhandled exception occurred: {}", e.what());
return 1;
}
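
A hedged sketch of what the TODO in main.cc (reading the two RTSP URLs from config.json instead of hard-coding them) could look like; the "video" section and its key names are assumptions for illustration, not something this commit or ConfigManager defines:

#include <fstream>
#include <string>
#include "nlohmann/json.hpp"

struct VideoUrls {
    std::string input;
    std::string output;
};

// Hypothetical helper: falls back to the current hard-coded URLs when the
// config file or the assumed "video" section is missing.
inline VideoUrls loadVideoUrls(const std::string& config_path) {
    VideoUrls urls{"rtsp://admin:123456@192.168.1.57:554/stream0",
                   "rtsp://127.0.0.1:8554/processed"};
    std::ifstream in(config_path);
    if (!in.is_open()) return urls;
    nlohmann::json j = nlohmann::json::parse(in, nullptr, /*allow_exceptions=*/false);
    if (j.is_discarded() || !j.contains("video")) return urls;
    urls.input = j["video"].value("rtsp_input", urls.input);
    urls.output = j["video"].value("rtsp_output", urls.output);
    return urls;
}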

View File

@ -0,0 +1,45 @@
#include "algorithm_service.hpp"
#include <spdlog/spdlog.h>
#include <chrono>
#include <ctime>  // std::ctime / std::time_t used below
AlgorithmService::AlgorithmService() {}
AlgorithmService::~AlgorithmService() {}
absl::Status AlgorithmService::Init() {
spdlog::info("Initializing Algorithm Model...");
// TODO: load your AI model here (e.g. TensorRT, OpenVINO, etc.)
// Simulated loading delay:
// std::this_thread::sleep_for(std::chrono::seconds(1));
return absl::OkStatus();
}
absl::Status AlgorithmService::Process(cv::Mat& frame) {
if (frame.empty())
return absl::InvalidArgumentError("Empty frame");
// --- Algorithm processing region START ---
// 1. Mock: draw a timestamp
auto now = std::chrono::system_clock::now();
std::time_t now_c = std::chrono::system_clock::to_time_t(now);
std::string timeStr = std::ctime(&now_c);
if (!timeStr.empty())
timeStr.pop_back(); // strip the trailing newline
cv::putText(frame, "Live: " + timeStr, cv::Point(30, 50), cv::FONT_HERSHEY_SIMPLEX, 1.0,
cv::Scalar(0, 255, 0), 2);
// 2. Mock: draw a detection box (pretend an object was detected)
int centerX = frame.cols / 2;
int centerY = frame.rows / 2;
cv::rectangle(frame, cv::Rect(centerX - 100, centerY - 100, 200, 200), cv::Scalar(0, 0, 255),
3);
cv::putText(frame, "Detected Object", cv::Point(centerX - 100, centerY - 110),
cv::FONT_HERSHEY_SIMPLEX, 0.8, cv::Scalar(0, 0, 255), 2);
// --- Algorithm processing region END ---
return absl::OkStatus();
}

View File

@ -0,0 +1,21 @@
#ifndef ALGORITHM_SERVICE_HPP
#define ALGORITHM_SERVICE_HPP
#include <opencv2/opencv.hpp>
#include "absl/status/status.h"
class AlgorithmService {
public:
AlgorithmService();
~AlgorithmService();
// Initialize the algorithm model (e.g. load YOLO weights)
absl::Status Init();
// Process a single frame
// frame: in/out parameter, modified in place
absl::Status Process(cv::Mat& frame);
};
#endif // ALGORITHM_SERVICE_HPP
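
A minimal usage sketch of this interface; the frame here is assumed to come from RtspCameraService (or any other BGR cv::Mat source):

#include <opencv2/opencv.hpp>
#include "algorithm_service.hpp"

// Runs the (currently mock) algorithm once on a frame grabbed elsewhere.
int run_algorithm_once(cv::Mat& frame) {
    AlgorithmService algo;
    if (!algo.Init().ok()) {
        return -1;  // model failed to load
    }
    absl::Status st = algo.Process(frame);  // draws the overlay in place
    return st.ok() ? 0 : -1;
}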

View File

@ -0,0 +1,140 @@
#include "rtsp_camera_service.hpp"
// spdlog
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
// Abseil
#include "absl/strings/str_cat.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
static std::shared_ptr<spdlog::logger> getLogger() {
auto logger = spdlog::get("camera_service");
if (!logger) {
try {
auto consoleSink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
auto fileSink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(
"logs/camera_service.log", true);
std::vector<spdlog::sink_ptr> sinks{consoleSink, fileSink};
logger = std::make_shared<spdlog::logger>("camera_service", sinks.begin(), sinks.end());
spdlog::register_logger(logger);
logger->set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%n] [%^%l%$] %v");
logger->set_level(spdlog::level::info);
// Optional flush policy: flush to the file on every info-level message so logs are not lost on a crash
logger->flush_on(spdlog::level::info);
} catch (const spdlog::spdlog_ex& ex) {
spdlog::error("Log init failed: {}", ex.what());
return spdlog::default_logger();
}
}
return logger;
}
RtspCameraService::RtspCameraService() : isRunning(false) {}
RtspCameraService::~RtspCameraService() {
this->Close();
}
absl::Status RtspCameraService::Open(const std::string& rtspUrl) {
this->cameraUrl = rtspUrl;
auto logger = getLogger();
logger->info("Connecting to Camera via GStreamer: {}", rtspUrl);
// --- Build the GStreamer capture pipeline ---
// 1. rtspsrc: pull the RTSP stream
// 2. latency=0: low-latency setting
// 3. rtph265depay + h265parse: depayload and parse the H.265 stream
// 4. mppvideodec: [key] use the Rockchip hardware decoder
// 5. videoconvert: convert to BGR for OpenCV (appsink expects BGR here)
// 6. appsink: hand frames to OpenCV
std::string gst_pipeline = absl::StrCat(
"rtspsrc location=", rtspUrl, " latency=0 protocols=tcp ! ", // 添加 protocols=tcp
"queue ! ", "rtph265depay ! h265parse ! ", "mppvideodec ! ", "videoconvert ! ",
"appsink sync=false");
logger->info("GStreamer Pipeline: {}", gst_pipeline);
this->capture.open(gst_pipeline, cv::CAP_GSTREAMER);
if (!this->capture.isOpened()) {
std::string errMsg = "Failed to open RTSP stream via GStreamer.";
logger->error(errMsg);
return absl::UnavailableError(errMsg);
}
this->isRunning = true;
this->workerThread = std::thread(&RtspCameraService::captureThreadFunc, this);
logger->info("Camera connected successfully.");
return absl::OkStatus();
}
void RtspCameraService::Close() {
if (this->isRunning.exchange(false)) {
getLogger()->info("Stopping camera service...");
// Release the capture before joining so a blocked read() in the worker thread can return.
if (this->capture.isOpened()) {
this->capture.release();
getLogger()->info("Camera capture released.");
}
if (this->workerThread.joinable()) {
this->workerThread.join();
getLogger()->info("Camera worker thread joined.");
}
getLogger()->info("Camera service stopped.");
}
}
absl::Status RtspCameraService::GetLatestFrame(cv::Mat& outFrame) {
absl::MutexLock lock(&this->frameMutex);
if (this->currentFrame.empty()) {
return absl::NotFoundError("Frame buffer is empty");
}
this->currentFrame.copyTo(outFrame);
return absl::OkStatus();
}
void RtspCameraService::captureThreadFunc() {
auto logger = getLogger();
cv::Mat tmpFrame;
while (this->isRunning.load(std::memory_order_relaxed)) {
bool success = this->capture.read(tmpFrame);
if (!this->isRunning.load(std::memory_order_relaxed)) {
break;
}
if (!success) {
logger->warn("Dropped connection or empty frame received from RTSP.");
// Reconnect logic could be added here (see the sketch after this file's diff)
absl::SleepFor(absl::Seconds(1));
continue;
}
if (tmpFrame.empty()) {
continue;
}
{
// Update the shared latest frame
absl::MutexLock lock(&this->frameMutex);
tmpFrame.copyTo(this->currentFrame);
}
// Brief sleep to keep CPU usage in check (tune to the actual frame rate)
// For a 30 fps stream this can be omitted or kept very short
absl::SleepFor(absl::Milliseconds(1));
}
}
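
The failure branch in captureThreadFunc leaves reconnection open; a hedged sketch of one way to fill it, assuming a hypothetical tryReconnect() member (not part of this commit; it would also need a declaration in the header) that rebuilds the same pipeline string from the cameraUrl stored by Open():

// Hypothetical helper, not part of this commit.
bool RtspCameraService::tryReconnect() {
    this->capture.release();
    std::string gst_pipeline = absl::StrCat(
        "rtspsrc location=", this->cameraUrl, " latency=0 protocols=tcp ! ",
        "queue ! rtph265depay ! h265parse ! mppvideodec ! videoconvert ! ",
        "appsink sync=false");
    return this->capture.open(gst_pipeline, cv::CAP_GSTREAMER);
}

// The failure branch in captureThreadFunc() could then become:
//   if (!success) {
//       logger->warn("RTSP read failed, attempting to reconnect...");
//       absl::SleepFor(absl::Seconds(1));
//       tryReconnect();   // retried on the next loop iteration while isRunning
//       continue;
//   }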

View File

@ -0,0 +1,38 @@
#ifndef RTSP_CAMERA_SERVICE_HPP
#define RTSP_CAMERA_SERVICE_HPP
#include <atomic>
#include <opencv2/opencv.hpp>
#include <string>
#include <thread>
// Abseil headers
#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
class RtspCameraService {
public:
RtspCameraService();
~RtspCameraService();
absl::Status Open(const std::string& rtspUrl);
void Close();
absl::Status GetLatestFrame(cv::Mat& outFrame);
private:
void captureThreadFunc();
private:
cv::VideoCapture capture;
absl::Mutex frameMutex;
cv::Mat currentFrame; // GUARDED_BY(frameMutex)
std::thread workerThread;
std::atomic<bool> isRunning;
std::string cameraUrl;
};
#endif // RTSP_CAMERA_SERVICE_HPP
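
A minimal usage sketch for this class; the polling interval is an assumption that roughly matches the 25 fps target used elsewhere in this commit:

#include <chrono>
#include <thread>
#include "rtsp_camera_service.hpp"

int grab_some_frames(const std::string& url) {
    RtspCameraService camera;
    if (!camera.Open(url).ok()) {
        return -1;  // stream not reachable
    }
    cv::Mat frame;
    for (int i = 0; i < 100; ++i) {
        // The capture thread keeps currentFrame fresh; we just copy the latest one.
        if (camera.GetLatestFrame(frame).ok()) {
            // ... hand the frame to the algorithm / pusher ...
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(40));  // ~25 fps
    }
    camera.Close();
    return 0;
}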

View File

@ -0,0 +1,88 @@
#include "rtsp_stream_pusher.hpp"
#include <spdlog/spdlog.h>
#include "absl/strings/str_format.h"
RtspStreamPusher::RtspStreamPusher() {}
RtspStreamPusher::~RtspStreamPusher() {
this->Close();
}
absl::Status RtspStreamPusher::Open(const std::string& rtspUrl, int width, int height, int fps) {
if (this->isOpened) {
return absl::OkStatus();
}
this->width = width;
this->height = height;
// --- GStreamer command construction ---
// 1. fdsrc: read raw frames from a file descriptor (the child's stdin here)
// 2. videoparse: describe the raw data layout
//    - format=bgr: OpenCV frames are BGR by default
//    - width/height/framerate: must match the pushed frames exactly
// 3. videoconvert: convert the color space (BGR -> NV12) for the encoder
// 4. mpph264enc: Rockchip hardware H.264 encoder (fixed-QP, low latency)
// 5. rtspclientsink: push to the RTSP server (MediaMTX)
//    - protocols=tcp: force TCP (usually more reliable across networks)
std::string cmd = absl::StrFormat(
"gst-launch-1.0 -v fdsrc ! "
"videoparse width=%d height=%d framerate=%d/1 format=bgr ! "
"queue max-size-buffers=2 leaky=downstream ! "
"videoconvert ! "
"video/x-raw,format=NV12 ! "
"mpph264enc gop=25 rc-mode=fixqp qp-init=26 ! "
"h264parse ! "
"rtspclientsink location=%s latency=0 protocols=tcp",
width, height, fps, rtspUrl);
spdlog::info("Starting GStreamer pipe: {}", cmd);
// Open the pipe
// "w" means we write data into the child process (its stdin)
this->gstPipe = popen(cmd.c_str(), "w");
if (!this->gstPipe) {
return absl::InternalError("Failed to open GStreamer pipe");
}
this->isOpened = true;
return absl::OkStatus();
}
void RtspStreamPusher::Close() {
if (this->gstPipe) {
pclose(this->gstPipe);
this->gstPipe = nullptr;
}
this->isOpened = false;
}
absl::Status RtspStreamPusher::PushFrame(const cv::Mat& frame) {
if (!this->isOpened || !this->gstPipe) {
return absl::FailedPreconditionError("Pusher is not open");
}
if (frame.empty()) {
return absl::InvalidArgumentError("Frame is empty");
}
// Double-check the dimensions: videoparse is very strict about frame size;
// if the byte count per frame is wrong, the pipeline errors out and disconnects immediately.
if (frame.cols != this->width || frame.rows != this->height) {
return absl::InvalidArgumentError("Frame size mismatch");
}
// Write the raw frame data
size_t written = fwrite(frame.data, 1, frame.total() * frame.elemSize(), this->gstPipe);
if (written != frame.total() * frame.elemSize()) {
spdlog::error("Failed to write frame to GStreamer pipe");
return absl::DataLossError("Failed to write full frame");
}
return absl::OkStatus();
}

View File

@ -0,0 +1,34 @@
#ifndef RTSP_STREAM_PUSHER_HPP
#define RTSP_STREAM_PUSHER_HPP
#include <cstdio> // for popen
#include <opencv2/opencv.hpp>
#include <string>
// Abseil
#include "absl/status/status.h"
class RtspStreamPusher {
public:
RtspStreamPusher();
~RtspStreamPusher();
// Initialize the pusher
// rtspUrl: target push address (e.g. rtsp://127.0.0.1:8554/processed)
// width, height, fps: parameters of the output video
absl::Status Open(const std::string& rtspUrl, int width, int height, int fps);
// Stop pushing
void Close();
// Push a single frame
absl::Status PushFrame(const cv::Mat& frame);
private:
FILE* gstPipe = nullptr;
int width = 0;
int height = 0;
bool isOpened = false;
};
#endif // RTSP_STREAM_PUSHER_HPP
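
A minimal usage sketch; the local push address and the 25 fps value mirror what the rest of this commit uses:

#include "rtsp_stream_pusher.hpp"

int push_single_frame(const cv::Mat& frame) {
    RtspStreamPusher pusher;
    // The dimensions given to Open() must match every frame pushed afterwards;
    // videoparse drops the pipe if the per-frame byte count is wrong.
    if (!pusher.Open("rtsp://127.0.0.1:8554/processed", frame.cols, frame.rows, 25).ok()) {
        return -1;
    }
    absl::Status st = pusher.PushFrame(frame);
    pusher.Close();
    return st.ok() ? 0 : -1;
}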

View File

@ -0,0 +1,100 @@
#include "video_pipeline.hpp"
#include <spdlog/spdlog.h>
VideoPipeline::VideoPipeline() : isRunning(false) {}
VideoPipeline::~VideoPipeline() {
Stop();
}
void VideoPipeline::Start(const std::string& inputUrl, const std::string& outputUrl) {
if (isRunning)
return;
this->inputUrl = inputUrl;
this->outputUrl = outputUrl;
this->isRunning = true;
// Start the worker thread
this->workThread = std::thread(&VideoPipeline::pipelineLoop, this);
spdlog::info("Video Pipeline Service Started.");
}
void VideoPipeline::Stop() {
if (isRunning.exchange(false)) {
spdlog::info("Stopping Video Pipeline...");
if (workThread.joinable()) {
workThread.join();
}
// Explicitly close each component
camera.Close();
pusher.Close();
spdlog::info("Video Pipeline Stopped.");
}
}
void VideoPipeline::pipelineLoop() {
// 1. Initialize the algorithm
if (!algo.Init().ok()) {
spdlog::error("Algorithm init failed.");
return;
}
// 2. Open the camera
// Simple retry loop
while (isRunning && !camera.Open(inputUrl).ok()) {
spdlog::warn("Retrying camera connection in 2s...");
std::this_thread::sleep_for(std::chrono::seconds(2));
}
// 3. Wait for the first frame to determine the resolution
cv::Mat frame;
while (isRunning) {
if (camera.GetLatestFrame(frame).ok() && !frame.empty()) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (!isRunning)
return; // shut down while waiting for the first frame
// 4. Initialize the pusher
// Assume 25 fps output, resolution identical to the input
if (!pusher.Open(outputUrl, frame.cols, frame.rows, 25).ok()) {
spdlog::error("Failed to open RTSP pusher.");
return;
}
spdlog::info("Pipeline loop running: Capture -> Algo -> Push");
// 5. Main processing loop
while (isRunning) {
auto start = std::chrono::steady_clock::now();
if (!camera.GetLatestFrame(frame).ok()) {
// Read failed (the stream may have dropped); sleep briefly
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
absl::Status algoStatus = algo.Process(frame);
if (!algoStatus.ok()) {
spdlog::warn("Algorithm processing failed: {}", algoStatus.ToString());
}
absl::Status pushStatus = pusher.PushFrame(frame);
if (!pushStatus.ok()) {
spdlog::warn("Push frame failed: {}", pushStatus.ToString());
}
auto end = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
// Target 25 fps = 40 ms per frame
if (elapsed < 40) {
std::this_thread::sleep_for(std::chrono::milliseconds(40 - elapsed));
}
}
}

View File

@ -0,0 +1,40 @@
#ifndef VIDEO_PIPELINE_HPP
#define VIDEO_PIPELINE_HPP
#include <atomic>
#include <string>
#include <thread>
#include "algorithm_service.hpp"
#include "rtsp_camera_service.hpp"
#include "rtsp_stream_pusher.hpp"
class VideoPipeline {
public:
VideoPipeline();
~VideoPipeline();
// Start the pipeline
// inputUrl: pull address (camera)
// outputUrl: push address (MediaMTX)
void Start(const std::string& inputUrl, const std::string& outputUrl);
// Stop the pipeline
void Stop();
private:
void pipelineLoop();
private:
RtspCameraService camera;
RtspStreamPusher pusher;
AlgorithmService algo;
std::string inputUrl;
std::string outputUrl;
std::thread workThread;
std::atomic<bool> isRunning;
};
#endif // VIDEO_PIPELINE_HPP