废弃 ChannelManager 类,改用 SocketConnectionManager 处理 Socket 连接,优化指令发送和在线状态检查逻辑

This commit is contained in:
syruan 2025-12-24 09:07:17 +08:00
parent 25b11bbe04
commit 1a7aa385f1
5 changed files with 124 additions and 46 deletions

View File

@ -9,6 +9,8 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.bonus.sgzb.material.utils.JTT808MessageProcessor;
import com.bonus.sgzb.material.utils.SocketConnectionManager;
import com.bonus.sgzb.material.service.IJtt808DataService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -40,6 +42,12 @@ public class JTT808ServerConfig {
@Autowired
private JTT808MessageProcessor messageProcessor;
@Autowired
private SocketConnectionManager socketConnectionManager;
@Autowired
private IJtt808DataService jtt808DataService;
private ServerSocket serverSocket;
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicInteger activeConnections = new AtomicInteger(0);
@ -177,6 +185,7 @@ public class JTT808ServerConfig {
private final Socket clientSocket;
private final int connectionId;
private final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private String terminalPhoneNumber = null; // 记录终端手机号
public ClientHandler(Socket clientSocket, int connectionId) {
this.clientSocket = clientSocket;
@ -211,6 +220,23 @@ public class JTT808ServerConfig {
// 减少活跃连接计数
int remaining = activeConnections.decrementAndGet();
// 从连接管理器中移除连接
String phoneNumber = socketConnectionManager.getPhoneNumberBySocket(clientSocket);
if (phoneNumber != null) {
boolean removed = socketConnectionManager.removeConnection(phoneNumber);
if (removed) {
logger.info("✅ 终端 {} 连接已从连接管理器移除", phoneNumber);
// 更新数据库中的离线状态
try {
jtt808DataService.updateTerminalOnlineStatus(phoneNumber, false, clientAddress);
logger.info("✅ 终端 {} 离线状态已更新到数据库", phoneNumber);
} catch (Exception e) {
logger.error("更新终端离线状态异常: {}", e.getMessage());
}
}
}
try {
clientSocket.close();
logger.info("连接 #{} 已关闭 (剩余活跃连接: {}): {}", connectionId, remaining, clientAddress);
@ -236,7 +262,7 @@ public class JTT808ServerConfig {
// 使用消息处理器解析和处理消息
byte[] responseData = messageProcessor.processMessage(
Arrays.copyOf(data, length), clientAddress);
Arrays.copyOf(data, length), clientAddress, clientSocket);
// 发送应答消息
if (responseData != null) {

View File

@ -211,7 +211,8 @@ public class Jtt808DataController {
return AjaxResult.error(result.get("message").toString()).put("data", result);
}
} catch (Exception e) {
return AjaxResult.error("发送指令失败: " + e.getMessage());
System.err.println("指令下发失败:" + e.getMessage());
throw e;
}
}
@ -228,7 +229,8 @@ public class Jtt808DataController {
Map<String, Object> result = jtt808CommandService.sendTextCommandBatch(phoneList, command);
return AjaxResult.success(result);
} catch (Exception e) {
return AjaxResult.error("批量发送指令失败: " + e.getMessage());
System.err.println("批量发送指令失败: " + e.getMessage());
throw e;
}
}
@ -246,7 +248,8 @@ public class Jtt808DataController {
result.put("status", online ? "在线" : "离线");
return AjaxResult.success(result);
} catch (Exception e) {
return AjaxResult.error("查询在线状态失败: " + e.getMessage());
System.err.println("查询在线状态失败: " + e.getMessage());
throw e;
}
}
@ -271,7 +274,8 @@ public class Jtt808DataController {
return AjaxResult.success(results);
} catch (Exception e) {
return AjaxResult.error("批量查询在线状态失败: " + e.getMessage());
System.err.println("批量查询在线状态失败: " + e.getMessage());
throw e;
}
}
}

View File

@ -2,11 +2,9 @@ package com.bonus.sgzb.material.service.impl;
import com.bonus.sgzb.material.service.IJtt808CommandService;
import com.bonus.sgzb.material.service.IJtt808DataService;
import com.bonus.sgzb.material.utils.ChannelManager;
import com.bonus.sgzb.material.utils.SocketConnectionManager;
import com.bonus.sgzb.material.utils.JTT808MessageBuilder;
import com.bonus.sgzb.material.domain.Jtt808Terminal;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -16,28 +14,28 @@ import java.util.*;
/**
* JTT808指令下发服务实现
*
*
* @author system
* @date 2025-12-19
*/
@Service
public class Jtt808CommandServiceImpl implements IJtt808CommandService {
private static final Logger logger = LoggerFactory.getLogger(Jtt808CommandServiceImpl.class);
@Autowired
private ChannelManager channelManager;
private SocketConnectionManager socketConnectionManager;
@Autowired
private IJtt808DataService jtt808DataService;
@Autowired
private JTT808MessageBuilder messageBuilder;
@Override
public Map<String, Object> sendTextCommand(String phoneNumber, String command) {
Map<String, Object> result = new HashMap<>();
try {
// 检查终端是否存在
Jtt808Terminal terminal = jtt808DataService.getTerminalByPhone(phoneNumber);
@ -46,16 +44,15 @@ public class Jtt808CommandServiceImpl implements IJtt808CommandService {
result.put("message", "终端不存在: " + phoneNumber);
return result;
}
// 检查终端是否在线
Channel channel = channelManager.get(phoneNumber);
if (channel == null || !channel.isActive()) {
// 检查终端是否在线使用 SocketConnectionManager
if (!socketConnectionManager.isOnline(phoneNumber)) {
result.put("success", false);
result.put("message", "终端离线: " + phoneNumber);
result.put("terminalInfo", terminal);
return result;
}
// 构造文本信息下发消息0x8300
byte[] message = messageBuilder.buildTextMessage(phoneNumber, command);
if (message == null) {
@ -63,15 +60,15 @@ public class Jtt808CommandServiceImpl implements IJtt808CommandService {
result.put("message", "构造指令消息失败");
return result;
}
// 发送消息
logger.info("向终端 {} 发送文本指令: {}", phoneNumber, command);
logger.debug("消息内容: {}", bytesToHex(message));
ChannelFuture future = channel.writeAndFlush(message);
future.await(5000); // 等待5秒
if (future.isSuccess()) {
// 使用 SocketConnectionManager 发送数据
boolean sent = socketConnectionManager.sendData(phoneNumber, message);
if (sent) {
result.put("success", true);
result.put("message", "指令发送成功");
result.put("phoneNumber", phoneNumber);
@ -80,8 +77,8 @@ public class Jtt808CommandServiceImpl implements IJtt808CommandService {
logger.info("✅ 指令发送成功: {} -> {}", phoneNumber, command);
} else {
result.put("success", false);
result.put("message", "指令发送失败: " + future.cause());
logger.error("❌ 指令发送失败: {} -> {}, 原因: {}", phoneNumber, command, future.cause());
result.put("message", "指令发送失败");
logger.error("❌ 指令发送失败: {} -> {}", phoneNumber, command);
}
} catch (Exception e) {
@ -145,13 +142,19 @@ public class Jtt808CommandServiceImpl implements IJtt808CommandService {
@Override
public boolean isTerminalOnline(String phoneNumber) {
Channel channel = channelManager.get(phoneNumber);
return channel != null && channel.isActive();
// 从数据库查询终端在线状态
Jtt808Terminal terminal = jtt808DataService.getTerminalByPhone(phoneNumber);
if (terminal == null) {
return false;
}
// onlineStatus: 1=在线, 0=离线
return terminal.getOnlineStatus() != null && terminal.getOnlineStatus() == 1;
}
@Override
public Object getTerminalChannel(String phoneNumber) {
return channelManager.get(phoneNumber);
// 返回 Socket 连接
return socketConnectionManager.getConnection(phoneNumber);
}
/**

View File

@ -16,7 +16,12 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* @author 马三炮
* @date 2025/6/5
*
* @deprecated 此类已废弃不再使用
* 原因项目使用原生 Socket 服务器而非 Netty 框架
* 请使用 {@link SocketConnectionManager} 代替
*/
@Deprecated
@Slf4j
@Component
public class ChannelManager {
@ -50,11 +55,23 @@ public class ChannelManager {
}
public boolean remove(String terminalPhone) {
return channelGroup.remove(channelIdMap.remove(terminalPhone));
ChannelId channelId = channelIdMap.remove(terminalPhone);
if (channelId == null) {
return false;
}
Channel channel = channelGroup.find(channelId);
if (channel == null) {
return false;
}
return channelGroup.remove(channel);
}
public Channel get(String terminalPhone) {
return channelGroup.find(channelIdMap.get(terminalPhone));
ChannelId channelId = channelIdMap.get(terminalPhone);
if (channelId == null) {
return null;
}
return channelGroup.find(channelId);
}
public ChannelGroup getChannelGroup() {

View File

@ -10,6 +10,7 @@ import org.springframework.stereotype.Component;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
@ -30,6 +31,9 @@ public class JTT808MessageProcessor {
@Autowired
private IGeofenceService geofenceService;
@Autowired
private SocketConnectionManager socketConnectionManager;
// JTT808消息ID常量
private static final int MSG_TERMINAL_REGISTER = 0x0100; // 终端注册
private static final int MSG_TERMINAL_AUTH = 0x0102; // 终端鉴权
@ -50,8 +54,13 @@ public class JTT808MessageProcessor {
/**
* 处理接收到的JTT808消息
*
* @param data 消息数据
* @param clientAddress 客户端地址
* @param clientSocket 客户端 Socket 连接
* @return 应答消息
*/
public byte[] processMessage(byte[] data, String clientAddress) {
public byte[] processMessage(byte[] data, String clientAddress, Socket clientSocket) {
try {
logger.info("开始处理来自客户端 {} 的JTT808消息数据长度: {}", clientAddress, data.length);
@ -90,7 +99,7 @@ public class JTT808MessageProcessor {
clientAddress, String.format("%04X", message.messageId), getMessageTypeName(message.messageId), message.phoneNumber, message.serialNumber);
// 根据消息类型处理
return handleMessage(message, clientAddress);
return handleMessage(message, clientAddress, clientSocket);
} catch (Exception e) {
logger.error("处理JTT808消息异常客户端: {}, 错误: {}", clientAddress, e.getMessage(), e);
@ -116,9 +125,9 @@ public class JTT808MessageProcessor {
logger.info("========== 测试处理日志中的70字节数据 ==========");
logger.info("原始数据: {}", hexData);
// 调用处理方法
byte[] response = processMessage(testData, "TEST_CLIENT");
// 调用处理方法测试时传入 null Socket
byte[] response = processMessage(testData, "TEST_CLIENT", null);
if (response != null) {
logger.info("测试成功:生成了应答消息,长度: {} bytes", response.length);
@ -424,7 +433,7 @@ public class JTT808MessageProcessor {
/**
* 根据消息类型处理消息
*/
private byte[] handleMessage(JTT808Message message, String clientAddress) {
private byte[] handleMessage(JTT808Message message, String clientAddress, Socket clientSocket) {
String messageType = getMessageTypeName(message.messageId);
byte[] response = null;
int processResult = 0; // 0-成功, 1-失败
@ -436,7 +445,7 @@ public class JTT808MessageProcessor {
response = handleTerminalRegister(message, clientAddress);
break;
case MSG_TERMINAL_AUTH:
response = handleTerminalAuth(message, clientAddress);
response = handleTerminalAuth(message, clientAddress, clientSocket);
break;
case MSG_HEARTBEAT:
response = handleHeartbeat(message, clientAddress);
@ -582,25 +591,44 @@ public class JTT808MessageProcessor {
/**
* 处理终端鉴权
*/
private byte[] handleTerminalAuth(JTT808Message message, String clientAddress) {
private byte[] handleTerminalAuth(JTT808Message message, String clientAddress, Socket clientSocket) {
logger.info("处理终端鉴权: 手机号={}", message.phoneNumber);
if (message.body != null) {
String authCode = new String(message.body).trim();
logger.info("鉴权码: {}", authCode);
// TODO: 验证鉴权码
// 验证鉴权码
boolean authValid = validateAuthCode(message.phoneNumber, authCode);
if (authValid) {
logger.info("终端 {} 鉴权成功", message.phoneNumber);
// 鉴权成功后 Socket 连接注册到连接管理器
if (clientSocket != null) {
boolean registered = socketConnectionManager.addConnection(message.phoneNumber, clientSocket);
if (registered) {
logger.info("✅ 终端 {} 连接已注册到连接管理器", message.phoneNumber);
// 更新数据库中的在线状态
try {
jtt808DataService.updateTerminalOnlineStatus(message.phoneNumber, true, clientAddress);
logger.info("✅ 终端 {} 在线状态已更新到数据库", message.phoneNumber);
} catch (Exception e) {
logger.error("更新终端在线状态异常: {}", e.getMessage());
}
} else {
logger.warn("⚠️ 终端 {} 连接注册失败", message.phoneNumber);
}
}
return buildGeneralResponse(message, RESULT_SUCCESS);
} else {
logger.warn("终端 {} 鉴权失败", message.phoneNumber);
return buildGeneralResponse(message, RESULT_FAILURE);
}
}
return buildGeneralResponse(message, RESULT_ERROR_MESSAGE);
}