package publicway; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import publicway.ProtocolParser01.ParseResult; public class SerialProtocolParser { // 协议常量定义 private static final byte[] START_MARKER = {(byte) 0xDD, (byte) 0xCC}; private static final int MIN_PACKET_LENGTH = 9; // 最小包长度: 起始标记2 + 数据长度2 + 主机地址1 + 卡槽地址1 + 功能码1 + CRC2 // 功能码定义 private static final byte FUNCTION_01 = 0x01; // 查询数据 private static final byte FUNCTION_51 = 0x51; // 开门控制 private static final byte FUNCTION_52 = 0x52; // LED亮度控制 private static final byte FUNCTION_80 = (byte) 0x80; // 工卡升级使能 private static final byte FUNCTION_81 = (byte) 0x81; // 工作卡升级数据包 private static final byte FUNCTION_82 = (byte) 0x82; // 单板升级使能 private static final byte FUNCTION_83 = (byte) 0x83; // 单板升级数据包 // 数据缓冲区,用于处理粘包 private byte[] dataBuffer = new byte[1024]; private int bufferPosition = 0; // 数据接收队列 private BlockingQueue dataQueue = new ArrayBlockingQueue<>(100); // 批量处理队列 private final BlockingQueue batchQueue = new ArrayBlockingQueue<>(1000); private final ScheduledExecutorService batchExecutor = Executors.newSingleThreadScheduledExecutor(); // 线程控制 private volatile boolean isRunning = false; private Thread processorThread; // 重用StringBuilder减少对象创建 private final StringBuilder hexBuilder = new StringBuilder(256); /** * 启动解析器 */ public void start() { if (isRunning) { System.out.println("串口协议解析器已经在运行中"); return; } isRunning = true; // 启动批量处理(每50ms处理一次) batchExecutor.scheduleAtFixedRate(this::batchProcess, 50, 50, TimeUnit.MILLISECONDS); processorThread = new Thread(this::processPackets, "Serial-Protocol-Parser"); processorThread.setDaemon(true); processorThread.start(); System.out.println("串口协议解析器已启动"); } /** * 停止解析器 */ public void stop() { if (!isRunning) { return; } isRunning = false; // 关闭批量处理器 batchExecutor.shutdown(); try { if (!batchExecutor.awaitTermination(1, TimeUnit.SECONDS)) { batchExecutor.shutdownNow(); } } catch (InterruptedException e) { batchExecutor.shutdownNow(); Thread.currentThread().interrupt(); } if (processorThread != null) { processorThread.interrupt(); try { processorThread.join(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } processorThread = null; } // 清空队列和缓冲区 dataQueue.clear(); batchQueue.clear(); bufferPosition = 0; System.out.println("串口协议解析器已停止"); } /** * 检查解析器是否在运行 */ public boolean isRunning() { return isRunning; } /** * 接收串口原始数据 - 使用批量处理 */ public void receiveData(byte[] rawData) { if (!isRunning) { System.out.println("警告: 串口协议解析器未启动,忽略接收的数据"); return; } if (rawData == null || rawData.length == 0) { return; } // 将数据添加到批量队列 if (!batchQueue.offer(rawData)) { System.err.println("批量队列已满,丢弃数据"); } } /** * 批量处理数据 */ private void batchProcess() { if (batchQueue.isEmpty()) { return; } // 批量处理数据 java.util.List batch = new java.util.ArrayList<>(100); batchQueue.drainTo(batch, 100); for (byte[] rawData : batch) { // 将数据添加到缓冲区 if (bufferPosition + rawData.length > dataBuffer.length) { // 缓冲区不足时,清理并重新开始 System.arraycopy(dataBuffer, bufferPosition - rawData.length, dataBuffer, 0, rawData.length); bufferPosition = rawData.length; } else { System.arraycopy(rawData, 0, dataBuffer, bufferPosition, rawData.length); bufferPosition += rawData.length; } // 处理缓冲区中的数据 processBuffer(); } // 定期检查内存 checkMemory(); } /** * 内存监控 */ private void checkMemory() { Runtime runtime = Runtime.getRuntime(); long usedMem = runtime.totalMemory() - runtime.freeMemory(); long maxMem = runtime.maxMemory(); if (usedMem > maxMem * 0.8) { System.out.println("内存使用率超过80%,当前使用: " + (usedMem / 1024 / 1024) + "MB"); } } /** * 处理缓冲区中的数据,解析完整数据包 */ private void processBuffer() { while (bufferPosition >= MIN_PACKET_LENGTH) { // 查找起始标记 int startIndex = findStartMarker(); if (startIndex == -1) { // 没有找到起始标记,清空无效数据 bufferPosition = 0; return; } // 检查是否有足够的数据读取数据长度 if (startIndex + 4 > bufferPosition) { // 数据不足,等待更多数据 compactBuffer(startIndex); return; } // 读取数据长度 (大端序) int dataLength = ((dataBuffer[startIndex + 2] & 0xFF) << 8) | (dataBuffer[startIndex + 3] & 0xFF); int totalPacketLength = 2 + 2 + dataLength + 2; // 起始标记2 + 数据长度2 + 数据内容 + CRC2 // 检查是否收到完整数据包 if (startIndex + totalPacketLength > bufferPosition) { // 数据包不完整,等待更多数据 compactBuffer(startIndex); return; } // 提取完整数据包 byte[] packet = new byte[totalPacketLength]; System.arraycopy(dataBuffer, startIndex, packet, 0, totalPacketLength); // 将数据包放入队列供解析 try { if (!dataQueue.offer(packet)) { System.err.println("数据队列已满,丢弃数据包"); } } catch (Exception e) { System.err.println("放入数据队列时发生异常: " + e.getMessage()); } // 移动缓冲区位置 int remaining = bufferPosition - (startIndex + totalPacketLength); if (remaining > 0) { System.arraycopy(dataBuffer, startIndex + totalPacketLength, dataBuffer, 0, remaining); } bufferPosition = remaining; } } /** * 查找起始标记位置 */ private int findStartMarker() { for (int i = 0; i <= bufferPosition - START_MARKER.length; i++) { if (dataBuffer[i] == START_MARKER[0] && dataBuffer[i + 1] == START_MARKER[1]) { return i; } } return -1; } /** * 压缩缓冲区,将有效数据移到开头 */ private void compactBuffer(int startIndex) { if (startIndex > 0) { System.arraycopy(dataBuffer, startIndex, dataBuffer, 0, bufferPosition - startIndex); bufferPosition -= startIndex; } } /** * 处理数据包的主方法 */ private void processPackets() { System.out.println("串口数据包处理线程开始运行"); while (isRunning && !Thread.currentThread().isInterrupted()) { try { byte[] packet = dataQueue.take(); // 阻塞直到有数据 parsePacket(packet); } catch (InterruptedException e) { System.out.println("串口数据包处理线程被中断"); Thread.currentThread().interrupt(); break; } catch (Exception e) { System.err.println("处理数据包时发生异常: " + e.getMessage()); e.printStackTrace(); } } System.out.println("串口数据包处理线程结束运行"); } /** * 解析数据包并根据功能码调用相应方法 */ private void parsePacket(byte[] packet) { try { // 解析基本字段 byte hostAddress = packet[4]; // 主机地址 byte slotAddress = packet[5]; // 卡槽地址 byte functionCode = packet[6]; // 功能码 // 数据长度 (从协议中读取) int dataLength = ((packet[2] & 0xFF) << 8) | (packet[3] & 0xFF); // 返回值数据 int returnValueLength = dataLength - 3; // N-3 (减去主机地址、卡槽地址、功能码) byte[] returnValue = null; if (returnValueLength > 0) { returnValue = new byte[returnValueLength]; System.arraycopy(packet, 7, returnValue, 0, returnValueLength); } // 根据功能码调用不同的解析方法 switch (functionCode) { case FUNCTION_01: if (returnValue != null) { // 使用优化的字节数组解析方法,避免字符串转换 ParseResult rst = ProtocolParser01.parseDDCC01Data(packet); rst.fuzhi(); rst.toString(); } break; case FUNCTION_51: // 调用 ProtocolParser51 处理数据 String hexPacket = bytesToHex(packet); int result = ProtocolParser51.parse(hexPacket); if (result == 1) { System.out.println("功能码 0x51 - 开门控制成功"); } else { System.out.println("功能码 0x51 - 开门控制失败或报文不合法"); } break; case FUNCTION_52: System.out.println("功能码 0x52 - LED亮度控制"); break; case FUNCTION_80: System.out.println("功能码 0x80 - 工卡升级使能"); break; case FUNCTION_81: System.out.println("功能码 0x81 - 工作卡升级数据包"); break; case FUNCTION_82: System.out.println("功能码 0x82 - 单板升级使能"); break; case FUNCTION_83: System.out.println("功能码 0x83 - 单板升级数据包"); break; default: System.err.println("未知功能码: 0x" + Integer.toHexString(functionCode & 0xFF)); break; } } catch (Exception e) { System.err.println("解析数据包时发生错误: " + e.getMessage()); e.printStackTrace(); } } /** * 优化的字节数组转十六进制字符串方法 */ private String bytesToHex(byte[] bytes) { hexBuilder.setLength(0); // 清空重用 for (byte b : bytes) { hexBuilder.append(String.format("%02X", b)); } return hexBuilder.toString(); } public static byte[] hexStringToBytes(String hexString) { if (hexString == null || hexString.trim().isEmpty()) { return new byte[0]; } // 移除所有空格 String cleanedHex = hexString.replaceAll("\\s", ""); // 检查长度是否为偶数 if (cleanedHex.length() % 2 != 0) { throw new IllegalArgumentException("HEX字符串长度必须为偶数: " + cleanedHex); } byte[] result = new byte[cleanedHex.length() / 2]; for (int i = 0; i < cleanedHex.length(); i += 2) { String byteStr = cleanedHex.substring(i, i + 2); try { result[i / 2] = (byte) Integer.parseInt(byteStr, 16); } catch (NumberFormatException e) { throw new IllegalArgumentException("无效的HEX字符: " + byteStr, e); } } return result; } /** * 获取解析器状态信息 */ public String getStatusInfo() { return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d", isRunning ? "运行中" : "已停止", dataQueue.size(), dataQueue.remainingCapacity() + dataQueue.size(), batchQueue.size(), batchQueue.remainingCapacity() + batchQueue.size()); } /** * 设置原始数据最大打印长度 */ public void setMaxRawDataPrintLength(int length) { // 实现根据需要调整 } }