package publicsWay;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import dell55AAData.Dell55AA01Parser;
import dell55AAData.Dell55AA12HighPerf;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hands received hex-encoded packets from the receiving side to a single
 * background parser thread.
 *
 * <p>Producers call {@link #storePacket(String, int, String)}; the parser thread
 * drains the queue, feeds the decoded bytes into a {@code PacketParser} buffer,
 * and routes every parsed packet to a type-specific handler.
 */
public class PacketProcessingSystem {

    /** Upper bound on queued packets; packets arriving while the queue is full are dropped to avoid OOM. */
    private static final int MAX_QUEUE_SIZE = 100000;

    /** How long the parser thread waits for a packet before re-checking the running flag. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /**
     * Thread-safe packet storage queue ("collection A").
     * Bounded, so the capacity check is O(1) and the consumer can block instead of spinning.
     */
    private static final LinkedBlockingQueue<HexPacket> packetQueue =
            new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);

    /** True while a parser thread is supposed to be running. */
    private static final AtomicBoolean isRunning = new AtomicBoolean(false);

    /** True until the first stored packet has triggered the automatic start. */
    private static final AtomicBoolean autoStartPending = new AtomicBoolean(true);

    /** The current parser thread, or {@code null} if processing was never started. */
    private static volatile Thread parserThread;

    /** Packet storage structure: an immutable hex payload plus its source address and arrival time. */
    public static class HexPacket {
        public final String ip;
        public final int port;
        public final String hexData;
        public final long timestamp;

        /**
         * @param ip        source IP address
         * @param port      source port
         * @param hexData   payload as a hex string
         * @param timestamp arrival time in milliseconds since the epoch
         */
        public HexPacket(String ip, int port, String hexData, long timestamp) {
            this.ip = ip;
            this.port = port;
            this.hexData = hexData;
            this.timestamp = timestamp;
        }

        public String getIp() {
            return ip;
        }

        public int getPort() {
            return port;
        }

        public String getHexData() {
            return hexData;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }

    /**
     * Stores a received packet for later parsing (per the original comment, called
     * from UDPPortAReceiver).
     *
     * <p>The very first call also starts the parser thread. If the queue already
     * holds {@value #MAX_QUEUE_SIZE} packets, the new packet is silently dropped.
     *
     * @param ip      source IP address
     * @param port    source port
     * @param hexData payload as a hex string
     */
    public static void storePacket(String ip, int port, String hexData) {
        // Auto-start exactly once; a later stopProcessing() is not undone by new packets.
        if (autoStartPending.compareAndSet(true, false)) {
            startProcessing();
        }
        // offer() returns false instead of blocking when the bounded queue is full.
        packetQueue.offer(new HexPacket(ip, port, hexData, System.currentTimeMillis()));
    }

    /**
     * Starts the parsing system. Does nothing if it is already running.
     */
    public static void startProcessing() {
        // Atomic check-and-set so concurrent callers cannot start two parser threads.
        if (!isRunning.compareAndSet(false, true)) {
            return;
        }
        Thread worker = new Thread(PacketProcessingSystem::runParserLoop);
        worker.setDaemon(true);
        parserThread = worker;
        worker.start();
    }

    /**
     * Stops the parsing system and interrupts the parser thread so that it
     * leaves its blocking wait promptly.
     */
    public static void stopProcessing() {
        isRunning.set(false);
        Thread worker = parserThread;
        if (worker != null) {
            worker.interrupt();
        }
    }

    /** Body of the parser thread: drain the queue until processing is stopped. */
    private static void runParserLoop() {
        PacketParser bufferManager = new PacketParser();
        while (isRunning.get()) {
            HexPacket packet;
            try {
                // Block while the queue is empty instead of spinning on the CPU.
                packet = packetQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (packet == null) {
                continue;
            }
            try {
                handlePacket(bufferManager, packet);
            } catch (Exception e) {
                // Boundary catch: one malformed packet must not kill the parser thread.
                System.err.println("½âÎö´íÎó: " + e.getMessage());
            }
        }
    }

    /** Decodes one queued packet, appends it to the buffer and dispatches everything that parses. */
    private static void handlePacket(PacketParser bufferManager, HexPacket packet) {
        // Convert HEX to raw bytes.
        byte[] rawData = PacketParser.hexStringToBytes(packet.getHexData());
        String ip = packet.getIp();
        int port = packet.getPort();

        // Append to the buffer and parse.
        bufferManager.appendData(rawData, rawData.length);
        List<PacketParser.DataPacket> parsedPackets = bufferManager.parsePackets();

        // Route each parsed packet to a parser according to its header type.
        for (PacketParser.DataPacket p : parsedPackets) {
            switch (p.getPacketType()) {
                case 0x01:
                    processType01(p, ip, port);
                    break;
                case 0x02:
                    processType02(p, ip, port);
                    break;
                case 0x12:
                    processType12(p, ip, port);
                    break;
                default:
                    System.err.println("δ֪°üÀàÐÍ: " + p.getPacketType());
            }
        }
    }

    // Example parsing methods (to be implemented according to the actual protocol).

    /** Handles a type 0x01 packet by delegating to {@code Dell55AA01Parser.parse}. */
    private static void processType01(PacketParser.DataPacket packet, String ip, int port) {
        String hexData = PacketParser.bytesToHexString(packet.getPacket());
        Dell55AA01Parser.parse(hexData, ip, port);
    }

    /** Handles a type 0x02 packet; currently only logs it (actual business logic still to be added). */
    private static void processType02(PacketParser.DataPacket packet, String ip, int port) {
        System.out.println("´¦Àí55AA02°ü: " + packet);
    }

    /** Handles a type 0x12 packet by delegating to {@code Dell55AA12HighPerf.parse}. */
    private static void processType12(PacketParser.DataPacket packet, String ip, int port) {
        String hexData = PacketParser.bytesToHexString(packet.getPacket());
        Dell55AA12HighPerf.parse(hexData, ip, port);
    }

    // Handlers for other packet types go here.
}