zsh_root
2025-12-10 8d662de2fd262b3a485f16e197cb4d0ca2a61cdf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package jiexi;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
 
public class PacketProcessingSystem {
    // 线程安全的报文存储队列(集合A)
    private static final ConcurrentLinkedQueue<HexPacket> packetQueue = new ConcurrentLinkedQueue<>();
    private static final AtomicBoolean isRunning = new AtomicBoolean(false);
    private static final ExecutorService parserExecutor = Executors.newSingleThreadExecutor();
    private static Thread parserThread;
    private static boolean a1=true;
    // 报文存储结构
    public static class HexPacket {
        public final String ip;
        public final int port;
        public final String hexData;
        public final long timestamp;
 
        public HexPacket(String ip, int port, String hexData, long timestamp) {
            this.ip = ip;
            this.port = port;
            this.hexData = hexData;
            this.timestamp = timestamp;
        }
 
        public String getIp() {
            return ip;
        }
 
        public int getPort() {
            return port;
        }
 
        public String getHexData() {
            return hexData;
        }
 
        public long getTimestamp() {
            return timestamp;
        }
    }
 
    // 接收端存储报文(UDPPortAReceiver中调用)
    public static void storePacket(String ip, int port, String hexData) {
        if(a1) {
            startProcessing();
            a1=false;
        }
        if (packetQueue.size() < 100000) { // 限制队列大小防止OOM
            packetQueue.offer(new HexPacket(ip, port, hexData, System.currentTimeMillis()));
        }
    }
 
    // 启动解析系统
    public static void startProcessing() {
        if (isRunning.get()) return;        
        isRunning.set(true);
        parserThread = new Thread(() -> {
            PacketParser bufferManager = new PacketParser();            
            while (isRunning.get()) {
                HexPacket packet = packetQueue.poll();
                if (packet == null) {
                    Thread.yield(); // 队列为空时让出CPU
                    continue;
                }
 
                try {
                    // 转换HEX为字节数据
                    byte[] rawData = PacketParser.hexStringToBytes(packet.hexData); 
                    String ip=packet.getIp();
                    int port=packet.getPort();
                    // 追加到缓冲区并解析
                    bufferManager.appendData(rawData, rawData.length);
                    List<PacketParser.DataPacket> parsedPackets = bufferManager.parsePackets();
 
                    // 处理解析后的数据包
                    for (PacketParser.DataPacket p : parsedPackets) {
                        // 根据包头类型路由到不同解析器                       
                        switch (p.getPacketType()) {
                        case 0x01:
                            processType01(p,ip,port);break;
                        case 0x02:
                            processType02(p,ip,port);break;
                        case 0x12:
                            processType12(p,ip,port);break;
                        case 0x03:
                            processType03(p,ip,port);break;
                        default:
                            System.err.println("未知包类型: " + p.getPacketType());
                        }
                    }
                } catch (Exception e) {
                    System.err.println("解析错误: " + e.getMessage());
                }
            }
        });
 
        parserThread.setDaemon(true);
        parserThread.start();
    }
 
    // 停止解析系统
    public static void stopProcessing() {
        isRunning.set(false);
        parserExecutor.shutdownNow();
        if (parserThread != null) {
            parserThread.interrupt();
        }
    }
 
    // 示例解析方法(需根据实际协议实现)
    private static void processType01(PacketParser.DataPacket packet,String ip,int port) {      
        String hexData = PacketParser.bytesToHexString(packet.getPacket());     
        Dell55AA01Parser.parse(hexData,ip,port);
    }
 
    private static void processType02(PacketParser.DataPacket packet,String ip,int port) {
        String hexData = PacketParser.bytesToHexString(packet.getPacket());
        Dell55AA02Parser.parse(hexData,ip,port);
        // 实际业务逻辑
    }
 
    private static void processType12(PacketParser.DataPacket packet,String ip,int port) {
        String hexData = PacketParser.bytesToHexString(packet.getPacket());
        Dell55AA12HighPerf.parse(hexData,ip,port);
    }
    
    private static void processType03(PacketParser.DataPacket packet,String ip,int port) {
        String hexData = PacketParser.bytesToHexString(packet.getPacket());
        DellTag55AA03.parse(hexData,ip,port);
    }
    // 其他类型处理方法...
}