826220679@qq.com
9 天以前 fb883547ede83b1c758b1a9a025898ba3f83497a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package udptcp;
import dell55AAData.Dell55AA01Parser;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
 
public class PacketProcessingSystem {
    // Ḭ̈߳²È«µÄ±¨ÎÄ´æ´¢¶ÓÁУ¨¼¯ºÏA£©
    private static final ConcurrentLinkedQueue<HexPacket> packetQueue = new ConcurrentLinkedQueue<>();
    private static final AtomicBoolean isRunning = new AtomicBoolean(false);
    private static final ExecutorService parserExecutor = Executors.newSingleThreadExecutor();
    private static Thread parserThread;
    private static boolean a1=true;
 
    // ±¨ÎÄ´æ´¢½á¹¹
    public static class HexPacket {
        public final String ip;
        public final int port;
        public final String hexData;
        public final long timestamp;
 
        public HexPacket(String ip, int port, String hexData, long timestamp) {
            this.ip = ip;
            this.port = port;
            this.hexData = hexData;
            this.timestamp = timestamp;
        }
 
        public String getIp() {
            return ip;
        }
 
        public int getPort() {
            return port;
        }
 
        public String getHexData() {
            return hexData;
        }
 
        public long getTimestamp() {
            return timestamp;
        }
    }
 
    // ½ÓÊÕ¶Ë´æ´¢±¨ÎÄ£¨UDPPortAReceiverÖе÷Óã©
    public static void storePacket(String ip, int port, String hexData) {
        if(a1) {
            startProcessing();
            a1=false;
        }
        if (packetQueue.size() < 100000) { // ÏÞÖÆ¶ÓÁдóС·ÀÖ¹OOM
            packetQueue.offer(new HexPacket(ip, port, hexData, System.currentTimeMillis()));
        }
    }
 
    // Æô¶¯½âÎöϵͳ
    public static void startProcessing() {
        if (isRunning.get()) return;
        
        isRunning.set(true);
        parserThread = new Thread(() -> {
            PacketParser bufferManager = new PacketParser();
            
            while (isRunning.get()) {
                HexPacket packet = packetQueue.poll();
                if (packet == null) {
                    Thread.yield(); // ¶ÓÁÐΪ¿ÕʱÈóöCPU
                    continue;
                }
                
                try {
                    // ×ª»»HEXΪ×Ö½ÚÊý¾Ý
                    byte[] rawData = PacketParser.hexStringToBytes(packet.hexData); 
                    String ip=packet.getIp();
                    int port=packet.getPort();
                    // ×·¼Óµ½»º³åÇø²¢½âÎö
                    bufferManager.appendData(rawData, rawData.length);
                    List<PacketParser.DataPacket> parsedPackets = bufferManager.parsePackets();
                    
                    // ´¦Àí½âÎöºóµÄÊý¾Ý°ü
                    for (PacketParser.DataPacket p : parsedPackets) {
                        // ¸ù¾Ý°üÍ·ÀàÐÍ·Óɵ½²»Í¬½âÎöÆ÷
                        switch (p.getPacketType()) {
                            case 0x01:
                                processType01(p,ip,port);break;
                            case 0x02:
                                processType02(p,ip,port);break;
                            
                            default:
                                System.err.println("δ֪°üÀàÐÍ: " + p.getPacketType());
                        }
                    }
                } catch (Exception e) {
                    System.err.println("½âÎö´íÎó: " + e.getMessage());
                }
            }
        });
        
        parserThread.setDaemon(true);
        parserThread.start();
    }
 
    // Í£Ö¹½âÎöϵͳ
    public static void stopProcessing() {
        isRunning.set(false);
        parserExecutor.shutdownNow();
        if (parserThread != null) {
            parserThread.interrupt();
        }
    }
 
    // Ê¾Àý½âÎö·½·¨£¨Ðè¸ù¾Ýʵ¼ÊЭÒéʵÏÖ£©
    private static void processType01(PacketParser.DataPacket packet,String ip,int port) {        
        String hexData = PacketParser.bytesToHexString(packet.getPacket());        
        Dell55AA01Parser.parse(hexData,ip,port);
    }
 
    private static void processType02(PacketParser.DataPacket packet,String ip,int port) {
        System.out.println("´¦Àí55AA02°ü: " + packet);
        // Êµ¼ÊÒµÎñÂß¼­
    }
    
    // ÆäËûÀàÐÍ´¦Àí·½·¨...
}