package jiexi; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class PacketProcessingSystem { // 线程安全的报文存储队列(集合A) private static final ConcurrentLinkedQueue packetQueue = new ConcurrentLinkedQueue<>(); private static final AtomicBoolean isRunning = new AtomicBoolean(false); private static final ExecutorService parserExecutor = Executors.newSingleThreadExecutor(); private static Thread parserThread; private static boolean a1=true; // 报文存储结构 public static class HexPacket { public final String ip; public final int port; public final String hexData; public final long timestamp; public HexPacket(String ip, int port, String hexData, long timestamp) { this.ip = ip; this.port = port; this.hexData = hexData; this.timestamp = timestamp; } public String getIp() { return ip; } public int getPort() { return port; } public String getHexData() { return hexData; } public long getTimestamp() { return timestamp; } } // 接收端存储报文(UDPPortAReceiver中调用) public static void storePacket(String ip, int port, String hexData) { if(a1) { startProcessing(); a1=false; } if (packetQueue.size() < 100000) { // 限制队列大小防止OOM packetQueue.offer(new HexPacket(ip, port, hexData, System.currentTimeMillis())); } } // 启动解析系统 public static void startProcessing() { if (isRunning.get()) return; isRunning.set(true); parserThread = new Thread(() -> { PacketParser bufferManager = new PacketParser(); while (isRunning.get()) { HexPacket packet = packetQueue.poll(); if (packet == null) { Thread.yield(); // 队列为空时让出CPU continue; } try { // 转换HEX为字节数据 byte[] rawData = PacketParser.hexStringToBytes(packet.hexData); String ip=packet.getIp(); int port=packet.getPort(); // 追加到缓冲区并解析 bufferManager.appendData(rawData, rawData.length); List parsedPackets = bufferManager.parsePackets(); // 处理解析后的数据包 for (PacketParser.DataPacket p : parsedPackets) { // 根据包头类型路由到不同解析器 switch (p.getPacketType()) { case 0x01: processType01(p,ip,port);break; case 0x02: processType02(p,ip,port);break; case 0x12: processType12(p,ip,port);break; case 0x03: processType03(p,ip,port);break; default: System.err.println("未知包类型: " + p.getPacketType()); } } } catch (Exception e) { System.err.println("解析错误: " + e.getMessage()); } } }); parserThread.setDaemon(true); parserThread.start(); } // 停止解析系统 public static void stopProcessing() { isRunning.set(false); parserExecutor.shutdownNow(); if (parserThread != null) { parserThread.interrupt(); } } // 示例解析方法(需根据实际协议实现) private static void processType01(PacketParser.DataPacket packet,String ip,int port) { String hexData = PacketParser.bytesToHexString(packet.getPacket()); Dell55AA01Parser.parse(hexData,ip,port); } private static void processType02(PacketParser.DataPacket packet,String ip,int port) { String hexData = PacketParser.bytesToHexString(packet.getPacket()); Dell55AA02Parser.parse(hexData,ip,port); // 实际业务逻辑 } private static void processType12(PacketParser.DataPacket packet,String ip,int port) { String hexData = PacketParser.bytesToHexString(packet.getPacket()); Dell55AA12HighPerf.parse(hexData,ip,port); } private static void processType03(PacketParser.DataPacket packet,String ip,int port) { String hexData = PacketParser.bytesToHexString(packet.getPacket()); DellTag55AA03.parse(hexData,ip,port); } // 其他类型处理方法... }