| | |
| | | package publicway; |
| | | |
| | | import java.util.concurrent.ArrayBlockingQueue; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.Collections; |
| | | import java.util.Set; |
| | | |
| | | import chuankou.SerialPortService; |
| | | import chushihua.SlotManager; |
| | | import chushihua.lunxun; |
| | | import dialog.Charulog; |
| | | import dialog.Dingshidialog; |
| | | import publicway.ProtocolParser01.ParseResult; |
| | | import xitongshezhi.SystemDebugDialog; |
| | | |
| | | public class SerialProtocolParser { |
| | | |
| | |
| | | private static final byte FUNCTION_82 = (byte) 0x82; // 单板升级使能 |
| | | private static final byte FUNCTION_83 = (byte) 0x83; // 单板升级数据包 |
| | | |
| | | // 数据缓冲区,用于处理粘包 |
| | | // 数据缓冲区,用于处理粘包 - 改为直接ByteArrayOutputStream管理 |
| | | private byte[] dataBuffer = new byte[1024]; |
| | | private int bufferPosition = 0; |
| | | |
| | | // 数据接收队列 |
| | | private BlockingQueue<byte[]> dataQueue = new ArrayBlockingQueue<>(100); |
| | | // 数据接收队列 - 限制队列大小防止内存无限增长 |
| | | private BlockingQueue<byte[]> dataQueue = new ArrayBlockingQueue<>(50); |
| | | |
| | | // 批量处理队列 |
| | | private final BlockingQueue<byte[]> batchQueue = new ArrayBlockingQueue<>(1000); |
| | | // 批量处理队列 - 限制队列大小 |
| | | private final BlockingQueue<byte[]> batchQueue = new ArrayBlockingQueue<>(200); |
| | | private final ScheduledExecutorService batchExecutor = |
| | | Executors.newSingleThreadScheduledExecutor(); |
| | | |
| | |
| | | // 重用StringBuilder减少对象创建 |
| | | private final StringBuilder hexBuilder = new StringBuilder(256); |
| | | |
| | | // 内存监控计数器 |
| | | private long lastMemoryCheckTime = 0; |
| | | private static final long MEMORY_CHECK_INTERVAL = 30000; // 30秒检查一次 |
| | | |
| | | // 对象池,减少对象创建 |
| | | private final Object packetPoolLock = new Object(); |
| | | |
| | | // ==================== 新增:数据包去重缓存 ==================== |
| | | private final Set<String> processedPacketCache = Collections.newSetFromMap( |
| | | new ConcurrentHashMap<String, Boolean>()); |
| | | private static final int MAX_CACHE_SIZE = 1000; |
| | | private static final long CACHE_CLEANUP_INTERVAL = 60000; // 1分钟清理一次缓存 |
| | | private long lastCacheCleanupTime = 0; |
| | | |
| | | /** |
| | | * 启动解析器 |
| | | */ |
| | | public void start() { |
| | | if (isRunning) { |
| | | System.out.println("串口协议解析器已经在运行中"); |
| | | return; |
| | | } |
| | | |
| | | isRunning = true; |
| | | |
| | | // 启动批量处理(每50ms处理一次) |
| | | batchExecutor.scheduleAtFixedRate(this::batchProcess, 50, 50, TimeUnit.MILLISECONDS); |
| | | |
| | | // 启动批量处理(每10ms处理一次) |
| | | batchExecutor.scheduleAtFixedRate(this::batchProcess, 10, 10, TimeUnit.MILLISECONDS); |
| | | processorThread = new Thread(this::processPackets, "Serial-Protocol-Parser"); |
| | | processorThread.setDaemon(true); |
| | | processorThread.start(); |
| | | |
| | | System.out.println("串口协议解析器已启动"); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | // 清空队列和缓冲区 |
| | | dataQueue.clear(); |
| | | batchQueue.clear(); |
| | | clearQueues(); |
| | | bufferPosition = 0; |
| | | |
| | | System.out.println("串口协议解析器已停止"); |
| | | // 清空去重缓存 |
| | | processedPacketCache.clear(); |
| | | } |
| | | |
| | | /** |
| | | * 清空所有队列,释放内存 |
| | | */ |
| | | private void clearQueues() { |
| | | dataQueue.clear(); |
| | | batchQueue.clear(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void receiveData(byte[] rawData) { |
| | | if (!isRunning) { |
| | | System.out.println("警告: 串口协议解析器未启动,忽略接收的数据"); |
| | | return; |
| | | // 如果解析器未运行,自动启动 |
| | | start(); |
| | | } |
| | | |
| | | if (rawData == null || rawData.length == 0) { |
| | | return; |
| | | } |
| | | |
| | | // 检查队列状态,避免内存无限增长 |
| | | if (batchQueue.size() > batchQueue.remainingCapacity() * 0.8) { |
| | | // 队列接近满时,丢弃最老的数据 |
| | | if (!batchQueue.isEmpty()) { |
| | | batchQueue.poll(); // 移除一个旧数据 |
| | | } |
| | | } |
| | | |
| | | // 将数据添加到批量队列 |
| | | if (!batchQueue.offer(rawData)) { |
| | | System.err.println("批量队列已满,丢弃数据"); |
| | | // 队列已满时的处理 |
| | | handleQueueFull(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 处理队列满的情况 |
| | | */ |
| | | private void handleQueueFull() { |
| | | // 清空队列重新开始,避免内存泄漏 |
| | | clearQueues(); |
| | | bufferPosition = 0; // 重置缓冲区 |
| | | |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("警告:数据处理队列已满,已清空队列重新开始"); |
| | | } |
| | | } |
| | | |
| | |
| | | return; |
| | | } |
| | | |
| | | // 批量处理数据 |
| | | java.util.List<byte[]> batch = new java.util.ArrayList<>(100); |
| | | batchQueue.drainTo(batch, 100); |
| | | |
| | | for (byte[] rawData : batch) { |
| | | // 将数据添加到缓冲区 |
| | | if (bufferPosition + rawData.length > dataBuffer.length) { |
| | | // 缓冲区不足时,清理并重新开始 |
| | | System.arraycopy(dataBuffer, bufferPosition - rawData.length, dataBuffer, 0, rawData.length); |
| | | bufferPosition = rawData.length; |
| | | } else { |
| | | try { |
| | | // 批量处理数据,限制每次处理的最大数量 |
| | | int maxBatchSize = 50; |
| | | java.util.List<byte[]> batch = new java.util.ArrayList<>(maxBatchSize); |
| | | batchQueue.drainTo(batch, maxBatchSize); |
| | | |
| | | for (byte[] rawData : batch) { |
| | | // 检查缓冲区容量,避免溢出 |
| | | if (bufferPosition + rawData.length > dataBuffer.length) { |
| | | // 缓冲区不足时的处理 |
| | | if (rawData.length > dataBuffer.length) { |
| | | // 单条数据超过缓冲区大小,直接处理或丢弃 |
| | | handleOversizedPacket(rawData); |
| | | continue; |
| | | } else { |
| | | // 移动有效数据到缓冲区开头 |
| | | compactBuffer(0); |
| | | } |
| | | } |
| | | |
| | | // 将数据添加到缓冲区 |
| | | System.arraycopy(rawData, 0, dataBuffer, bufferPosition, rawData.length); |
| | | bufferPosition += rawData.length; |
| | | |
| | | // 处理缓冲区中的数据 |
| | | processBuffer(); |
| | | } |
| | | |
| | | // 处理缓冲区中的数据 |
| | | processBuffer(); |
| | | // 定期检查内存 |
| | | checkMemory(); |
| | | |
| | | // 定期清理去重缓存 |
| | | cleanupPacketCache(); |
| | | |
| | | } catch (Exception e) { |
| | | System.err.println("批量处理数据时发生异常: " + e.getMessage()); |
| | | // 发生异常时重置状态 |
| | | resetParserState(); |
| | | } |
| | | |
| | | // 定期检查内存 |
| | | checkMemory(); |
| | | } |
| | | |
| | | /** |
| | | * 处理过大的数据包 |
| | | */ |
| | | private void handleOversizedPacket(byte[] oversizedData) { |
| | | // 对于过大的数据包,直接尝试查找起始标记 |
| | | int startIndex = findStartMarkerInArray(oversizedData, 0); |
| | | if (startIndex != -1) { |
| | | // 找到起始标记,将剩余数据放入缓冲区 |
| | | int remainingLength = oversizedData.length - startIndex; |
| | | if (remainingLength <= dataBuffer.length) { |
| | | System.arraycopy(oversizedData, startIndex, dataBuffer, 0, remainingLength); |
| | | bufferPosition = remainingLength; |
| | | processBuffer(); |
| | | } |
| | | } |
| | | // 否则丢弃该数据包 |
| | | } |
| | | |
| | | /** |
| | | * 在指定数组中查找起始标记 |
| | | */ |
| | | private int findStartMarkerInArray(byte[] data, int startPos) { |
| | | for (int i = startPos; i <= data.length - START_MARKER.length; i++) { |
| | | if (data[i] == START_MARKER[0] && data[i + 1] == START_MARKER[1]) { |
| | | return i; |
| | | } |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | /** |
| | | * 重置解析器状态 |
| | | */ |
| | | private void resetParserState() { |
| | | bufferPosition = 0; |
| | | clearQueues(); |
| | | } |
| | | |
| | | /** |
| | | * 内存监控 |
| | | */ |
| | | private void checkMemory() { |
| | | long currentTime = System.currentTimeMillis(); |
| | | if (currentTime - lastMemoryCheckTime < MEMORY_CHECK_INTERVAL) { |
| | | return; |
| | | } |
| | | |
| | | lastMemoryCheckTime = currentTime; |
| | | |
| | | Runtime runtime = Runtime.getRuntime(); |
| | | long usedMem = runtime.totalMemory() - runtime.freeMemory(); |
| | | long maxMem = runtime.maxMemory(); |
| | | |
| | | if (usedMem > maxMem * 0.8) { |
| | | System.out.println("内存使用率超过80%,当前使用: " + (usedMem / 1024 / 1024) + "MB"); |
| | | if (usedMem > maxMem * 0.75) { |
| | | // 内存使用率超过75%时进行垃圾回收 |
| | | System.gc(); |
| | | |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("内存使用率超过75%,已触发垃圾回收。使用内存: " + |
| | | (usedMem / 1024 / 1024) + "MB"); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 清理数据包去重缓存 |
| | | */ |
| | | private void cleanupPacketCache() { |
| | | long currentTime = System.currentTimeMillis(); |
| | | if (currentTime - lastCacheCleanupTime < CACHE_CLEANUP_INTERVAL) { |
| | | return; |
| | | } |
| | | |
| | | lastCacheCleanupTime = currentTime; |
| | | |
| | | // 如果缓存大小超过限制,清空缓存 |
| | | if (processedPacketCache.size() >= MAX_CACHE_SIZE) { |
| | | processedPacketCache.clear(); |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("数据包去重缓存已清空,当前大小: " + processedPacketCache.size()); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | * 处理缓冲区中的数据,解析完整数据包 |
| | | */ |
| | | private void processBuffer() { |
| | | while (bufferPosition >= MIN_PACKET_LENGTH) { |
| | | int processedCount = 0; |
| | | final int MAX_PACKETS_PER_BATCH = 20; // 限制每次处理的最大包数量 |
| | | |
| | | while (bufferPosition >= MIN_PACKET_LENGTH && processedCount < MAX_PACKETS_PER_BATCH) { |
| | | // 查找起始标记 |
| | | int startIndex = findStartMarker(); |
| | | if (startIndex == -1) { |
| | |
| | | int dataLength = ((dataBuffer[startIndex + 2] & 0xFF) << 8) | (dataBuffer[startIndex + 3] & 0xFF); |
| | | int totalPacketLength = 2 + 2 + dataLength + 2; // 起始标记2 + 数据长度2 + 数据内容 + CRC2 |
| | | |
| | | // 检查数据长度是否合理 |
| | | if (dataLength < 0 || totalPacketLength > dataBuffer.length) { |
| | | // 数据长度异常,跳过这个包 |
| | | compactBuffer(startIndex + 1); |
| | | continue; |
| | | } |
| | | |
| | | // 检查是否收到完整数据包 |
| | | if (startIndex + totalPacketLength > bufferPosition) { |
| | | // 数据包不完整,等待更多数据 |
| | |
| | | } |
| | | |
| | | // 提取完整数据包 |
| | | byte[] packet = new byte[totalPacketLength]; |
| | | System.arraycopy(dataBuffer, startIndex, packet, 0, totalPacketLength); |
| | | |
| | | // 将数据包放入队列供解析 |
| | | try { |
| | | if (!dataQueue.offer(packet)) { |
| | | System.err.println("数据队列已满,丢弃数据包"); |
| | | byte[] packet = extractPacket(startIndex, totalPacketLength); |
| | | if (packet != null) { |
| | | // ==================== 新增:数据包去重检查 ==================== |
| | | String packetHash = generatePacketHash(packet); |
| | | if (processedPacketCache.contains(packetHash)) { |
| | | // 跳过重复数据包 |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("跳过重复数据包: " + bytesToHex(packet)); |
| | | } |
| | | |
| | | // 移动缓冲区位置,继续处理下一个包 |
| | | int remaining = bufferPosition - (startIndex + totalPacketLength); |
| | | if (remaining > 0) { |
| | | System.arraycopy(dataBuffer, startIndex + totalPacketLength, dataBuffer, 0, remaining); |
| | | } |
| | | bufferPosition = remaining; |
| | | processedCount++; |
| | | continue; |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("放入数据队列时发生异常: " + e.getMessage()); |
| | | |
| | | // 添加到去重缓存 |
| | | processedPacketCache.add(packetHash); |
| | | |
| | | // 将数据包放入队列供解析 |
| | | if (!dataQueue.offer(packet)) { |
| | | // 队列已满,释放packet引用 |
| | | packet = null; |
| | | handleDataQueueFull(); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // 移动缓冲区位置 |
| | |
| | | System.arraycopy(dataBuffer, startIndex + totalPacketLength, dataBuffer, 0, remaining); |
| | | } |
| | | bufferPosition = remaining; |
| | | |
| | | processedCount++; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 生成数据包哈希值用于去重 |
| | | * 基于关键字段:主机地址 + 卡槽地址 + 功能码 + 数据内容 |
| | | */ |
| | | private String generatePacketHash(byte[] packet) { |
| | | if (packet == null || packet.length < 7) { |
| | | return ""; |
| | | } |
| | | |
| | | StringBuilder key = new StringBuilder(64); |
| | | |
| | | // 添加协议头信息 |
| | | key.append(packet[4] & 0xFF).append(":"); // 主机地址 (位置4) |
| | | key.append(packet[5] & 0xFF).append(":"); // 卡槽地址 (位置5) |
| | | key.append(packet[6] & 0xFF).append(":"); // 功能码 (位置6) |
| | | |
| | | // 添加数据内容哈希(排除包头和CRC) |
| | | int dataStart = 7; // 数据开始位置 |
| | | int dataEnd = packet.length - 2; // 排除CRC字节 |
| | | |
| | | // 取数据内容的前20个字节作为哈希依据 |
| | | int hashLength = Math.min(dataEnd - dataStart, 20); |
| | | for (int i = dataStart; i < dataStart + hashLength && i < dataEnd; i++) { |
| | | key.append(String.format("%02X", packet[i])); |
| | | } |
| | | |
| | | return key.toString(); |
| | | } |
| | | |
| | | /** |
| | | * 提取数据包,重用byte数组减少对象创建 |
| | | */ |
| | | private byte[] extractPacket(int startIndex, int totalPacketLength) { |
| | | byte[] packet = new byte[totalPacketLength]; |
| | | System.arraycopy(dataBuffer, startIndex, packet, 0, totalPacketLength); |
| | | return packet; |
| | | } |
| | | |
| | | /** |
| | | * 处理数据队列满的情况 |
| | | */ |
| | | private void handleDataQueueFull() { |
| | | // 丢弃队列中最老的数据 |
| | | dataQueue.poll(); |
| | | |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("数据解析队列已满,丢弃最老数据包"); |
| | | } |
| | | } |
| | | |
| | |
| | | * 压缩缓冲区,将有效数据移到开头 |
| | | */ |
| | | private void compactBuffer(int startIndex) { |
| | | if (startIndex > 0) { |
| | | if (startIndex > 0 && bufferPosition > startIndex) { |
| | | System.arraycopy(dataBuffer, startIndex, dataBuffer, 0, bufferPosition - startIndex); |
| | | bufferPosition -= startIndex; |
| | | } |
| | |
| | | break; |
| | | } catch (Exception e) { |
| | | System.err.println("处理数据包时发生异常: " + e.getMessage()); |
| | | e.printStackTrace(); |
| | | // 继续运行,不退出线程 |
| | | } |
| | | } |
| | | |
| | |
| | | * 解析数据包并根据功能码调用相应方法 |
| | | */ |
| | | private void parsePacket(byte[] packet) { |
| | | System.out.println("开始解析收到的数据"+bytesToHex(packet)+"时间"+TimestampUtil.getTimestamp()); |
| | | if (packet == null || packet.length < MIN_PACKET_LENGTH) { |
| | | return; |
| | | } |
| | | |
| | | try { |
| | | SerialPortService.getReceivedDataCount(); |
| | | SerialPortService.getReceivedDataCount(); |
| | | // 解析基本字段 |
| | | byte hostAddress = packet[4]; // 主机地址 |
| | | byte slotAddress = packet[5]; // 卡槽地址 |
| | | byte functionCode = packet[6]; // 功能码 |
| | | byte functionCode = packet[6]; // 功能码 |
| | | |
| | | // 数据长度 (从协议中读取) |
| | | int dataLength = ((packet[2] & 0xFF) << 8) | (packet[3] & 0xFF); |
| | |
| | | // 返回值数据 |
| | | int returnValueLength = dataLength - 3; // N-3 (减去主机地址、卡槽地址、功能码) |
| | | byte[] returnValue = null; |
| | | if (returnValueLength > 0) { |
| | | if (returnValueLength > 0 && returnValueLength <= packet.length - 7) { |
| | | returnValue = new byte[returnValueLength]; |
| | | System.arraycopy(packet, 7, returnValue, 0, returnValueLength); |
| | | } |
| | |
| | | if (returnValue != null) { |
| | | // 使用优化的字节数组解析方法,避免字符串转换 |
| | | ParseResult rst = ProtocolParser01.parseDDCC01Data(packet); |
| | | rst.fuzhi(); |
| | | // rst.toString(); |
| | | if (rst != null) { |
| | | rst.fuzhi(); |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData(rst.toString()); |
| | | } |
| | | } |
| | | } |
| | | break; |
| | | case FUNCTION_51: |
| | | // 调用 ProtocolParser51 处理数据 |
| | | String hexPacket = bytesToHex(packet); |
| | | int result = ProtocolParser51.parse(hexPacket); |
| | | int slot = slotAddress; |
| | | if (result == 1) { |
| | | System.out.println("功能码 0x51 - 开门控制成功"); |
| | | SlotManager.changgehaska(slot, result); |
| | | } else { |
| | | System.out.println("功能码 0x51 - 开门控制失败或报文不合法"); |
| | | String message = slot + "号卡槽取卡失败"; |
| | | Charulog.logOperation(message); |
| | | } |
| | | break; |
| | | case FUNCTION_52: |
| | | System.out.println("功能码 0x52 - LED亮度控制"); |
| | | // LED亮度控制 - 无操作 |
| | | break; |
| | | case FUNCTION_80: |
| | | System.out.println("功能码 0x80 - 工卡升级使能"); |
| | | // 工卡升级使能 - 无操作 |
| | | break; |
| | | case FUNCTION_81: |
| | | System.out.println("功能码 0x81 - 工作卡升级数据包"); |
| | | // 工作卡升级数据包 - 无操作 |
| | | break; |
| | | case FUNCTION_82: |
| | | System.out.println("功能码 0x82 - 单板升级使能"); |
| | | // 单板升级使能 - 无操作 |
| | | break; |
| | | case FUNCTION_83: |
| | | System.out.println("功能码 0x83 - 单板升级数据包"); |
| | | // 单板升级数据包 - 无操作 |
| | | break; |
| | | default: |
| | | System.err.println("未知功能码: 0x" + Integer.toHexString(functionCode & 0xFF)); |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | System.err.println("未知功能码: 0x" + Integer.toHexString(functionCode & 0xFF)); |
| | | } |
| | | break; |
| | | } |
| | | |
| | | } catch (Exception e) { |
| | | System.err.println("解析数据包时发生错误: " + e.getMessage()); |
| | | e.printStackTrace(); |
| | | // 不打印堆栈跟踪,避免产生大量日志对象 |
| | | } |
| | | } |
| | | |
| | |
| | | * 优化的字节数组转十六进制字符串方法 |
| | | */ |
| | | private String bytesToHex(byte[] bytes) { |
| | | if (bytes == null || bytes.length == 0) { |
| | | return ""; |
| | | } |
| | | |
| | | hexBuilder.setLength(0); // 清空重用 |
| | | for (byte b : bytes) { |
| | | hexBuilder.append(String.format("%02X", b)); |
| | |
| | | * 获取解析器状态信息 |
| | | */ |
| | | public String getStatusInfo() { |
| | | return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d", |
| | | return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d, 缓冲区: %d/%d, 去重缓存: %d/%d", |
| | | isRunning ? "运行中" : "已停止", |
| | | dataQueue.size(), |
| | | dataQueue.remainingCapacity() + dataQueue.size(), |
| | | batchQueue.size(), |
| | | batchQueue.remainingCapacity() + batchQueue.size()); |
| | | batchQueue.remainingCapacity() + batchQueue.size(), |
| | | bufferPosition, |
| | | dataBuffer.length, |
| | | processedPacketCache.size(), |
| | | MAX_CACHE_SIZE); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void setMaxRawDataPrintLength(int length) { |
| | | // 实现根据需要调整 |
| | | } |
| | | |
| | | /** |
| | | * 主动清理资源 |
| | | */ |
| | | public void cleanup() { |
| | | stop(); |
| | | clearQueues(); |
| | | bufferPosition = 0; |
| | | hexBuilder.setLength(0); |
| | | processedPacketCache.clear(); |
| | | } |
| | | |
| | | /** |
| | | * 手动清空数据包去重缓存(用于调试或特殊情况) |
| | | */ |
| | | public void clearPacketCache() { |
| | | processedPacketCache.clear(); |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("数据包去重缓存已手动清空"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取去重缓存大小(用于监控) |
| | | */ |
| | | public int getPacketCacheSize() { |
| | | return processedPacketCache.size(); |
| | | } |
| | | } |