diff --git a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/config/JTT808ServerConfig.java b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/config/JTT808ServerConfig.java index 611fa8b..472febc 100644 --- a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/config/JTT808ServerConfig.java +++ b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/config/JTT808ServerConfig.java @@ -9,6 +9,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import com.bonus.sgzb.material.utils.JTT808MessageProcessor; +import com.bonus.sgzb.material.utils.SocketConnectionManager; +import com.bonus.sgzb.material.service.IJtt808DataService; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -40,6 +42,12 @@ public class JTT808ServerConfig { @Autowired private JTT808MessageProcessor messageProcessor; + @Autowired + private SocketConnectionManager socketConnectionManager; + + @Autowired + private IJtt808DataService jtt808DataService; + private ServerSocket serverSocket; private final AtomicBoolean running = new AtomicBoolean(false); private final AtomicInteger activeConnections = new AtomicInteger(0); @@ -177,6 +185,7 @@ public class JTT808ServerConfig { private final Socket clientSocket; private final int connectionId; private final Logger logger = LoggerFactory.getLogger(ClientHandler.class); + private String terminalPhoneNumber = null; // 记录终端手机号 public ClientHandler(Socket clientSocket, int connectionId) { this.clientSocket = clientSocket; @@ -211,6 +220,23 @@ public class JTT808ServerConfig { // 减少活跃连接计数 int remaining = activeConnections.decrementAndGet(); + // ✅ 从连接管理器中移除连接 + String phoneNumber = socketConnectionManager.getPhoneNumberBySocket(clientSocket); + if (phoneNumber != null) { + boolean removed = socketConnectionManager.removeConnection(phoneNumber); + if (removed) { + logger.info("✅ 终端 {} 连接已从连接管理器移除", phoneNumber); + + // 更新数据库中的离线状态 + try { + jtt808DataService.updateTerminalOnlineStatus(phoneNumber, false, clientAddress); + logger.info("✅ 终端 {} 离线状态已更新到数据库", phoneNumber); + } catch (Exception e) { + logger.error("更新终端离线状态异常: {}", e.getMessage()); + } + } + } + try { clientSocket.close(); logger.info("连接 #{} 已关闭 (剩余活跃连接: {}): {}", connectionId, remaining, clientAddress); @@ -236,7 +262,7 @@ public class JTT808ServerConfig { // 使用消息处理器解析和处理消息 byte[] responseData = messageProcessor.processMessage( - Arrays.copyOf(data, length), clientAddress); + Arrays.copyOf(data, length), clientAddress, clientSocket); // 发送应答消息 if (responseData != null) { diff --git a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/controller/Jtt808DataController.java b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/controller/Jtt808DataController.java index 245060f..01a497e 100644 --- a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/controller/Jtt808DataController.java +++ b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/controller/Jtt808DataController.java @@ -211,7 +211,8 @@ public class Jtt808DataController { return AjaxResult.error(result.get("message").toString()).put("data", result); } } catch (Exception e) { - return AjaxResult.error("发送指令失败: " + e.getMessage()); + System.err.println("指令下发失败:" + e.getMessage()); + throw e; } } @@ -228,7 +229,8 @@ public class Jtt808DataController { Map result = jtt808CommandService.sendTextCommandBatch(phoneList, command); return AjaxResult.success(result); } catch (Exception e) { - return AjaxResult.error("批量发送指令失败: " + e.getMessage()); + System.err.println("批量发送指令失败: " + e.getMessage()); + throw e; } } @@ -246,7 +248,8 @@ public class Jtt808DataController { result.put("status", online ? "在线" : "离线"); return AjaxResult.success(result); } catch (Exception e) { - return AjaxResult.error("查询在线状态失败: " + e.getMessage()); + System.err.println("查询在线状态失败: " + e.getMessage()); + throw e; } } @@ -271,7 +274,8 @@ public class Jtt808DataController { return AjaxResult.success(results); } catch (Exception e) { - return AjaxResult.error("批量查询在线状态失败: " + e.getMessage()); + System.err.println("批量查询在线状态失败: " + e.getMessage()); + throw e; } } } \ No newline at end of file diff --git a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/service/impl/Jtt808CommandServiceImpl.java b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/service/impl/Jtt808CommandServiceImpl.java index 7de3c7d..5e48ca1 100644 --- a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/service/impl/Jtt808CommandServiceImpl.java +++ b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/service/impl/Jtt808CommandServiceImpl.java @@ -2,11 +2,9 @@ package com.bonus.sgzb.material.service.impl; import com.bonus.sgzb.material.service.IJtt808CommandService; import com.bonus.sgzb.material.service.IJtt808DataService; -import com.bonus.sgzb.material.utils.ChannelManager; +import com.bonus.sgzb.material.utils.SocketConnectionManager; import com.bonus.sgzb.material.utils.JTT808MessageBuilder; import com.bonus.sgzb.material.domain.Jtt808Terminal; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -16,28 +14,28 @@ import java.util.*; /** * JTT808指令下发服务实现 - * + * * @author system * @date 2025-12-19 */ @Service public class Jtt808CommandServiceImpl implements IJtt808CommandService { - + private static final Logger logger = LoggerFactory.getLogger(Jtt808CommandServiceImpl.class); - + @Autowired - private ChannelManager channelManager; - + private SocketConnectionManager socketConnectionManager; + @Autowired private IJtt808DataService jtt808DataService; - + @Autowired private JTT808MessageBuilder messageBuilder; @Override public Map sendTextCommand(String phoneNumber, String command) { Map result = new HashMap<>(); - + try { // 检查终端是否存在 Jtt808Terminal terminal = jtt808DataService.getTerminalByPhone(phoneNumber); @@ -46,16 +44,15 @@ public class Jtt808CommandServiceImpl implements IJtt808CommandService { result.put("message", "终端不存在: " + phoneNumber); return result; } - - // 检查终端是否在线 - Channel channel = channelManager.get(phoneNumber); - if (channel == null || !channel.isActive()) { + + // 检查终端是否在线(使用 SocketConnectionManager) + if (!socketConnectionManager.isOnline(phoneNumber)) { result.put("success", false); result.put("message", "终端离线: " + phoneNumber); result.put("terminalInfo", terminal); return result; } - + // 构造文本信息下发消息(0x8300) byte[] message = messageBuilder.buildTextMessage(phoneNumber, command); if (message == null) { @@ -63,15 +60,15 @@ public class Jtt808CommandServiceImpl implements IJtt808CommandService { result.put("message", "构造指令消息失败"); return result; } - + // 发送消息 logger.info("向终端 {} 发送文本指令: {}", phoneNumber, command); logger.debug("消息内容: {}", bytesToHex(message)); - - ChannelFuture future = channel.writeAndFlush(message); - future.await(5000); // 等待5秒 - - if (future.isSuccess()) { + + // 使用 SocketConnectionManager 发送数据 + boolean sent = socketConnectionManager.sendData(phoneNumber, message); + + if (sent) { result.put("success", true); result.put("message", "指令发送成功"); result.put("phoneNumber", phoneNumber); @@ -80,8 +77,8 @@ public class Jtt808CommandServiceImpl implements IJtt808CommandService { logger.info("✅ 指令发送成功: {} -> {}", phoneNumber, command); } else { result.put("success", false); - result.put("message", "指令发送失败: " + future.cause()); - logger.error("❌ 指令发送失败: {} -> {}, 原因: {}", phoneNumber, command, future.cause()); + result.put("message", "指令发送失败"); + logger.error("❌ 指令发送失败: {} -> {}", phoneNumber, command); } } catch (Exception e) { @@ -145,13 +142,19 @@ public class Jtt808CommandServiceImpl implements IJtt808CommandService { @Override public boolean isTerminalOnline(String phoneNumber) { - Channel channel = channelManager.get(phoneNumber); - return channel != null && channel.isActive(); + // 从数据库查询终端在线状态 + Jtt808Terminal terminal = jtt808DataService.getTerminalByPhone(phoneNumber); + if (terminal == null) { + return false; + } + // onlineStatus: 1=在线, 0=离线 + return terminal.getOnlineStatus() != null && terminal.getOnlineStatus() == 1; } @Override public Object getTerminalChannel(String phoneNumber) { - return channelManager.get(phoneNumber); + // 返回 Socket 连接 + return socketConnectionManager.getConnection(phoneNumber); } /** diff --git a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/utils/ChannelManager.java b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/utils/ChannelManager.java index 13fdcd1..0ec69da 100644 --- a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/utils/ChannelManager.java +++ b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/utils/ChannelManager.java @@ -16,7 +16,12 @@ import java.util.concurrent.ConcurrentHashMap; /** * @author 马三炮 * @date 2025/6/5 + * + * @deprecated 此类已废弃,不再使用。 + * 原因:项目使用原生 Socket 服务器,而非 Netty 框架。 + * 请使用 {@link SocketConnectionManager} 代替。 */ +@Deprecated @Slf4j @Component public class ChannelManager { @@ -50,11 +55,23 @@ public class ChannelManager { } public boolean remove(String terminalPhone) { - return channelGroup.remove(channelIdMap.remove(terminalPhone)); + ChannelId channelId = channelIdMap.remove(terminalPhone); + if (channelId == null) { + return false; + } + Channel channel = channelGroup.find(channelId); + if (channel == null) { + return false; + } + return channelGroup.remove(channel); } public Channel get(String terminalPhone) { - return channelGroup.find(channelIdMap.get(terminalPhone)); + ChannelId channelId = channelIdMap.get(terminalPhone); + if (channelId == null) { + return null; + } + return channelGroup.find(channelId); } public ChannelGroup getChannelGroup() { diff --git a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/utils/JTT808MessageProcessor.java b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/utils/JTT808MessageProcessor.java index 1af07ff..9a067e5 100644 --- a/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/utils/JTT808MessageProcessor.java +++ b/sgzb-modules/sgzb-material/src/main/java/com/bonus/sgzb/material/utils/JTT808MessageProcessor.java @@ -10,6 +10,7 @@ import org.springframework.stereotype.Component; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.net.Socket; import java.util.Arrays; import java.util.Calendar; import java.util.Date; @@ -30,6 +31,9 @@ public class JTT808MessageProcessor { @Autowired private IGeofenceService geofenceService; + @Autowired + private SocketConnectionManager socketConnectionManager; + // JTT808消息ID常量 private static final int MSG_TERMINAL_REGISTER = 0x0100; // 终端注册 private static final int MSG_TERMINAL_AUTH = 0x0102; // 终端鉴权 @@ -50,8 +54,13 @@ public class JTT808MessageProcessor { /** * 处理接收到的JTT808消息 + * + * @param data 消息数据 + * @param clientAddress 客户端地址 + * @param clientSocket 客户端 Socket 连接 + * @return 应答消息 */ - public byte[] processMessage(byte[] data, String clientAddress) { + public byte[] processMessage(byte[] data, String clientAddress, Socket clientSocket) { try { logger.info("开始处理来自客户端 {} 的JTT808消息,数据长度: {}", clientAddress, data.length); @@ -90,7 +99,7 @@ public class JTT808MessageProcessor { clientAddress, String.format("%04X", message.messageId), getMessageTypeName(message.messageId), message.phoneNumber, message.serialNumber); // 根据消息类型处理 - return handleMessage(message, clientAddress); + return handleMessage(message, clientAddress, clientSocket); } catch (Exception e) { logger.error("处理JTT808消息异常,客户端: {}, 错误: {}", clientAddress, e.getMessage(), e); @@ -116,9 +125,9 @@ public class JTT808MessageProcessor { logger.info("========== 测试处理日志中的70字节数据 =========="); logger.info("原始数据: {}", hexData); - - // 调用处理方法 - byte[] response = processMessage(testData, "TEST_CLIENT"); + + // 调用处理方法(测试时传入 null Socket) + byte[] response = processMessage(testData, "TEST_CLIENT", null); if (response != null) { logger.info("测试成功:生成了应答消息,长度: {} bytes", response.length); @@ -424,7 +433,7 @@ public class JTT808MessageProcessor { /** * 根据消息类型处理消息 */ - private byte[] handleMessage(JTT808Message message, String clientAddress) { + private byte[] handleMessage(JTT808Message message, String clientAddress, Socket clientSocket) { String messageType = getMessageTypeName(message.messageId); byte[] response = null; int processResult = 0; // 0-成功, 1-失败 @@ -436,7 +445,7 @@ public class JTT808MessageProcessor { response = handleTerminalRegister(message, clientAddress); break; case MSG_TERMINAL_AUTH: - response = handleTerminalAuth(message, clientAddress); + response = handleTerminalAuth(message, clientAddress, clientSocket); break; case MSG_HEARTBEAT: response = handleHeartbeat(message, clientAddress); @@ -582,25 +591,44 @@ public class JTT808MessageProcessor { /** * 处理终端鉴权 */ - private byte[] handleTerminalAuth(JTT808Message message, String clientAddress) { + private byte[] handleTerminalAuth(JTT808Message message, String clientAddress, Socket clientSocket) { logger.info("处理终端鉴权: 手机号={}", message.phoneNumber); - + if (message.body != null) { String authCode = new String(message.body).trim(); logger.info("鉴权码: {}", authCode); - - // TODO: 验证鉴权码 + + // 验证鉴权码 boolean authValid = validateAuthCode(message.phoneNumber, authCode); - + if (authValid) { logger.info("终端 {} 鉴权成功", message.phoneNumber); + + // ✅ 鉴权成功后,将 Socket 连接注册到连接管理器 + if (clientSocket != null) { + boolean registered = socketConnectionManager.addConnection(message.phoneNumber, clientSocket); + if (registered) { + logger.info("✅ 终端 {} 连接已注册到连接管理器", message.phoneNumber); + + // 更新数据库中的在线状态 + try { + jtt808DataService.updateTerminalOnlineStatus(message.phoneNumber, true, clientAddress); + logger.info("✅ 终端 {} 在线状态已更新到数据库", message.phoneNumber); + } catch (Exception e) { + logger.error("更新终端在线状态异常: {}", e.getMessage()); + } + } else { + logger.warn("⚠️ 终端 {} 连接注册失败", message.phoneNumber); + } + } + return buildGeneralResponse(message, RESULT_SUCCESS); } else { logger.warn("终端 {} 鉴权失败", message.phoneNumber); return buildGeneralResponse(message, RESULT_FAILURE); } } - + return buildGeneralResponse(message, RESULT_ERROR_MESSAGE); }