张世豪
5 小时以前 d22349714c8d199c02f336f90fba841ef8f5cd39
src/publicway/SerialProtocolParser.java
@@ -1,4 +1,5 @@
package publicway;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
@@ -27,15 +28,15 @@
    private static final byte FUNCTION_82 = (byte) 0x82; // 单板升级使能
    private static final byte FUNCTION_83 = (byte) 0x83;   // 单板升级数据包
    
    // 数据缓冲区,用于处理粘包
    // 数据缓冲区,用于处理粘包 - 改为直接ByteArrayOutputStream管理
    private byte[] dataBuffer = new byte[1024];
    private int bufferPosition = 0;
    
    // 数据接收队列
    private BlockingQueue<byte[]> dataQueue = new ArrayBlockingQueue<>(100);
    // 数据接收队列 - 限制队列大小防止内存无限增长
    private BlockingQueue<byte[]> dataQueue = new ArrayBlockingQueue<>(50);
    
    // 批量处理队列
    private final BlockingQueue<byte[]> batchQueue = new ArrayBlockingQueue<>(1000);
    // 批量处理队列 - 限制队列大小
    private final BlockingQueue<byte[]> batchQueue = new ArrayBlockingQueue<>(200);
    private final ScheduledExecutorService batchExecutor = 
        Executors.newSingleThreadScheduledExecutor();
    
@@ -46,12 +47,18 @@
    // 重用StringBuilder减少对象创建
    private final StringBuilder hexBuilder = new StringBuilder(256);
    
    // 内存监控计数器
    private long lastMemoryCheckTime = 0;
    private static final long MEMORY_CHECK_INTERVAL = 30000; // 30秒检查一次
    // 对象池,减少对象创建
    private final Object packetPoolLock = new Object();
    /**
     * 启动解析器
     */
    public void start() {
        if (isRunning) {
            //System.out.println("串口协议解析器已经在运行中");
            return;
        }
        
@@ -63,8 +70,6 @@
        processorThread = new Thread(this::processPackets, "Serial-Protocol-Parser");
        processorThread.setDaemon(true);
        processorThread.start();
        //System.out.println("串口协议解析器已启动 - 独立于轮询状态运行");
    }
    
    /**
@@ -99,11 +104,16 @@
        }
        
        // 清空队列和缓冲区
        clearQueues();
        bufferPosition = 0;
    }
    /**
     * 清空所有队列,释放内存
     */
    private void clearQueues() {
        dataQueue.clear();
        batchQueue.clear();
        bufferPosition = 0;
        //System.out.println("串口协议解析器已停止");
    }
    
    /**
@@ -126,9 +136,31 @@
            return;
        }
        
        // 将数据添加到批量队列 - 确保始终执行
        // 检查队列状态,避免内存无限增长
        if (batchQueue.size() > batchQueue.remainingCapacity() * 0.8) {
            // 队列接近满时,丢弃最老的数据
            if (!batchQueue.isEmpty()) {
                batchQueue.poll(); // 移除一个旧数据
            }
        }
        // 将数据添加到批量队列
        if (!batchQueue.offer(rawData)) {
            System.err.println("批量队列已满,丢弃数据");
            // 队列已满时的处理
            handleQueueFull();
        }
    }
    /**
     * 处理队列满的情况
     */
    private void handleQueueFull() {
        // 清空队列重新开始,避免内存泄漏
        clearQueues();
        bufferPosition = 0; // 重置缓冲区
        if (lunxun.DEBUG_ENABLED) {
            SystemDebugDialog.appendAsciiData("警告:数据处理队列已满,已清空队列重新开始");
        }
    }
    
@@ -140,39 +172,105 @@
            return;
        }
        
        // 批量处理数据
        java.util.List<byte[]> batch = new java.util.ArrayList<>(100);
        batchQueue.drainTo(batch, 100);
        for (byte[] rawData : batch) {
            // 将数据添加到缓冲区
            if (bufferPosition + rawData.length > dataBuffer.length) {
                // 缓冲区不足时,清理并重新开始
                System.arraycopy(dataBuffer, bufferPosition - rawData.length, dataBuffer, 0, rawData.length);
                bufferPosition = rawData.length;
            } else {
        try {
            // 批量处理数据,限制每次处理的最大数量
            int maxBatchSize = 50;
            java.util.List<byte[]> batch = new java.util.ArrayList<>(maxBatchSize);
            batchQueue.drainTo(batch, maxBatchSize);
            for (byte[] rawData : batch) {
                // 检查缓冲区容量,避免溢出
                if (bufferPosition + rawData.length > dataBuffer.length) {
                    // 缓冲区不足时的处理
                    if (rawData.length > dataBuffer.length) {
                        // 单条数据超过缓冲区大小,直接处理或丢弃
                        handleOversizedPacket(rawData);
                        continue;
                    } else {
                        // 移动有效数据到缓冲区开头
                        compactBuffer(0);
                    }
                }
                // 将数据添加到缓冲区
                System.arraycopy(rawData, 0, dataBuffer, bufferPosition, rawData.length);
                bufferPosition += rawData.length;
                // 处理缓冲区中的数据
                processBuffer();
            }
            
            // 处理缓冲区中的数据
            processBuffer();
            // 定期检查内存
            checkMemory();
        } catch (Exception e) {
            System.err.println("批量处理数据时发生异常: " + e.getMessage());
            // 发生异常时重置状态
            resetParserState();
        }
        // 定期检查内存
        checkMemory();
    }
    /**
     * 处理过大的数据包
     */
    private void handleOversizedPacket(byte[] oversizedData) {
        // 对于过大的数据包,直接尝试查找起始标记
        int startIndex = findStartMarkerInArray(oversizedData, 0);
        if (startIndex != -1) {
            // 找到起始标记,将剩余数据放入缓冲区
            int remainingLength = oversizedData.length - startIndex;
            if (remainingLength <= dataBuffer.length) {
                System.arraycopy(oversizedData, startIndex, dataBuffer, 0, remainingLength);
                bufferPosition = remainingLength;
                processBuffer();
            }
        }
        // 否则丢弃该数据包
    }
    /**
     * 在指定数组中查找起始标记
     */
    private int findStartMarkerInArray(byte[] data, int startPos) {
        for (int i = startPos; i <= data.length - START_MARKER.length; i++) {
            if (data[i] == START_MARKER[0] && data[i + 1] == START_MARKER[1]) {
                return i;
            }
        }
        return -1;
    }
    /**
     * 重置解析器状态
     */
    private void resetParserState() {
        bufferPosition = 0;
        clearQueues();
    }
    
    /**
     * 内存监控
     */
    private void checkMemory() {
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastMemoryCheckTime < MEMORY_CHECK_INTERVAL) {
            return;
        }
        lastMemoryCheckTime = currentTime;
        Runtime runtime = Runtime.getRuntime();
        long usedMem = runtime.totalMemory() - runtime.freeMemory();
        long maxMem = runtime.maxMemory();
        
        if (usedMem > maxMem * 0.8) {
            //System.out.println("内存使用率超过80%,当前使用: " + (usedMem / 1024 / 1024) + "MB");
        if (usedMem > maxMem * 0.75) {
            // 内存使用率超过75%时进行垃圾回收
            System.gc();
            if (lunxun.DEBUG_ENABLED) {
                SystemDebugDialog.appendAsciiData("内存使用率超过75%,已触发垃圾回收。使用内存: " +
                    (usedMem / 1024 / 1024) + "MB");
            }
        }
    }
    
@@ -180,7 +278,10 @@
     * 处理缓冲区中的数据,解析完整数据包
     */
    private void processBuffer() {
        while (bufferPosition >= MIN_PACKET_LENGTH) {
        int processedCount = 0;
        final int MAX_PACKETS_PER_BATCH = 20; // 限制每次处理的最大包数量
        while (bufferPosition >= MIN_PACKET_LENGTH && processedCount < MAX_PACKETS_PER_BATCH) {
            // 查找起始标记
            int startIndex = findStartMarker();
            if (startIndex == -1) {
@@ -200,6 +301,13 @@
            int dataLength = ((dataBuffer[startIndex + 2] & 0xFF) << 8) | (dataBuffer[startIndex + 3] & 0xFF);
            int totalPacketLength = 2 + 2 + dataLength + 2; // 起始标记2 + 数据长度2 + 数据内容 + CRC2
            
            // 检查数据长度是否合理
            if (dataLength < 0 || totalPacketLength > dataBuffer.length) {
                // 数据长度异常,跳过这个包
                compactBuffer(startIndex + 1);
                continue;
            }
            // 检查是否收到完整数据包
            if (startIndex + totalPacketLength > bufferPosition) {
                // 数据包不完整,等待更多数据
@@ -208,16 +316,15 @@
            }
            
            // 提取完整数据包
            byte[] packet = new byte[totalPacketLength];
            System.arraycopy(dataBuffer, startIndex, packet, 0, totalPacketLength);
            // 将数据包放入队列供解析
            try {
            byte[] packet = extractPacket(startIndex, totalPacketLength);
            if (packet != null) {
                // 将数据包放入队列供解析
                if (!dataQueue.offer(packet)) {
                    System.err.println("数据队列已满,丢弃数据包");
                    // 队列已满,释放packet引用
                    packet = null;
                    handleDataQueueFull();
                    return;
                }
            } catch (Exception e) {
                System.err.println("放入数据队列时发生异常: " + e.getMessage());
            }
            
            // 移动缓冲区位置
@@ -226,6 +333,29 @@
                System.arraycopy(dataBuffer, startIndex + totalPacketLength, dataBuffer, 0, remaining);
            }
            bufferPosition = remaining;
            processedCount++;
        }
    }
    /**
     * 提取数据包,重用byte数组减少对象创建
     */
    private byte[] extractPacket(int startIndex, int totalPacketLength) {
        byte[] packet = new byte[totalPacketLength];
        System.arraycopy(dataBuffer, startIndex, packet, 0, totalPacketLength);
        return packet;
    }
    /**
     * 处理数据队列满的情况
     */
    private void handleDataQueueFull() {
        // 丢弃队列中最老的数据
        dataQueue.poll();
        if (lunxun.DEBUG_ENABLED) {
            SystemDebugDialog.appendAsciiData("数据解析队列已满,丢弃最老数据包");
        }
    }
    
@@ -245,7 +375,7 @@
     * 压缩缓冲区,将有效数据移到开头
     */
    private void compactBuffer(int startIndex) {
        if (startIndex > 0) {
        if (startIndex > 0 && bufferPosition > startIndex) {
            System.arraycopy(dataBuffer, startIndex, dataBuffer, 0, bufferPosition - startIndex);
            bufferPosition -= startIndex;
        }
@@ -267,7 +397,7 @@
                break;
            } catch (Exception e) {
                System.err.println("处理数据包时发生异常: " + e.getMessage());
                e.printStackTrace();
                // 继续运行,不退出线程
            }
        }
        
@@ -278,8 +408,12 @@
     * 解析数据包并根据功能码调用相应方法
     */
    private void parsePacket(byte[] packet) {
        if (packet == null || packet.length < MIN_PACKET_LENGTH) {
            return;
        }
        try {
           SerialPortService.getReceivedDataCount();
            SerialPortService.getReceivedDataCount();
            // 解析基本字段
            byte hostAddress = packet[4];        // 主机地址
            byte slotAddress = packet[5];        // 卡槽地址
@@ -290,7 +424,7 @@
            // 返回值数据
            int returnValueLength = dataLength - 3; // N-3 (减去主机地址、卡槽地址、功能码)
            byte[] returnValue = null;
            if (returnValueLength > 0) {
            if (returnValueLength > 0 && returnValueLength <= packet.length - 7) {
                returnValue = new byte[returnValueLength];
                System.arraycopy(packet, 7, returnValue, 0, returnValueLength);
            }
@@ -301,12 +435,12 @@
                    if (returnValue != null) {
                        // 使用优化的字节数组解析方法,避免字符串转换
                        ParseResult rst = ProtocolParser01.parseDDCC01Data(packet);
                        rst.fuzhi();
//                        System.out.println(rst.toString());
                        if (lunxun.DEBUG_ENABLED) {
                            SystemDebugDialog.appendAsciiData(rst.toString());
                        if (rst != null) {
                            rst.fuzhi();
                            if (lunxun.DEBUG_ENABLED) {
                                SystemDebugDialog.appendAsciiData(rst.toString());
                            }
                        }
                    }
                    break;
                case FUNCTION_51:
@@ -315,36 +449,37 @@
                    int result = ProtocolParser51.parse(hexPacket);
                    int slot = slotAddress;
                    if (result == 1) {
//                       Dingshidialog.showTimedDialog(null, 5,slot+"号卡槽出卡成功请取走卡...");
                       SlotManager.changgehaska(slot, result);
                        SlotManager.changgehaska(slot, result);
                    } else {
                       String message=slot+"号卡槽取卡失败";
                        String message = slot + "号卡槽取卡失败";
                        Charulog.logOperation(message);
                    }
                    break;
                case FUNCTION_52:
                    //System.out.println("功能码 0x52 - LED亮度控制");
                    // LED亮度控制 - 无操作
                    break;
                case FUNCTION_80:
                    //System.out.println("功能码 0x80 - 工卡升级使能");
                    // 工卡升级使能 - 无操作
                    break;
                case FUNCTION_81:
                    //System.out.println("功能码 0x81 - 工作卡升级数据包");
                    // 工作卡升级数据包 - 无操作
                    break;
                case FUNCTION_82:
                    //System.out.println("功能码 0x82 - 单板升级使能");
                    // 单板升级使能 - 无操作
                    break;
                case FUNCTION_83:
                    //System.out.println("功能码 0x83 - 单板升级数据包");
                    // 单板升级数据包 - 无操作
                    break;
                default:
                    System.err.println("未知功能码: 0x" + Integer.toHexString(functionCode & 0xFF));
                    if (lunxun.DEBUG_ENABLED) {
                        System.err.println("未知功能码: 0x" + Integer.toHexString(functionCode & 0xFF));
                    }
                    break;
            }
            
        } catch (Exception e) {
            System.err.println("解析数据包时发生错误: " + e.getMessage());
            e.printStackTrace();
            // 不打印堆栈跟踪,避免产生大量日志对象
        }
    }
    
@@ -352,6 +487,10 @@
     * 优化的字节数组转十六进制字符串方法
     */
    private String bytesToHex(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return "";
        }
        hexBuilder.setLength(0); // 清空重用
        for (byte b : bytes) {
            hexBuilder.append(String.format("%02X", b));
@@ -390,12 +529,14 @@
     * 获取解析器状态信息
     */
    public String getStatusInfo() {
        return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d",
        return String.format("串口解析器状态: %s, 队列大小: %d/%d, 批量队列: %d/%d, 缓冲区: %d/%d",
                           isRunning ? "运行中" : "已停止", 
                           dataQueue.size(), 
                           dataQueue.remainingCapacity() + dataQueue.size(),
                           batchQueue.size(),
                           batchQueue.remainingCapacity() + batchQueue.size());
                           batchQueue.remainingCapacity() + batchQueue.size(),
                           bufferPosition,
                           dataBuffer.length);
    }
    
    /**
@@ -404,4 +545,14 @@
    public void setMaxRawDataPrintLength(int length) {
        // 实现根据需要调整
    }
    /**
     * 主动清理资源
     */
    public void cleanup() {
        stop();
        clearQueues();
        bufferPosition = 0;
        hexBuilder.setLength(0);
    }
}