完成了通过MQTT进行devices配置更新功能

This commit is contained in:
GuanYuankai 2025-10-22 09:47:10 +00:00
parent e3ff3bd050
commit 7a5375ef68
12 changed files with 740 additions and 622 deletions

View File

@ -23,9 +23,6 @@ target_include_directories(nlohmann_json INTERFACE
${CMAKE_CURRENT_SOURCE_DIR}/src/vendor
)
add_library(edge_proxy_lib STATIC
# TCP
src/network/tcp_server.cc
@ -83,13 +80,13 @@ target_link_libraries(edge_proxy_lib PRIVATE
PahoMqttCpp::paho-mqttpp3
SQLite::SQLite3
pthread
nlohmann_json
# rknn_api
rknnrt
)
target_link_libraries(edge_proxy_lib PUBLIC
Crow
nlohmann_json
)
# =================================================================
# Main Application Target (UNCOMMENTED AND ACTIVATED)
@ -102,19 +99,6 @@ target_link_libraries(edge_proxy PRIVATE
edge_proxy_lib
)
# =================#================================================
#
# =================================================================
add_executable(test
src/test.cc
src/systemMonitor/iio_sensor.cc
)
target_link_libraries(test PRIVATE
edge_proxy_lib
)
add_executable(edge_streamer
src/streamer/main_streamer.cpp
@ -122,17 +106,25 @@ add_executable(edge_streamer
#
target_link_libraries(edge_streamer PRIVATE
# --- GStreamer (来自 pkg_check_modules) ---
${GST_LIBRARIES}
pthread
)
#
target_include_directories(edge_streamer PRIVATE
#
${CMAKE_CURRENT_SOURCE_DIR}/src/streamer/include
# --- GStreamer (来自 pkg_check_modules) ---
${GST_INCLUDE_DIRS}
)
# =================#================================================
#
# =================================================================
# add_executable(test
# src/test.cc
# src/systemMonitor/iio_sensor.cc
# )
# target_link_libraries(test PRIVATE
# edge_proxy_lib
# )

View File

@ -1,59 +1,59 @@
{
"modbus_rtu_devices": [
{
"enabled": false,
"device_id": "rtu_temp_sensor_lab",
"port_path": "/dev/ttyS7",
"baud_rate": 9600,
"slave_id": 1,
"poll_interval_ms": 5000,
"data_points": [
{"name": "temperature", "address": 0, "type": "INT16", "scale": 0.1},
{"name": "humidity", "address": 1, "type": "UINT16", "scale": 0.1}
]
},
{
"enabled": false,
"device_id": "rotary encoder",
"port_path": "/dev/ttyS7",
"baud_rate": 9600,
"slave_id": 111,
"poll_interval_ms": 5000,
"data_points": [
{"name": "count", "address": 1, "type": "INT16", "scale": 1.0},
{"name": "total_count", "address": 2, "type": "INT16", "scale": 1.0}
]
},
{
"enabled": false,
"device_id": "backup_counter",
"port_path": "/dev/ttyS7",
"baud_rate": 9600,
"slave_id": 10,
"poll_interval_ms": 1000,
"data_points": [
{"name": "count", "address": 32, "type": "UINT32"}
]
{
"modbus_rtu_devices": [
{
"enabled": true,
"device_id": "rtu_temp_sensor_lab",
"port_path": "/dev/ttyS7",
"baud_rate": 9600,
"slave_id": 1,
"poll_interval_ms": 5000,
"data_points": [
{"name": "temperature", "address": 0, "type": "INT16", "scale": 0.1},
{"name": "humidity", "address": 1, "type": "UINT16", "scale": 0.1}
]
},
{
"enabled": false,
"device_id": "rotary encoder",
"port_path": "/dev/ttyS7",
"baud_rate": 9600,
"slave_id": 111,
"poll_interval_ms": 5000,
"data_points": [
{"name": "count", "address": 1, "type": "INT16", "scale": 1.0},
{"name": "total_count", "address": 2, "type": "INT16", "scale": 1.0}
]
},
{
"enabled": false,
"device_id": "backup_counter",
"port_path": "/dev/ttyS7",
"baud_rate": 9600,
"slave_id": 10,
"poll_interval_ms": 1000,
"data_points": [
{"name": "count", "address": 32, "type": "UINT32"}
]
}
],
"modbus_tcp_devices": [
{
"enabled": false,
"device_id": "plc_workshop1",
"ip_address": "192.168.1.120",
"port": 502,
"slave_id": 1,
"poll_interval_ms": 1000,
"data_points": [
{"name": "motor_speed", "address": 100, "type": "UINT16", "scale": 1.0},
{"name": "pressure", "address": 102, "type": "FLOAT32", "scale": 0.01},
{"name": "valve_status","address": 104, "type": "UINT16", "scale": 1.0}
]
}
],
"modbus_rtu_bus_configs": {
"/dev/ttyS7": {
"inter_device_delay_ms": 150
}
}
],
"modbus_tcp_devices": [
{
"enabled": false,
"device_id": "plc_workshop1",
"ip_address": "192.168.1.120",
"port": 502,
"slave_id": 1,
"poll_interval_ms": 1000,
"data_points": [
{"name": "motor_speed", "address": 100, "type": "UINT16", "scale": 1.0},
{"name": "pressure", "address": 102, "type": "FLOAT32", "scale": 0.01},
{"name": "valve_status","address": 104, "type": "UINT16", "scale": 1.0}
]
}
],
"modbus_rtu_bus_configs": {
"/dev/ttyS7": {
"inter_device_delay_ms": 150
}
}
}
}

View File

@ -0,0 +1,280 @@
#!/usr/bin/env python3
import subprocess
import json
import sys
import ipaddress
import socket
import re
from typing import List, Set, Dict, Any
import time # 导入 time
import threading # 导入 threading
# 尝试导入 psutil
try:
import psutil
except ImportError:
print("错误: 'psutil' 库未找到。请在 Dockerfile 中或使用 'pip install psutil' 安装。")
sys.exit(1)
# --- 配置 ---
COMMON_CAMERA_PORTS = [
80, # HTTP (web interface)
443, # HTTPS (secure web interface)
554, # RTSP (Real Time Streaming Protocol)
8000, # Often used by Hikvision (SDK/HTTP)
8080, # Alternative HTTP/RTSP
8001, # Hikvision stream port
37777, # Dahua primary port
37778, # Dahua secondary port
8002, # Often used for camera APIs or secondary streams
]
NMAP_PORT_STRING = "T:" + ",".join(map(str, COMMON_CAMERA_PORTS))
# 用于过滤 Docker 网络的常见前缀
DOCKER_NETWORK_PREFIXES = ["172.17.", "172.18.", "172.19.", "172.20."]
# 全局标志,用于停止进度线程
stop_progress_flag = threading.Event()
start_time = time.time()
def progress_indicator():
"""在后台线程中运行,定期打印进度信息。"""
interval = 10 # 每 10 秒报告一次
count = 0
# 打印一个提示,说明进度指示器已启动
print(f"--- Nmap 扫描进行中(每 {interval} 秒更新一次)---", file=sys.stderr)
while not stop_progress_flag.is_set():
time_elapsed = time.time() - start_time
# 使用 is_set() 检查标志,并等待最多 interval 秒
stop_progress_flag.wait(interval)
if not stop_progress_flag.is_set():
count += 1
# 打印一个简单的状态信息
print(f"--- 状态更新 #{count}: 扫描已运行 {time_elapsed:.1f} 秒... ---", file=sys.stderr)
def get_local_ip() -> str:
"""获取本机的本地 IP 地址。"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 连接到一个虚拟地址
s.connect(('10.255.255.255', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
def get_all_local_networks() -> List[str]:
"""
使用 psutil 查找所有活动的网络接口及其关联的网络范围
返回一个网络 CIDR 字符串列表 (例如 '192.168.1.0/24')
"""
networks = set()
try:
for interface, snics in psutil.net_if_addrs().items():
for snic in snics:
if snic.family == socket.AF_INET: # IPv4
ip_address = snic.address
netmask = snic.netmask
if ip_address and netmask and ip_address != '127.0.0.1':
try:
# strict=False 允许主机地址
network_obj = ipaddress.IPv4Network(f"{ip_address}/{netmask}", strict=False)
networks.add(str(network_obj))
except ipaddress.AddressValueError as e:
print(f"警告: 无法解析网络 {interface}: {ip_address}, {netmask}。错误: {e}", file=sys.stderr)
return list(networks)
except Exception as e:
print(f"错误: 使用 psutil 获取网络接口时出错: {e}", file=sys.stderr)
print("回退: 仅扫描本地 IP 的 /24 子网。", file=sys.stderr)
local_ip = get_local_ip()
if local_ip == '127.0.0.1':
return []
return [str(ipaddress.IPv4Network(f"{local_ip}/24", strict=False))]
def filter_docker_networks(all_networks: List[str]) -> List[str]:
"""过滤掉已知的 Docker 内部网络。"""
filtered_networks = []
print("--- 检测到以下网络 ---", file=sys.stderr)
for net_cidr in all_networks:
print(f" - {net_cidr}", file=sys.stderr)
for net_cidr in all_networks:
is_docker_internal = False
for prefix in DOCKER_NETWORK_PREFIXES:
if net_cidr.startswith(prefix):
is_docker_internal = True
break
if not is_docker_internal:
filtered_networks.append(net_cidr)
else:
print(f" - 排除 Docker 内部网络: {net_cidr}", file=sys.stderr)
if not filtered_networks and all_networks:
print(f"警告: 所有检测到的网络似乎都是 Docker 内部网络。扫描可能不会发现外部设备。", file=sys.stderr)
print(f"将继续扫描所有检测到的网络: {', '.join(all_networks)}", file=sys.stderr)
return all_networks # 如果过滤后为空,但原来不为空,则返回原列表
return filtered_networks
def run_nmap_scan(targets: List[str]) -> str:
"""
对目标网络运行 nmap 扫描
-sV: 服务版本检测
-p [ports]: 扫描指定的 TCP 端口
-T4: 加快扫描速度
-oJ -: JSON 输出到 stdout
"""
print(f"\n--- 开始 Nmap 扫描 (目标: {', '.join(targets)}) ---", file=sys.stderr)
print(f"--- 扫描端口: {NMAP_PORT_STRING} ---", file=sys.stderr)
# 需要 sudo 权限来运行 nmap
# 您的 Dockerfile 已经为 'dev' 用户设置了 NOPASSWD:ALL
nmap_command = [
'sudo', 'nmap',
'-oJ', '-', # 输出 JSON 到 stdout
'-T4', # 更快的时间模板
'--open', # 只显示打开的端口
'-sV', # 服务版本指纹识别
# --- 移除了导致错误的 ONVIF 脚本 ---
# '--script=broadcast-onvif-discover',
# --- --------------------------- ---
'-p', NMAP_PORT_STRING, # 扫描特定端口
] + targets
start_time = time.time()
stop_progress_flag.clear() # 确保标志是清除的
progress_thread = threading.Thread(target=progress_indicator, daemon=True)
progress_thread.start()
try:
# 设置超时例如10分钟以防扫描卡住
result = subprocess.run(nmap_command,
capture_output=True,
text=True,
check=True,
timeout=600)
stop_progress_flag.set()
progress_thread.join() # 等
print("--- Nmap 扫描完成 ---", file=sys.stderr)
待线程清理
return result.stdout
except subprocess.CalledProcessError as e:
print(f"错误: Nmap 执行失败 (返回码 {e.returncode}):", file=sys.stderr)
print(f"Nmap Stderr: {e.stderr}", file=sys.stderr)
return None
except subprocess.TimeoutExpired as e:
print(f"错误: Nmap 扫描超时 ({e.timeout} 秒)。", file=sys.stderr)
print(f"Nmap Stdout: {e.stdout}", file=sys.stderr)
print(f"Nmap Stderr: {e.stderr}", file=sys.stderr)
return None
except FileNotFoundError:
print("错误: 'sudo''nmap' 命令未找到。请确保它在 PATH 中并且已安装。", file=sys.stderr)
return None
def parse_nmap_json(nmap_stdout: str) -> List[Dict[str, Any]]:
"""将 nmap 的 JSON 输出解析为我们想要的简洁格式。"""
results = []
if not nmap_stdout:
return results
try:
nmap_data = json.loads(nmap_stdout)
except json.JSONDecodeError as e:
print(f"错误: 无法解析 Nmap 的 JSON 输出。{e}", file=sys.stderr)
print(f"原始输出: {nmap_stdout[:500]}...", file=sys.stderr)
return results
if 'hosts' not in nmap_data:
print("警告: Nmap 输出中未找到 'hosts' 键。", file=sys.stderr)
return results
for host in nmap_data.get('hosts', []):
if host.get('status', {}).get('state') != 'up':
continue
ip = host.get('addresses', {}).get('ipv4')
mac = host.get('addresses', {}).get('mac')
if not ip:
continue
vendor = host.get('vendor', {}).get(mac, None) if mac else None
device_info = {
"ip": ip,
"mac": mac,
"vendor": vendor,
"services": [],
"onvif_xaddrs": [] # 即使没有脚本,也保留此键以保持格式一致
}
# 1. 解析端口和服务
for port_info in host.get('ports', []):
if port_info.get('state', {}).get('state') == 'open':
service_data = port_info.get('service', {})
service = {
"port": int(port_info.get('portid')),
"service_name": service_data.get('name'),
"product": service_data.get('product'),
"version": service_data.get('version'),
"extra_info": service_data.get('extrainfo')
}
device_info["services"].append(service)
# 2. 解析 ONVIF 脚本输出 (现在为空)
# 'hostscript' 字段包含主机级别的脚本结果
for script in host.get('hostscript', []):
if script.get('id') == 'broadcast-onvif-discover':
# Nmap 脚本输出是一个字符串。我们用正则提取 XAddrs。
output = script.get('output', '')
# 匹配 'http://...' 格式的 URL直到遇到空格或换行符
xaddrs = re.findall(r'http://[^\s,]+', output)
device_info["onvif_xaddrs"] = xaddrs
# 只有当我们发现了开放服务时才添加该设备
if device_info["services"]:
results.append(device_info)
return results
def main():
print("--- 启动网络摄像头发现 (基于 Nmap) ---", file=sys.stderr)
# 1. 获取并过滤本地网络
all_local_networks = get_all_local_networks()
if not all_local_networks:
print("错误: 未检测到本地网络用于扫描。退出。", file=sys.stderr)
sys.exit(1)
networks_to_scan = filter_docker_networks(all_local_networks)
if not networks_to_scan:
print("警告: 过滤后没有可扫描的网络。退出。", file=sys.stderr)
sys.exit(0)
print("\n--- 将扫描以下网络 ---", file=sys.stderr)
for net in networks_to_scan:
print(f" - {net}", file=sys.stderr)
# 2. 运行 Nmap 扫描
nmap_json_output = run_nmap_scan(networks_to_scan)
if not nmap_json_output:
print("错误: 未从 Nmap 收到任何输出。退出。", file=sys.stderr)
sys.exit(1)
# 3. 解析 Nmap JSON
final_results = parse_nmap_json(nmap_json_output)
# 4. 将最终结果以 JSON 格式打印到 stdout
# 注意stderr 用于打印日志stdout 仅用于输出最终的 JSON
print(json.dumps(final_results, indent=2))
if __name__ == "__main__":
main()

View File

@ -5,10 +5,10 @@
#include <nlohmann/json.hpp>
#include <fstream>
#include <map>
#include <set>
using json = nlohmann::json;
static ModbusDataType string_to_modbus_data_type(const std::string& type_str) {
static const std::map<std::string, ModbusDataType> type_map = {
{"UINT16", ModbusDataType::UINT16},
@ -26,175 +26,272 @@ static ModbusDataType string_to_modbus_data_type(const std::string& type_str) {
DeviceManager::DeviceManager(boost::asio::io_context& io_context, ReportDataCallback report_cb)
: m_io_context(io_context), m_report_callback(std::move(report_cb)) {}
: m_io_context(io_context), m_report_callback(std::move(report_cb)) {} //
DeviceManager::~DeviceManager() {
stop_all();
stop_all();
}
ModbusRtuDeviceConfig DeviceManager::_parse_rtu_config(const json& dev_json) {
ModbusRtuDeviceConfig config;
config.device_id = dev_json.at("device_id").get<std::string>();
config.port_path = dev_json.at("port_path").get<std::string>();
config.baud_rate = dev_json.at("baud_rate").get<unsigned int>();
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
for (const auto& dp_json : dev_json.at("data_points")) {
config.data_points.push_back({
dp_json.at("name").get<std::string>(),
(uint16_t)dp_json.at("address").get<int>(),
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
dp_json.value("scale", 1.0)
});
}
return config;
}
ModbusTcpDeviceConfig DeviceManager::_parse_tcp_config(const json& dev_json) {
ModbusTcpDeviceConfig config;
config.device_id = dev_json.at("device_id").get<std::string>();
config.ip_address = dev_json.at("ip_address").get<std::string>();
config.port = dev_json.at("port").get<uint16_t>();
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
for (const auto& dp_json : dev_json.at("data_points")) {
config.data_points.push_back({
dp_json.at("name").get<std::string>(),
(uint16_t)dp_json.at("address").get<int>(),
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
dp_json.value("scale", 1.0)
});
}
return config;
}
void DeviceManager::load_and_start(const std::string& config_path) {
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Loading device configuration from '{}'...", config_path);
std::ifstream config_file(config_path);
if (!config_file.is_open()) {
spdlog::critical("Failed to open configuration file: {}", config_path);
throw std::runtime_error("Configuration file not found.");
std::lock_guard<std::mutex> lock(m_mutex); //
spdlog::info("Loading device configuration from '{}'...", config_path); //
std::ifstream config_file(config_path); //
if (!config_file.is_open()) { //
spdlog::critical("Failed to open configuration file: {}", config_path); //
throw std::runtime_error("Configuration file not found."); //
}
try {
json config_json = json::parse(config_file);
json config_json = json::parse(config_file); //
// --- 修改:委托给新的核心逻辑函数 ---
_parse_and_apply_config(config_json);
// --- 结束修改 ---
// --- MODIFIED: 加载 Modbus RTU 设备的逻辑 ---
if (config_json.contains("modbus_rtu_devices")) {
// 1. 先将所有启用的设备按 port_path (总线) 分组
std::map<std::string, std::vector<ModbusRtuDeviceConfig>> rtu_device_groups;
for (const auto& dev_json : config_json["modbus_rtu_devices"]) {
if (!dev_json.value("enabled", false)) continue;
ModbusRtuDeviceConfig config;
config.device_id = dev_json.at("device_id").get<std::string>();
config.port_path = dev_json.at("port_path").get<std::string>();
config.baud_rate = dev_json.at("baud_rate").get<unsigned int>();
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
for (const auto& dp_json : dev_json.at("data_points")) {
config.data_points.push_back({
dp_json.at("name").get<std::string>(),
(uint16_t)dp_json.at("address").get<int>(),
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
dp_json.value("scale", 1.0)
});
}
// 将解析后的配置加入对应的分组
rtu_device_groups[config.port_path].push_back(config);
}
// 2. 为每个分组 (即每个物理串口) 创建一个 BusService
for (const auto& pair : rtu_device_groups) {
const std::string& port_path = pair.first;
const std::vector<ModbusRtuDeviceConfig>& devices_on_bus = pair.second;
auto service = std::make_unique<ModbusRtuBusService>(port_path, devices_on_bus, m_report_callback);
service->start();
m_rtu_bus_services.push_back(std::move(service));
spdlog::info("Started Modbus RTU Bus service for port '{}' with {} device(s).", port_path, devices_on_bus.size());
}
}
// --- 加载 Modbus TCP 设备 (逻辑保持不变) ---
if (config_json.contains("modbus_tcp_devices")) {
for (const auto& dev_json : config_json["modbus_tcp_devices"]) {
if (!dev_json.value("enabled", false)) continue;
ModbusTcpDeviceConfig config;
config.device_id = dev_json.at("device_id").get<std::string>();
config.ip_address = dev_json.at("ip_address").get<std::string>();
config.port = dev_json.at("port").get<uint16_t>();
config.slave_id = dev_json.at("slave_id").get<uint8_t>();
config.poll_interval_ms = dev_json.at("poll_interval_ms").get<int>();
for (const auto& dp_json : dev_json.at("data_points")) {
config.data_points.push_back({
dp_json.at("name").get<std::string>(),
(uint16_t)dp_json.at("address").get<int>(),
string_to_modbus_data_type(dp_json.at("type").get<std::string>()),
dp_json.value("scale", 1.0)
});
}
auto poller = std::make_shared<ModbusMasterPoller>(m_io_context, config, m_report_callback);
poller->start();
m_tcp_pollers.push_back(poller);
spdlog::info("Started Modbus TCP service for device '{}'.", config.device_id);
}
}
} catch (const json::exception& e) {
spdlog::critical("Failed to parse JSON configuration: {}", e.what());
throw;
} catch (const json::exception& e) { //
spdlog::critical("Failed to parse JSON configuration: {}", e.what()); //
throw; //
}
}
void DeviceManager::stop_all() {
void DeviceManager::post_apply_device_configuration(const std::string& json_payload) {
spdlog::debug("Posting config update task to io_context.");
boost::asio::post(m_io_context, [this, json_payload]() {
this->_apply_config_task(json_payload);
});
}
// +++ 新增:在 io_context 线程上执行的私有任务 +++
void DeviceManager::_apply_config_task(std::string json_payload) {
// 现在我们在 io_context 线程上,安全地锁定互斥锁
std::lock_guard<std::mutex> lock(m_mutex);
spdlog::info("Stopping all device services...");
spdlog::info("Applying new device configuration (from MQTT)...");
// 停止所有 RTU 总线服务
for (auto& service : m_rtu_bus_services) {
service->stop();
try {
json config_json = json::parse(json_payload);
_parse_and_apply_config(config_json);
spdlog::info("Successfully applied new device configuration.");
} catch (const json::exception& e) {
spdlog::error("Failed to parse and apply new config: {}", e.what());
}
m_rtu_bus_services.clear();
}
// +++ 新增:核心 "Diff" 逻辑 (私有, 必须持锁调用) +++
void DeviceManager::_parse_and_apply_config(const json& config_json) {
// --- 解析新配置,构建“期望状态” ---
// 停止所有 TCP 轮询服务
for (auto& poller : m_tcp_pollers) {
poller->stop();
// key: port_path, value: 该总线上的所有设备配置
std::map<std::string, std::vector<ModbusRtuDeviceConfig>> new_rtu_groups;
if (config_json.contains("modbus_rtu_devices")) {
for (const auto& dev_json : config_json["modbus_rtu_devices"]) {
if (!dev_json.value("enabled", false)) continue;
auto config = _parse_rtu_config(dev_json);
new_rtu_groups[config.port_path].push_back(config);
}
}
m_tcp_pollers.clear();
// key: device_id, value: 该设备的配置
std::map<std::string, ModbusTcpDeviceConfig> new_tcp_configs;
if (config_json.contains("modbus_tcp_devices")) {
for (const auto& dev_json : config_json["modbus_tcp_devices"]) {
if (!dev_json.value("enabled", false)) continue;
auto config = _parse_tcp_config(dev_json);
new_tcp_configs[config.device_id] = config;
}
}
// --- "Diff" RTU 总线服务 ---
// (我们使用 port_path 作为服务的唯一标识)
// 查找需要停止的 RTU 服务 (存在于当前,但不存在于新配置)
for (auto it = m_rtu_bus_services.begin(); it != m_rtu_bus_services.end(); /* no increment */) {
const std::string& port_path = it->first;
if (new_rtu_groups.find(port_path) == new_rtu_groups.end()) {
spdlog::info("Config update: Stopping RTU service for port '{}'.", port_path);
it->second->stop();
it = m_rtu_bus_services.erase(it); // erase(it) 返回下一个迭代器
} else {
++it;
}
}
// 启动新/更新 RTU 服务
for (const auto& pair : new_rtu_groups) {
const std::string& port_path = pair.first;
const auto& devices_on_bus = pair.second;
auto it = m_rtu_bus_services.find(port_path);
if (it != m_rtu_bus_services.end()) {
// 服务已存在,我们必须重启它以应用新配置
// TODO: 高级实现可以检查配置是否真的改变了
spdlog::info("Config update: Restarting RTU service for port '{}'...", port_path);
it->second->stop();
m_rtu_bus_services.erase(it);
} else {
spdlog::info("Config update: Starting new RTU service for port '{}'...", port_path);
}
auto service = std::make_unique<ModbusRtuBusService>(port_path, devices_on_bus, m_report_callback);
service->start();
m_rtu_bus_services[port_path] = std::move(service);
spdlog::info("Started Modbus RTU Bus service for port '{}' with {} device(s).", port_path, devices_on_bus.size());
}
// --- "Diff" TCP 轮询服务 ---
// (我们使用 device_id 作为服务的唯一标识)
// 查找需要停止的 TCP 服务
for (auto it = m_tcp_pollers.begin(); it != m_tcp_pollers.end(); /* no increment */) {
const std::string& device_id = it->first;
if (new_tcp_configs.find(device_id) == new_tcp_configs.end()) {
spdlog::info("Config update: Stopping TCP poller for device '{}'.", device_id);
it->second->stop();
it = m_tcp_pollers.erase(it);
} else {
++it;
}
}
// 启动新/更新 TCP 服务
for (const auto& pair : new_tcp_configs) {
const std::string& device_id = pair.first;
const auto& config = pair.second;
auto it = m_tcp_pollers.find(device_id);
if (it != m_tcp_pollers.end()) {
spdlog::info("Config update: Restarting TCP poller for device '{}'...", device_id);
it->second->stop();
m_tcp_pollers.erase(it);
} else {
spdlog::info("Config update: Starting new TCP poller for device '{}'...", device_id);
}
auto poller = std::make_shared<ModbusMasterPoller>(m_io_context, config, m_report_callback);
poller->start();
m_tcp_pollers[device_id] = poller;
}
}
void DeviceManager::stop_all() {
std::lock_guard<std::mutex> lock(m_mutex); //
spdlog::info("Stopping all device services..."); //
spdlog::info("All device services stopped.");
// --- 遍历 map ---
for (auto& pair : m_rtu_bus_services) {
pair.second->stop();
}
m_rtu_bus_services.clear(); //
for (auto& pair : m_tcp_pollers) {
pair.second->stop();
}
m_tcp_pollers.clear(); //
spdlog::info("All device services stopped."); //
}
std::vector<DeviceInfo> DeviceManager::get_all_device_info() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_mutex); //
std::vector<DeviceInfo> all_devices;
all_devices.reserve(m_tcp_pollers.size()); // 初始容量可以先按TCP设备数算
// 遍历所有 RTU 总线服务
for (const auto& service : m_rtu_bus_services) {
const auto& configs_on_bus = service->get_all_device_configs();
for (const auto& config : configs_on_bus) {
// --- 历 map ---
for (const auto& pair : m_rtu_bus_services) {
const auto& service = pair.second;
const auto& configs_on_bus = service->get_all_device_configs();
for (const auto& config : configs_on_bus) {
DeviceInfo info;
info.id = config.device_id;
info.type = "ModbusRTU";
info.is_running = service->is_running();
info.connection_details["Port Path"] = config.port_path;
info.connection_details["Baud Rate"] = std::to_string(config.baud_rate);
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
all_devices.push_back(info);
info.id = config.device_id;
info.type = "ModbusRTU";
info.is_running = service->is_running();
info.connection_details["Port Path"] = config.port_path;
info.connection_details["Baud Rate"] = std::to_string(config.baud_rate);
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
all_devices.push_back(info);
}
}
// 遍历 TCP 设备 (逻辑不变)
for (const auto& poller : m_tcp_pollers) {
const auto& config = poller->get_config();
for (const auto& pair : m_tcp_pollers) {
const auto& poller = pair.second;
const auto& config = poller->get_config();
DeviceInfo info;
info.id = config.device_id;
info.type = "ModbusTCP";
info.is_running = poller->is_running();
info.connection_details["IP Address"] = config.ip_address;
info.connection_details["Port"] = std::to_string(config.port);
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
all_devices.push_back(info);
info.id = config.device_id;
info.type = "ModbusTCP";
info.is_running = poller->is_running();
info.connection_details["IP Address"] = config.ip_address;
info.connection_details["Port"] = std::to_string(config.port);
info.connection_details["Slave ID"] = std::to_string(config.slave_id);
all_devices.push_back(info);
}
return all_devices;
return all_devices;
}
bool DeviceManager::send_control_command(const std::string& device_id, uint16_t address, uint16_t value) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_mutex); //
// 首先,在 TCP poller 列表中查找
for (const auto& poller : m_tcp_pollers) {
if (poller->get_config().device_id == device_id) {
spdlog::info("Found TCP device '{}'. Dispatching write command to address {}.", device_id, address);
poller->write_single_register(address, value);
return true;
// --- 使用 map::find 进行 O(logN) 查找 ---
auto tcp_it = m_tcp_pollers.find(device_id);
if (tcp_it != m_tcp_pollers.end()) {
spdlog::info("Found TCP device '{}'. Dispatching write command to address {}.", device_id, address);
tcp_it->second->write_single_register(address, value);
return true;
}
// RTU 查找逻辑保持不变
for (const auto& pair : m_rtu_bus_services) {
const auto& service = pair.second;
if (service->manages_device(device_id)) { //
spdlog::info("Found RTU device '{}' on a bus. Dispatching write command.", device_id); //
service->write_single_register(device_id, address, value); //
return true; //
}
}
// 然后,在 RTU bus service 列表中查找哪个服务管理着该设备
for (const auto& service : m_rtu_bus_services) {
if (service->manages_device(device_id)) {
spdlog::info("Found RTU device '{}' on a bus. Dispatching write command.", device_id);
service->write_single_register(device_id, address, value);
return true;
}
}
spdlog::warn("send_control_command failed: Device with ID '{}' not found in any service.", device_id);
return false;
spdlog::warn("send_control_command failed: Device with ID '{}' not found in any service.", device_id); //
return false; //
}

View File

@ -10,18 +10,17 @@
#include <memory>
#include <string>
#include <mutex>
#include <map>
#include <nlohmann/json.hpp> //
/**
* @brief API层传递设备信息的统一结构体
*/
struct DeviceInfo {
// ... (结构体保持不变) ...
std::string id;
std::string type; // 例如 "ModbusRTU", "ModbusTCP"
std::string type;
bool is_running;
// 使用 map 存储连接相关的详细信息,增加灵活性
// 例如: {"IP Address": "192.168.1.10", "Port": "502"}
// 或 {"Port Path": "/dev/ttyS0", "Baud Rate": "9600"}
std::map<std::string, std::string> connection_details;
};
@ -43,17 +42,25 @@ public:
* @brief JSON配置文件加载所有设备并启动服务
* @param config_path JSON配置文件的路径
*/
void load_and_start(const std::string& config_path);
void load_and_start(const std::string& config_path); //
/**
* @brief (线) MQTT 线
* post io_context 线
* @param json_payload JSON
*/
void post_apply_device_configuration(const std::string& json_payload);
/**
* @brief
* @return vector
*/
std::vector<DeviceInfo> get_all_device_info() const;
std::vector<DeviceInfo> get_all_device_info() const; //
/**
* @brief
*/
void stop_all();
void stop_all(); //
/**
* @brief Modbus设备发送一个写单个寄存器的命令
@ -62,17 +69,41 @@ public:
* @param value
* @return truefalse
*/
bool send_control_command(const std::string& device_id, uint16_t address, uint16_t value);
bool send_control_command(const std::string& device_id, uint16_t address, uint16_t value); //
private:
boost::asio::io_context& m_io_context;
ReportDataCallback m_report_callback;
/**
* @brief (, io_context 线)
*
*/
void _apply_config_task(std::string json_payload);
// 用于存储正在运行的服务实例,以管理其生命周期
std::vector<std::unique_ptr<ModbusRtuBusService>> m_rtu_bus_services;
std::vector<std::shared_ptr<ModbusMasterPoller>> m_tcp_pollers;
/**
* @brief (, )
* "Diff"
*/
void _parse_and_apply_config(const nlohmann::json& config_json);
mutable std::mutex m_mutex;
/**
* @brief (, ) Modbus RTU
*/
ModbusRtuDeviceConfig _parse_rtu_config(const nlohmann::json& dev_json);
/**
* @brief (, ) Modbus TCP
*/
ModbusTcpDeviceConfig _parse_tcp_config(const nlohmann::json& dev_json);
private:
boost::asio::io_context& m_io_context; //
ReportDataCallback m_report_callback; //
// key: port_path (e.g., /dev/ttyS0)
std::map<std::string, std::unique_ptr<ModbusRtuBusService>> m_rtu_bus_services;
// key: device_id
std::map<std::string, std::shared_ptr<ModbusMasterPoller>> m_tcp_pollers;
mutable std::mutex m_mutex; //
};
#endif // DEVICE_MANAGER_H

View File

@ -18,7 +18,6 @@
#include <iostream>
#include <functional>
// 用于 ASIO 服务的全局 io_context
boost::asio::io_context g_io_context;
/**
@ -87,7 +86,7 @@ int main(int argc, char* argv[]) {
};
DeviceManager device_manager(g_io_context, report_to_mqtt);
MqttRouter mqtt_router(mqtt_client, device_manager);
MqttRouter mqtt_router(mqtt_client, device_manager, "../config/devices.json");
std::vector<uint16_t> listen_ports = { 12345 };
TCPServer tcp_server(g_io_context, listen_ports, mqtt_client);
SystemMonitor::SystemMonitor monitor;

View File

@ -2,50 +2,112 @@
#include "command_handler.h"
#include "spdlog/spdlog.h"
#include "vendor/nlohmann/json.hpp"
#include "utils/mqtt_topic_matcher.h"
#include <fstream>
using json = nlohmann::json;
// 构造函数实现更新
CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager)
: m_client(client), m_device_manager(deviceManager) {}
CommandHandler::CommandHandler(MqttClient& client, DeviceManager& deviceManager, std::string config_path)
: m_client(client),
m_device_manager(deviceManager),
m_config_file_path(std::move(config_path))
{
spdlog::info("CommandHandler initialized, will persist config to: {}", m_config_file_path);
}
void CommandHandler::handle(mqtt::const_message_ptr msg) {
const std::string topic = msg->get_topic();
const std::string payload = msg->get_payload_str();
spdlog::info("CommandHandler received a command on topic '{}'", topic);
try {
auto cmd_json = json::parse(payload);
const std::string command_filter = "commands/my-edge-proxy-device-01/#";
const std::string proxy_control_filter = "proxy/control/my-edge-proxy-device-01/config/update";
const std::string ack_topic = "proxy/control/my-edge-proxy-device-01/config/ack";
// 校验命令格式
if (!cmd_json.contains("deviceId") || !cmd_json.contains("address") || !cmd_json.contains("value")) {
spdlog::warn("Command JSON from topic '{}' is missing required fields.", topic);
if (MqttUtils::topic_matches(proxy_control_filter, topic)) {
spdlog::info("CommandHandler received proxy config update on topic '{}'", topic);
if (payload.empty()) {
spdlog::warn("Config update payload is empty. Ignoring.");
m_client.publish(ack_topic, "{\"status\":\"error\",\"reason\":\"payload empty\"}");
return;
}
// 解析命令
std::string device_id = cmd_json.at("deviceId").get<std::string>();
uint16_t address = cmd_json.at("address").get<uint16_t>();
uint16_t value = cmd_json.at("value").get<uint16_t>();
try {
// 步骤 1: 尝试将新配置写入磁盘文件
std::ofstream config_file(m_config_file_path);
if (!config_file.is_open()) {
spdlog::error("Failed to open config file for writing: {}", m_config_file_path);
m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"failed to write file\"}");
return;
}
// 验证JSON合法性
try {
[[maybe_unused]] auto parsed_json = json::parse(payload);
} catch (const json::exception& e) {
spdlog::error("Failed to parse new config JSON: {}", e.what());
m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"invalid json\"}");
return;
}
// 通过 DeviceManager 发送控制指令
bool success = m_device_manager.send_control_command(device_id, address, value);
config_file << payload;
config_file.close();
spdlog::info("Successfully persisted new config to '{}'.", m_config_file_path);
// (可选) 将执行结果反馈到 MQTT形成闭环
json response_json;
response_json["success"] = success;
response_json["original_command"] = cmd_json;
std::string response_topic = "proxy/command/result/" + device_id;
m_client.publish(response_topic, response_json.dump());
if (success) {
spdlog::info("Successfully dispatched command to device '{}'.", device_id);
} else {
spdlog::error("Failed to dispatch command to device '{}'.", device_id);
// 步骤 2: 只有在写入成功后,才应用到运行时
m_device_manager.post_apply_device_configuration(payload);
m_client.publish(ack_topic, "{\"status\":\"success\"}");
} catch (const std::exception& e) {
spdlog::error("Exception while persisting config: {}", e.what());
m_client.publish(ack_topic, "{\"status\":\"error\", \"reason\":\"exception\"}");
}
} catch (const json::exception& e) {
spdlog::error("Failed to parse command JSON from topic '{}': {}", topic, e.what());
// 调用 DeviceManager 的新方法
// 这个方法必须是线程安全的,它会将实际的更新操作 post 到 g_io_context
m_device_manager.post_apply_device_configuration(payload);
m_client.publish("proxy/control/my-edge-proxy-device-01/config/ack", "{\"status\":\"received\"}");
}
else if (MqttUtils::topic_matches(command_filter, topic)) {
spdlog::info("CommandHandler received device command on topic '{}'", topic);
try {
auto cmd_json = json::parse(payload);
// 校验命令格式
if (!cmd_json.contains("deviceId") || !cmd_json.contains("address") || !cmd_json.contains("value")) {
spdlog::warn("Command JSON from topic '{}' is missing required fields.", topic);
return;
}
std::string device_id = cmd_json.at("deviceId").get<std::string>();
uint16_t address = cmd_json.at("address").get<uint16_t>();
uint16_t value = cmd_json.at("value").get<uint16_t>();
bool success = m_device_manager.send_control_command(device_id, address, value);
json response_json;
response_json["success"] = success;
response_json["original_command"] = cmd_json;
std::string response_topic = "proxy/command/result/" + device_id;
m_client.publish(response_topic, response_json.dump());
if (success) {
spdlog::info("Successfully dispatched command to device '{}'.", device_id);
} else {
spdlog::error("Failed to dispatch command to device '{}'.", device_id);
}
} catch (const json::exception& e) {
spdlog::error("Failed to parse command JSON from topic '{}': {}", topic, e.what());
}
}
else {
spdlog::warn("CommandHandler received message on unhandled topic: {}", topic);
}
}

View File

@ -5,9 +5,10 @@
class CommandHandler {
public:
explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager);
explicit CommandHandler(MqttClient& client, DeviceManager& deviceManager, std::string config_path);
void handle(mqtt::const_message_ptr msg);
private:
MqttClient& m_client;
DeviceManager& m_device_manager;
std::string m_config_file_path;
};

View File

@ -2,9 +2,14 @@
#include "spdlog/spdlog.h"
#include "utils/mqtt_topic_matcher.h"
MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager) : m_client(client) {
MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::string config_path)
: m_client(client),
m_config_file_path(std::move(config_path)) // +++ 新增:初始化路径
{
m_data_handler = std::make_unique<DataHandler>(m_client);
m_command_handler = std::make_unique<CommandHandler>(m_client, deviceManager);
// +++ 修改:将 config_path 传递给 CommandHandler +++
m_command_handler = std::make_unique<CommandHandler>(m_client, deviceManager, m_config_file_path);
m_client.set_message_callback([this](mqtt::const_message_ptr msg) {
this->on_message_arrived(std::move(msg));
@ -15,20 +20,22 @@ MqttRouter::MqttRouter(MqttClient& client, DeviceManager& deviceManager) : m_cli
void MqttRouter::start() {
m_client.subscribe("devices/+/data");
m_client.subscribe("commands/my-edge-proxy-device-01/#");
m_client.subscribe("proxy/control/my-edge-proxy-device-01/config/update");
spdlog::info("MqttRouter has subscribed to all topics.");
}
void MqttRouter::on_message_arrived(mqtt::const_message_ptr msg) {
const auto& topic = msg->get_topic();
// 定义我们的主题过滤器
const std::string data_filter = "devices/+/data";
const std::string command_filter = "commands/my-edge-proxy-device-01/#";
const std::string proxy_control_filter = "proxy/control/my-edge-proxy-device-01/config/update";
if (MqttUtils::topic_matches(data_filter, topic)) {
m_data_handler->handle(msg);
}
else if (MqttUtils::topic_matches(command_filter, topic)) {
else if (MqttUtils::topic_matches(command_filter, topic) ||
MqttUtils::topic_matches(proxy_control_filter, topic)) {
m_command_handler->handle(msg);
}

View File

@ -6,15 +6,16 @@
class MqttRouter {
public:
// 构造函数现在接收 MqttClient 和所有的处理器
MqttRouter(MqttClient& client, DeviceManager& deviceManager);
MqttRouter(MqttClient& client, DeviceManager& deviceManager, std::string config_path);
void start();
private:
void on_message_arrived(mqtt::const_message_ptr msg);
MqttClient& m_client;
// 使用智能指针持有处理器实例
std::unique_ptr<DataHandler> m_data_handler;
std::unique_ptr<CommandHandler> m_command_handler;
std::string m_config_file_path;
};

View File

@ -4,7 +4,6 @@
namespace MqttUtils {
// 一个内部辅助函数,用于按 '/' 分割字符串
std::vector<std::string> split(const std::string& s, char delimiter) {
std::vector<std::string> tokens;
std::string token;
@ -28,7 +27,6 @@ bool topic_matches(const std::string& filter, const std::string& topic) {
for (size_t i = 0; i < filter_len; ++i) {
if (i >= topic_len) {
// 如果过滤器比主题长,只有当最后一个是 '#' 时才可能匹配
return (i == filter_len - 1 && filter_levels[i] == "#");
}
@ -36,17 +34,13 @@ bool topic_matches(const std::string& filter, const std::string& topic) {
const std::string& t_level = topic_levels[i];
if (f_level == "#") {
// '#' 匹配剩余所有层级
return true;
}
if (f_level != "+" && f_level != t_level) {
// 如果不是 '+' 且不相等,则不匹配
return false;
}
}
// 如果过滤器所有层级都匹配完了,主题不能比过滤器更长
return topic_len == filter_len;
}
} // namespace MqttUtils

View File

@ -1,346 +0,0 @@
感谢您提供 `Dockerfile` 和 `docker-compose.yml` 文件。这是极其宝贵的信息,它让我能将之前的 C++ 架构蓝图,完美适配到您**已经高度定制化**的 RK3588 开发环境中。
[cite\_start]您提供的 `Dockerfile` 非常出色。它已经正确处理了最复杂的部分:通过 `PPA` [cite: 1, 2] [cite\_start]安装了特定于 Rockchip 的 GStreamer 插件 (`gstreamer1.0-rockchip` [cite: 2][cite\_start]) 以及 MPP/RGA 的**C++ 开发库** (`librockchip-mpp-dev`, `librga-dev` [cite: 2])。
您的 `docker-compose.yml` 也配置正确,通过 `devices` 和 `group_add` 将 NPU/VPU/RGA 硬件能力 成功映射到了容器内。
我们的架构决策保持不变:**将流媒体服务作为独立的 C++ 进程**。这可以确保它在崩溃时(例如 C++ 的段错误)**绝对不会**影响您现有的 Modbus/MQTT 核心服务。
基于您的环境,我为您制定了以下**高度定制**的开发计划。
-----
### A. 核心架构规划 (基于您的 Docker 环境)
1. **独立进程:** 我们将开发一个全新的 C++ 可执行文件(例如 `edge-streamer-cpp`)。
2. [cite\_start]**共享容器:** 这个新进程将与您现有的Modbus/MQTTC++ 服务**运行在同一个 `edge-proxy-dev` 容器中**。这允许它们共享所有硬件资源 和已安装的库 [cite: 2],同时保持进程级的故障隔离。
3. **开发工作流:** 您将在主机上编写代码(位于 `docker-compose.yml` 同级的目录中),代码会通过 `volumes` 自动同步到容器的 `/app` 目录。您将在容器内执行所有编译和运行操作。
4. **服务间通信 (IPC) 优化:**
* **控制C++ -\> C++** 您现有的服务Modbus将通过 **HTTP**(例如 `http://localhost:8001/api/start`)来控制新的流媒体服务。
* [cite\_start]**AI 结果C++ -\> MQTT** 新的流媒体服务在获得 AI 结果后,将使用 `paho.mqtt.cpp` 库(您的 `Dockerfile` 已经编译了它 [cite: 8])将 JSON 结果直接发布到 `docker-compose.yml` 中定义的 `mqtt-broker` 服务。这是最高效、最解耦的方案。
-----
### B. 关键发现:环境依赖检查 (RKNN)
[cite\_start]我发现了一个关键点:您的 `Dockerfile` 安装了 GStreamer 和 VPU/MPP 的开发库 (`librockchip-mpp-dev` [cite: 2]),这对于**视频编解码**是完美的。
但是,它**缺失了 AI 检测所需的 NPU (RKNN) C-API 开发库**(即 `rknn_api.h` 和 `librknnrt.so`)。
我的计划将基于您需要 NPU 加速 AI 检测的前提。因此,**我们的第一步必须是**将这些缺失的库添加到您的 Docker 镜像中。
-----
### C. 阶段一:完善您的 Docker 环境 (添加 RKNN, HTTP, JSON)
**目标:** 将缺失的 NPU C-API、C++ HTTP 库和 C++ JSON 库添加到您的 `edge-proxy-dev` 镜像中。
**步骤:**
1. **准备 RKNN SDK (在主机上)**
* 在您的主机项目目录(`docker-compose.yml` 所在的目录)下,创建一个新目录,例如 `docker/rknn_sdk/`。
* 从您的 RK3588 SDK 中,复制 `include/rknn_api.h` 到 `docker/rknn_sdk/include/rknn_api.h`。
* 复制 `lib/librknnrt.so` 到 `docker/rknn_sdk/lib/librknnrt.so`。
2. **修改您的 `docker/Dockerfile`**
* [cite\_start]找到构建 `paho.mqtt.cpp` [cite: 8] 的 `RUN` 指令块。
* [cite\_start]在 `cmake --build build --target install && \` [cite: 9] [cite\_start]之后,`rm -rf /tmp/build-context` [cite: 9] 之前,插入以下代码:
<!-- end list -->
```dockerfile
# ... (paho.mqtt.cpp 的 cmake install)
cmake --build build --target install && \
# --- 规划师建议:添加 RKNN, HTTP, JSON 库 ---
# 1. 复制 RKNN C-API (NPU 库) (假设已按步骤1放置)
# 注意COPY 指令的源路径是相对于 docker-compose.yml 的 context
COPY docker/rknn_sdk/include/rknn_api.h /usr/local/include/
COPY docker/rknn_sdk/lib/librknnrt.so /usr/local/lib/
# 2. 安装 C++ HTTP 和 JSON 的 header-only 库
# (需要先安装 curl)
apt-get update && apt-get install -y --no-install-recommends curl && \
# C++ HTTP Lib
curl -L https://raw.githubusercontent.com/yhirose/cpp-httplib/master/httplib.h -o /usr/local/include/httplib.h && \
# C++ JSON Lib
curl -L https://github.com/nlohmann/json/releases/latest/download/json.hpp -o /usr/local/include/json.hpp && \
# 3. 更新动态链接库缓存 (使系统找到 librknnrt.so)
ldconfig && \
# 4. 清理 apt 缓存
apt-get remove -y curl && \
apt-get autoremove -y && \
# (Dockerfile 原有的清理命令)
rm -rf /tmp/build-context
# (Dockerfile 的剩余部分)
RUN rm -rf /var/lib/apt/lists/*
# ...
```
3. **重建 Docker 镜像 (关键步骤)**
* 在主机的终端中执行:
<!-- end list -->
```bash
docker-compose build edge-proxy-dev
```
-----
### D. 阶段二:开发工作流程与项目设置
1. **启动开发环境:**
```bash
docker-compose up -d
```
* 这将启动 `edge-proxy-dev` 和 `mqtt-broker` 两个服务。
2. **创建新服务目录 (在主机上)**
* 在您的项目根目录(`docker-compose.yml` 所在位置)创建一个新目录:
<!-- end list -->
```bash
mkdir edge-streamer-cpp
```
3. **进入容器的开发 Shell**
```bash
docker-compose exec edge-proxy-dev /bin/bash
```
4. **在容器内初始化项目骨架:**
* **注意:** 以下所有命令均在 **容器内的 shell** (`/app` 目录) 中执行。
<!-- end list -->
```bash
# (容器内)
# /app 目录是您主机项目的挂载点
cd /app/edge-streamer-cpp
# 创建源码和构建目录
mkdir src include build
# 创建初始文件 (您将在主机上编辑它们)
touch src/main.cpp
touch src/StreamManager.cpp
touch include/StreamManager.h
touch CMakeLists.txt
```
5. **开始编码:**
* 现在,在您的**主机**上,使用您喜欢的 IDE (如 VS Code) 打开 `edge-streamer-cpp` 目录,开始编辑 `CMakeLists.txt` 和 `.cpp` / `.h` 文件。
-----
### E. 阶段三:配置 CMakeLists.txt
[cite\_start]**目标:** 配置 CMake使其能正确链接 GStreamer [cite: 2][cite\_start]、RKNN (阶段 C 添加的)、Paho MQTT [cite: 7, 8] 和 HTTP/JSON 库。
* 将以下内容粘贴到您在**主机**上打开的 `edge-streamer-cpp/CMakeLists.txt` 文件中:
```cmake
cmake_minimum_required(VERSION 3.10)
project(EdgeStreamer CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# [cite_start]--- 1. 查找 GStreamer (来自 Dockerfile) [cite: 2] ---
find_package(PkgConfig REQUIRED)
pkg_check_modules(GST REQUIRED gstreamer-1.0 gstreamer-app-1.0)
# [cite_start]--- 2. 查找 Paho MQTT (来自 Dockerfile) [cite: 7, 8] ---
# 您的 Dockerfile 编译并安装了它,我们可以直接 find_package
find_package(paho-mqttpp3 REQUIRED) # C++ 库
find_package(paho-mqtt-c REQUIRED) # C 库 (依赖)
# --- 3. 包含 RKNN, HTTP, JSON (来自 阶段C) ---
# 这些头文件已在 /usr/local/include会自动被 C++ 编译器找到
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}/include
${GST_INCLUDE_DIRS}
/usr/local/include
)
# --- 4. 定义可执行文件 ---
add_executable(edge-streamer
src/main.cpp
src/StreamManager.cpp
)
# --- 5. 链接所有库 ---
target_link_libraries(edge-streamer
PRIVATE
# GStreamer
${GST_LIBRARIES}
# Paho MQTT (C++ 和 C)
paho-mqttpp3
paho-mqtt-c
# RKNN (来自 librknnrt.so)
rknnrt
# 线程 (GStreamer, httplib, Paho 都需要)
pthread
# [cite_start]Paho-MQTT SSL 依赖 [cite: 9] [cite_start](Dockerfile 有 libssl-dev [cite: 2])
ssl
crypto
)
```
-----
### F. 阶段四C++ 核心代码实现 (IPC 调整)
您在 `src/` 和 `include/` 中的代码逻辑与我上一个计划C++ 核心版)非常相似,但**IPC 部分将进行关键优化**
* **`main.cpp`**
* 完全不变。使用 `httplib.h` 在 `8001` 端口(或您选择的端口)启动一个 HTTP 服务,用于接收您现有 Modbus 服务的 "Start/Stop Stream" 命令。
* **`include/StreamManager.h`**
* 需要添加 `paho.mqtt.cpp` 的头文件和成员变量:
<!-- end list -->
```cpp
#include <mqtt/async_client.h> // Paho MQTT C++
// ...
class StreamManager {
// ...
private:
// ... (GstElement* 等)
// --- 新增 MQTT 客户端 ---
const std::string m_mqtt_server_address = "tcp://mqtt-broker:1883"; //
const std::string m_mqtt_client_id = "edge-streamer-ai";
mqtt::async_client m_mqtt_client;
};
```
* **`src/StreamManager.cpp` (关键的 AI 回调调整)**
* 在 `StreamManager` 的构造函数或初始化方法中,连接到 MQTT Broker。
* 在 GStreamer 的 `on_new_sample_from_sink` 回调函数中,当 RKNN 推理完成后:
<!-- end list -->
```cpp
// (伪代码)
// 静态 GStreamer 回调函数
static GstFlowReturn on_new_sample_from_sink(GstAppSink *sink, gpointer user_data) {
StreamManager *manager = static_cast<StreamManager*>(user_data);
// ... (1. 拉取 GstBuffer) ...
// ... (2. 调用 RKNN C-API 进行推理) ...
// rknn_outputs_get(ctx, 1, outputs, NULL);
// ... (3. (TODO) 将 'outputs' 格式化为 JSON 字符串) ...
// nlohmann::json ai_result;
// ai_result["stream_id"] = manager->get_stream_id();
// ai_result["object_count"] = ...;
// std::string payload = ai_result.dump();
// 4. (优化) 将 JSON 结果发布到内部 MQTT Broker
try {
std::string topic = "ai/results/" + manager->get_stream_id();
manager->get_mqtt_client().publish(topic, payload);
} catch (const mqtt::exception& exc) {
std::cerr << "Error publishing to MQTT: " << exc.what() << std::endl;
}
// ... (5. 释放 GstBuffer 和 RKNN outputs) ...
return GST_FLOW_OK;
}
```
-----
### G. 阶段五:编译与运行 (容器内)
在您**主机**上编写完代码后,回到您**容器内的 shell** (`docker-compose exec ...` 的那个)。
1. **编译项目 (在容器内)**
```bash
# (容器内)
cd /app/edge-streamer-cpp/build
# 运行 CMake (仅需一次)
cmake ..
# 编译 (每次代码变更后执行)
make -j$(nproc)
```
2. **运行新服务 (在容器内)**
```bash
# (容器内)
# 编译好的可执行文件位于 build/ 目录
./edge-streamer
# 您应该会看到 HTTP API 和 GStreamer 启动的日志
```
3. **测试 (在 *第二个* 容器 Shell 中)**
* 在**主机**上打开一个**新的**终端,再次 `exec` 进同一个容器:
<!-- end list -->
```bash
docker-compose exec edge-proxy-dev /bin/bash
```
* **A. 测试 API (控制)**
```bash
# (第二个容器内)
# (需要先 apt-get install curl
curl -X POST http://localhost:8001/api/v1/stream/start \
-d '{"stream_id":"cam1", "rtsp_url":"rtsp://..."}'
```
* **B. 监听 AI 结果 (MQTT)**
```bash
# (第二个容器内)
# (需要先 apt-get install mosquitto-clients)
mosquitto_sub -h mqtt-broker -t "ai/results/#" -v
```
* 您现在应该能在一个终端看到服务日志,在另一个终端看到 AI 推理结果。
-----
### H. 阶段六:生产部署 (可选)
您当前的 `docker-compose.yml` 使用 `command: sleep infinity`,这非常适合开发。
当您准备部署时,您需要一个进程管理器来同时启动和监控您的**两个 C++ 服务**Modbus服务 和 `edge-streamer`服务)。
**推荐方案:**
1. 修改 `Dockerfile` 以安装 `supervisor` (`apt-get install -y supervisor`)。
2. 创建一个 `supervisord.conf` 文件,配置 `[program:modbus_service]` 和 `[program:streamer_service]`。
3. 修改 `docker-compose.yml`,将 `command:` 更改为 `"/usr/bin/supervisord -c /etc/supervisor/supervisord.conf"`。
这将确保两个独立的服务都在容器启动时自动运行,并能在崩溃时自动重启。