| | |
| | | |
| | | import java.util.concurrent.ArrayBlockingQueue; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.Collections; |
| | | import java.util.Set; |
| | | |
| | | import chuankou.SerialPortService; |
| | | import chushihua.SlotManager; |
| | |
| | | // 对象池,减少对象创建 |
| | | private final Object packetPoolLock = new Object(); |
| | | |
| | | // ==================== 新增:数据包去重缓存 ==================== |
| | | private final Set<String> processedPacketCache = Collections.newSetFromMap( |
| | | new ConcurrentHashMap<String, Boolean>()); |
| | | private static final int MAX_CACHE_SIZE = 1000; |
| | | private static final long CACHE_CLEANUP_INTERVAL = 60000; // 1分钟清理一次缓存 |
| | | private long lastCacheCleanupTime = 0; |
| | | |
| | | /** |
| | | * 启动解析器 |
| | | */ |
| | |
| | | |
| | | isRunning = true; |
| | | |
| | | // 启动批量处理(每50ms处理一次) |
| | | batchExecutor.scheduleAtFixedRate(this::batchProcess, 50, 50, TimeUnit.MILLISECONDS); |
| | | |
| | | // 启动批量处理(每10ms处理一次) |
| | | batchExecutor.scheduleAtFixedRate(this::batchProcess, 10, 10, TimeUnit.MILLISECONDS); |
| | | processorThread = new Thread(this::processPackets, "Serial-Protocol-Parser"); |
| | | processorThread.setDaemon(true); |
| | | processorThread.start(); |
| | |
| | | // 清空队列和缓冲区 |
| | | clearQueues(); |
| | | bufferPosition = 0; |
| | | |
| | | // 清空去重缓存 |
| | | processedPacketCache.clear(); |
| | | } |
| | | |
| | | /** |
| | |
| | | // 定期检查内存 |
| | | checkMemory(); |
| | | |
| | | // 定期清理去重缓存 |
| | | cleanupPacketCache(); |
| | | |
| | | } catch (Exception e) { |
| | | System.err.println("批量处理数据时发生异常: " + e.getMessage()); |
| | | // 发生异常时重置状态 |
| | |
| | | } |
| | | |
| | | /** |
| | | * 清理数据包去重缓存 |
| | | */ |
| | | private void cleanupPacketCache() { |
| | | long currentTime = System.currentTimeMillis(); |
| | | if (currentTime - lastCacheCleanupTime < CACHE_CLEANUP_INTERVAL) { |
| | | return; |
| | | } |
| | | |
| | | lastCacheCleanupTime = currentTime; |
| | | |
| | | // 如果缓存大小超过限制,清空缓存 |
| | | if (processedPacketCache.size() >= MAX_CACHE_SIZE) { |
| | | processedPacketCache.clear(); |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("数据包去重缓存已清空,当前大小: " + processedPacketCache.size()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 处理缓冲区中的数据,解析完整数据包 |
| | | */ |
| | | private void processBuffer() { |
| | |
| | | // 提取完整数据包 |
| | | byte[] packet = extractPacket(startIndex, totalPacketLength); |
| | | if (packet != null) { |
| | | // ==================== 新增:数据包去重检查 ==================== |
| | | String packetHash = generatePacketHash(packet); |
| | | if (processedPacketCache.contains(packetHash)) { |
| | | // 跳过重复数据包 |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("跳过重复数据包: " + bytesToHex(packet)); |
| | | } |
| | | |
| | | // 移动缓冲区位置,继续处理下一个包 |
| | | int remaining = bufferPosition - (startIndex + totalPacketLength); |
| | | if (remaining > 0) { |
| | | System.arraycopy(dataBuffer, startIndex + totalPacketLength, dataBuffer, 0, remaining); |
| | | } |
| | | bufferPosition = remaining; |
| | | processedCount++; |
| | | continue; |
| | | } |
| | | |
| | | // 添加到去重缓存 |
| | | processedPacketCache.add(packetHash); |
| | | |
| | | // 将数据包放入队列供解析 |
| | | if (!dataQueue.offer(packet)) { |
| | | // 队列已满,释放packet引用 |
| | |
| | | } |
| | | |
| | | /** |
| | | * 生成数据包哈希值用于去重 |
| | | * 基于关键字段:主机地址 + 卡槽地址 + 功能码 + 数据内容 |
| | | */ |
| | | private String generatePacketHash(byte[] packet) { |
| | | if (packet == null || packet.length < 7) { |
| | | return ""; |
| | | } |
| | | |
| | | StringBuilder key = new StringBuilder(64); |
| | | |
| | | // 添加协议头信息 |
| | | key.append(packet[4] & 0xFF).append(":"); // 主机地址 (位置4) |
| | | key.append(packet[5] & 0xFF).append(":"); // 卡槽地址 (位置5) |
| | | key.append(packet[6] & 0xFF).append(":"); // 功能码 (位置6) |
| | | |
| | | // 添加数据内容哈希(排除包头和CRC) |
| | | int dataStart = 7; // 数据开始位置 |
| | | int dataEnd = packet.length - 2; // 排除CRC字节 |
| | | |
| | | // 取数据内容的前20个字节作为哈希依据 |
| | | int hashLength = Math.min(dataEnd - dataStart, 20); |
| | | for (int i = dataStart; i < dataStart + hashLength && i < dataEnd; i++) { |
| | | key.append(String.format("%02X", packet[i])); |
| | | } |
| | | |
| | | return key.toString(); |
| | | } |
| | | |
| | | /** |
| | | * 提取数据包,重用byte数组减少对象创建 |
| | | */ |
| | | private byte[] extractPacket(int startIndex, int totalPacketLength) { |
| | |
| | | * 解析数据包并根据功能码调用相应方法 |
| | | */ |
| | | private void parsePacket(byte[] packet) { |
| | | System.out.println("开始解析收到的数据"+bytesToHex(packet)+"时间"+TimestampUtil.getTimestamp()); |
| | | if (packet == null || packet.length < MIN_PACKET_LENGTH) { |
| | | return; |
| | | } |
| | |
| | | byte hostAddress = packet[4]; // 主机地址 |
| | | byte slotAddress = packet[5]; // 卡槽地址 |
| | | byte functionCode = packet[6]; // 功能码 |
| | | |
| | | // 数据长度 (从协议中读取) |
| | | int dataLength = ((packet[2] & 0xFF) << 8) | (packet[3] & 0xFF); |
| | | |
| | |
| | | * 获取解析器状态信息 |
| | | */ |
| | | public String getStatusInfo() { |
| | | return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d, 缓冲区: %d/%d", |
| | | return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d, 缓冲区: %d/%d, 去重缓存: %d/%d", |
| | | isRunning ? "运行中" : "已停止", |
| | | dataQueue.size(), |
| | | dataQueue.remainingCapacity() + dataQueue.size(), |
| | | batchQueue.size(), |
| | | batchQueue.remainingCapacity() + batchQueue.size(), |
| | | bufferPosition, |
| | | dataBuffer.length); |
| | | dataBuffer.length, |
| | | processedPacketCache.size(), |
| | | MAX_CACHE_SIZE); |
| | | } |
| | | |
| | | /** |
| | |
| | | clearQueues(); |
| | | bufferPosition = 0; |
| | | hexBuilder.setLength(0); |
| | | processedPacketCache.clear(); |
| | | } |
| | | |
| | | /** |
| | | * 手动清空数据包去重缓存(用于调试或特殊情况) |
| | | */ |
| | | public void clearPacketCache() { |
| | | processedPacketCache.clear(); |
| | | if (lunxun.DEBUG_ENABLED) { |
| | | SystemDebugDialog.appendAsciiData("数据包去重缓存已手动清空"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取去重缓存大小(用于监控) |
| | | */ |
| | | public int getPacketCacheSize() { |
| | | return processedPacketCache.size(); |
| | | } |
| | | } |