张世豪
2025-11-26 2b756769ea4adad21332d8a294871712cd42cc3f
src/publicway/SerialProtocolParser.java
@@ -2,9 +2,12 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.Collections;
import java.util.Set;
import chuankou.SerialPortService;
import chushihua.SlotManager;
@@ -54,6 +57,13 @@
    // 对象池,减少对象创建
    private final Object packetPoolLock = new Object();
    
    // ==================== 新增:数据包去重缓存 ====================
    private final Set<String> processedPacketCache = Collections.newSetFromMap(
        new ConcurrentHashMap<String, Boolean>());
    private static final int MAX_CACHE_SIZE = 1000;
    private static final long CACHE_CLEANUP_INTERVAL = 60000; // 1分钟清理一次缓存
    private long lastCacheCleanupTime = 0;
    /**
     * 启动解析器
     */
@@ -64,9 +74,8 @@
        
        isRunning = true;
        
        // 启动批量处理(每50ms处理一次)
        batchExecutor.scheduleAtFixedRate(this::batchProcess, 50, 50, TimeUnit.MILLISECONDS);
        // 启动批量处理(每10ms处理一次)
        batchExecutor.scheduleAtFixedRate(this::batchProcess, 10, 10, TimeUnit.MILLISECONDS);
        processorThread = new Thread(this::processPackets, "Serial-Protocol-Parser");
        processorThread.setDaemon(true);
        processorThread.start();
@@ -106,6 +115,9 @@
        // 清空队列和缓冲区
        clearQueues();
        bufferPosition = 0;
        // 清空去重缓存
        processedPacketCache.clear();
    }
    
    /**
@@ -203,6 +215,9 @@
            // 定期检查内存
            checkMemory();
            
            // 定期清理去重缓存
            cleanupPacketCache();
        } catch (Exception e) {
            System.err.println("批量处理数据时发生异常: " + e.getMessage());
            // 发生异常时重置状态
@@ -275,6 +290,26 @@
    }
    
    /**
     * 清理数据包去重缓存
     */
    private void cleanupPacketCache() {
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastCacheCleanupTime < CACHE_CLEANUP_INTERVAL) {
            return;
        }
        lastCacheCleanupTime = currentTime;
        // 如果缓存大小超过限制,清空缓存
        if (processedPacketCache.size() >= MAX_CACHE_SIZE) {
            processedPacketCache.clear();
            if (lunxun.DEBUG_ENABLED) {
                SystemDebugDialog.appendAsciiData("数据包去重缓存已清空,当前大小: " + processedPacketCache.size());
            }
        }
    }
    /**
     * 处理缓冲区中的数据,解析完整数据包
     */
    private void processBuffer() {
@@ -318,6 +353,27 @@
            // 提取完整数据包
            byte[] packet = extractPacket(startIndex, totalPacketLength);
            if (packet != null) {
                // ==================== 新增:数据包去重检查 ====================
                String packetHash = generatePacketHash(packet);
                if (processedPacketCache.contains(packetHash)) {
                    // 跳过重复数据包
                    if (lunxun.DEBUG_ENABLED) {
                        SystemDebugDialog.appendAsciiData("跳过重复数据包: " + bytesToHex(packet));
                    }
                    // 移动缓冲区位置,继续处理下一个包
                    int remaining = bufferPosition - (startIndex + totalPacketLength);
                    if (remaining > 0) {
                        System.arraycopy(dataBuffer, startIndex + totalPacketLength, dataBuffer, 0, remaining);
                    }
                    bufferPosition = remaining;
                    processedCount++;
                    continue;
                }
                // 添加到去重缓存
                processedPacketCache.add(packetHash);
                // 将数据包放入队列供解析
                if (!dataQueue.offer(packet)) {
                    // 队列已满,释放packet引用
@@ -339,6 +395,35 @@
    }
    
    /**
     * 生成数据包哈希值用于去重
     * 基于关键字段:主机地址 + 卡槽地址 + 功能码 + 数据内容
     */
    private String generatePacketHash(byte[] packet) {
        if (packet == null || packet.length < 7) {
            return "";
        }
        StringBuilder key = new StringBuilder(64);
        // 添加协议头信息
        key.append(packet[4] & 0xFF).append(":"); // 主机地址 (位置4)
        key.append(packet[5] & 0xFF).append(":"); // 卡槽地址 (位置5)
        key.append(packet[6] & 0xFF).append(":"); // 功能码 (位置6)
        // 添加数据内容哈希(排除包头和CRC)
        int dataStart = 7; // 数据开始位置
        int dataEnd = packet.length - 2; // 排除CRC字节
        // 取数据内容的前20个字节作为哈希依据
        int hashLength = Math.min(dataEnd - dataStart, 20);
        for (int i = dataStart; i < dataStart + hashLength && i < dataEnd; i++) {
            key.append(String.format("%02X", packet[i]));
        }
        return key.toString();
    }
    /**
     * 提取数据包,重用byte数组减少对象创建
     */
    private byte[] extractPacket(int startIndex, int totalPacketLength) {
@@ -408,6 +493,7 @@
     * 解析数据包并根据功能码调用相应方法
     */
    private void parsePacket(byte[] packet) {
        System.out.println("开始解析收到的数据"+bytesToHex(packet)+"时间"+TimestampUtil.getTimestamp());
        if (packet == null || packet.length < MIN_PACKET_LENGTH) {
            return;
        }
@@ -418,6 +504,7 @@
            byte hostAddress = packet[4];        // 主机地址
            byte slotAddress = packet[5];        // 卡槽地址
            byte functionCode = packet[6];       // 功能码           
            // 数据长度 (从协议中读取)
            int dataLength = ((packet[2] & 0xFF) << 8) | (packet[3] & 0xFF);
            
@@ -529,14 +616,16 @@
     * 获取解析器状态信息
     */
    public String getStatusInfo() {
        return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d, 缓冲区: %d/%d",
        return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d, 缓冲区: %d/%d, 去重缓存: %d/%d",
                           isRunning ? "运行中" : "已停止", 
                           dataQueue.size(), 
                           dataQueue.remainingCapacity() + dataQueue.size(),
                           batchQueue.size(),
                           batchQueue.remainingCapacity() + batchQueue.size(),
                           bufferPosition,
                           dataBuffer.length);
                           dataBuffer.length,
                           processedPacketCache.size(),
                           MAX_CACHE_SIZE);
    }
    
    /**
@@ -554,5 +643,23 @@
        clearQueues();
        bufferPosition = 0;
        hexBuilder.setLength(0);
        processedPacketCache.clear();
    }
    /**
     * 手动清空数据包去重缓存(用于调试或特殊情况)
     */
    public void clearPacketCache() {
        processedPacketCache.clear();
        if (lunxun.DEBUG_ENABLED) {
            SystemDebugDialog.appendAsciiData("数据包去重缓存已手动清空");
        }
    }
    /**
     * 获取去重缓存大小(用于监控)
     */
    public int getPacketCacheSize() {
        return processedPacketCache.size();
    }
}