package udptcp;
|
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import dell55AAData.Dell55AA01Parser;
|
|
public class PacketProcessingSystem {
|
// Ḭ̈߳²È«µÄ±¨ÎÄ´æ´¢¶ÓÁУ¨¼¯ºÏA£©
|
private static final ConcurrentLinkedQueue<HexPacket> packetQueue = new ConcurrentLinkedQueue<>();
|
private static final AtomicBoolean isRunning = new AtomicBoolean(false);
|
private static final ExecutorService parserExecutor = Executors.newSingleThreadExecutor();
|
private static Thread parserThread;
|
private static boolean a1=true;
|
|
// ±¨ÎÄ´æ´¢½á¹¹
|
public static class HexPacket {
|
public final String ip;
|
public final int port;
|
public final String hexData;
|
public final long timestamp;
|
|
public HexPacket(String ip, int port, String hexData, long timestamp) {
|
this.ip = ip;
|
this.port = port;
|
this.hexData = hexData;
|
this.timestamp = timestamp;
|
}
|
|
public String getIp() {
|
return ip;
|
}
|
|
public int getPort() {
|
return port;
|
}
|
|
public String getHexData() {
|
return hexData;
|
}
|
|
public long getTimestamp() {
|
return timestamp;
|
}
|
}
|
|
// ½ÓÊÕ¶Ë´æ´¢±¨ÎÄ£¨UDPPortAReceiverÖе÷Óã©
|
public static void storePacket(String ip, int port, String hexData) {
|
if(a1) {
|
startProcessing();
|
a1=false;
|
}
|
if (packetQueue.size() < 100000) { // ÏÞÖÆ¶ÓÁдóС·ÀÖ¹OOM
|
packetQueue.offer(new HexPacket(ip, port, hexData, System.currentTimeMillis()));
|
}
|
}
|
|
// Æô¶¯½âÎöϵͳ
|
public static void startProcessing() {
|
if (isRunning.get()) return;
|
|
isRunning.set(true);
|
parserThread = new Thread(() -> {
|
PacketParser bufferManager = new PacketParser();
|
|
while (isRunning.get()) {
|
HexPacket packet = packetQueue.poll();
|
if (packet == null) {
|
Thread.yield(); // ¶ÓÁÐΪ¿ÕʱÈóöCPU
|
continue;
|
}
|
|
try {
|
// ת»»HEXΪ×Ö½ÚÊý¾Ý
|
byte[] rawData = PacketParser.hexStringToBytes(packet.hexData);
|
String ip=packet.getIp();
|
int port=packet.getPort();
|
// ×·¼Óµ½»º³åÇø²¢½âÎö
|
bufferManager.appendData(rawData, rawData.length);
|
List<PacketParser.DataPacket> parsedPackets = bufferManager.parsePackets();
|
|
// ´¦Àí½âÎöºóµÄÊý¾Ý°ü
|
for (PacketParser.DataPacket p : parsedPackets) {
|
// ¸ù¾Ý°üÍ·ÀàÐÍ·Óɵ½²»Í¬½âÎöÆ÷
|
switch (p.getPacketType()) {
|
case 0x01:
|
processType01(p,ip,port);break;
|
case 0x02:
|
processType02(p,ip,port);break;
|
|
default:
|
System.err.println("δ֪°üÀàÐÍ: " + p.getPacketType());
|
}
|
}
|
} catch (Exception e) {
|
System.err.println("½âÎö´íÎó: " + e.getMessage());
|
}
|
}
|
});
|
|
parserThread.setDaemon(true);
|
parserThread.start();
|
}
|
|
// Í£Ö¹½âÎöϵͳ
|
public static void stopProcessing() {
|
isRunning.set(false);
|
parserExecutor.shutdownNow();
|
if (parserThread != null) {
|
parserThread.interrupt();
|
}
|
}
|
|
// ʾÀý½âÎö·½·¨£¨Ðè¸ù¾Ýʵ¼ÊÐÒéʵÏÖ£©
|
private static void processType01(PacketParser.DataPacket packet,String ip,int port) {
|
String hexData = PacketParser.bytesToHexString(packet.getPacket());
|
Dell55AA01Parser.parse(hexData,ip,port);
|
}
|
|
private static void processType02(PacketParser.DataPacket packet,String ip,int port) {
|
System.out.println("´¦Àí55AA02°ü: " + packet);
|
// ʵ¼ÊÒµÎñÂß¼
|
}
|
|
// ÆäËûÀàÐÍ´¦Àí·½·¨...
|
}
|