| | |
| | | import java.util.concurrent.Executors; |
| | | |
| | | public class PacketProcessingSystem { |
| | | // 线程安全的报文存储队列(集合A) |
| | | // 线程安全的报文存储队列(集合A) |
| | | private static final ConcurrentLinkedQueue<HexPacket> packetQueue = new ConcurrentLinkedQueue<>(); |
| | | private static final AtomicBoolean isRunning = new AtomicBoolean(false); |
| | | private static final ExecutorService parserExecutor = Executors.newSingleThreadExecutor(); |
| | | private static Thread parserThread; |
| | | private static boolean a1=true; |
| | | // 报文存储结构 |
| | | // 报文存储结构 |
| | | public static class HexPacket { |
| | | public final String ip; |
| | | public final int port; |
| | |
| | | } |
| | | } |
| | | |
| | | // 接收端存储报文(UDPPortAReceiver中调用) |
| | | // 接收端存储报文(UDPPortAReceiver中调用) |
| | | public static void storePacket(String ip, int port, String hexData) { |
| | | if(a1) { |
| | | startProcessing(); |
| | | a1=false; |
| | | } |
| | | if (packetQueue.size() < 100000) { // 限制队列大小防止OOM |
| | | if (packetQueue.size() < 100000) { // 限制队列大小防止OOM |
| | | packetQueue.offer(new HexPacket(ip, port, hexData, System.currentTimeMillis())); |
| | | } |
| | | } |
| | | |
| | | // 启动解析系统 |
| | | // 启动解析系统 |
| | | public static void startProcessing() { |
| | | if (isRunning.get()) return; |
| | | isRunning.set(true); |
| | |
| | | while (isRunning.get()) { |
| | | HexPacket packet = packetQueue.poll(); |
| | | if (packet == null) { |
| | | Thread.yield(); // 队列为空时让出CPU |
| | | Thread.yield(); // 队列为空时让出CPU |
| | | continue; |
| | | } |
| | | |
| | | try { |
| | | // 转换HEX为字节数据 |
| | | // 转换HEX为字节数据 |
| | | byte[] rawData = PacketParser.hexStringToBytes(packet.hexData); |
| | | String ip=packet.getIp(); |
| | | int port=packet.getPort(); |
| | | // 追加到缓冲区并解析 |
| | | // 追加到缓冲区并解析 |
| | | bufferManager.appendData(rawData, rawData.length); |
| | | List<PacketParser.DataPacket> parsedPackets = bufferManager.parsePackets(); |
| | | |
| | | // 处理解析后的数据包 |
| | | // 处理解析后的数据包 |
| | | for (PacketParser.DataPacket p : parsedPackets) { |
| | | // 根据包头类型路由到不同解析器 |
| | | // 根据包头类型路由到不同解析器 |
| | | switch (p.getPacketType()) { |
| | | case 0x01: |
| | | processType01(p,ip,port);break; |
| | |
| | | case 0x03: |
| | | processType03(p,ip,port);break; |
| | | default: |
| | | System.err.println("未知包类型: " + p.getPacketType()); |
| | | System.err.println("未知包类型: " + p.getPacketType()); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("解析错误: " + e.getMessage()); |
| | | System.err.println("解析错误: " + e.getMessage()); |
| | | } |
| | | } |
| | | }); |
| | |
| | | parserThread.start(); |
| | | } |
| | | |
| | | // 停止解析系统 |
| | | // 停止解析系统 |
| | | public static void stopProcessing() { |
| | | isRunning.set(false); |
| | | parserExecutor.shutdownNow(); |
| | |
| | | } |
| | | } |
| | | |
| | | // 示例解析方法(需根据实际协议实现) |
| | | // 示例解析方法(需根据实际协议实现) |
| | | private static void processType01(PacketParser.DataPacket packet,String ip,int port) { |
| | | String hexData = PacketParser.bytesToHexString(packet.getPacket()); |
| | | Dell55AA01Parser.parse(hexData,ip,port); |
| | |
| | | private static void processType02(PacketParser.DataPacket packet,String ip,int port) { |
| | | String hexData = PacketParser.bytesToHexString(packet.getPacket()); |
| | | Dell55AA02Parser.parse(hexData,ip,port); |
| | | // 实际业务逻辑 |
| | | // 实际业务逻辑 |
| | | } |
| | | |
| | | private static void processType12(PacketParser.DataPacket packet,String ip,int port) { |
| | |
| | | String hexData = PacketParser.bytesToHexString(packet.getPacket()); |
| | | DellTag55AA03.parse(hexData,ip,port); |
| | | } |
| | | // 其他类型处理方法... |
| | | // 其他类型处理方法... |
| | | } |