package udpdell;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import gecaoji.Device;
import zhuye.Coordinate;

/**
 * UDP listener that receives comma-separated {@code $GNGGA} text messages on a fixed port and
 * forwards each valid message to {@link Coordinate} and {@link Device}.
 *
 * <p>The same validation/forwarding path is also exposed for serial-port data through
 * {@link #processSerialData(String)}.
 */
public class UDPServer {

    /** Default UDP listening port. */
    private static final int PORT = 7000;

    /** Largest payload a single UDP datagram can carry over IPv4. */
    private static final int BUFFER_SIZE = 65507;

    /** Number of worker threads that process received datagrams. */
    private static final int THREAD_POOL_SIZE = 100;

    /**
     * Receive timeout in milliseconds. A blocked {@code receive} does not react to
     * {@link Thread#interrupt()}, so the loop wakes up periodically to re-check the interrupt flag.
     */
    private static final int RECEIVE_TIMEOUT_MILLIS = 1000;

    /** Highest sequence number handed out before the counter wraps back to 1. */
    private static final int MAX_SEQUENCE = 10000;

    /** Expected number of comma-separated fields in a message received over UDP. */
    private static final int UDP_FIELD_COUNT = 21;

    /** Minimum number of comma-separated fields in a message received over the serial port. */
    private static final int SERIAL_MIN_FIELD_COUNT = 15;

    /** Header every accepted message must start with. */
    private static final String GNGGA_HEADER = "$GNGGA";

    /** Sequence counter shared by the UDP and serial paths; cycles through 1..{@value #MAX_SEQUENCE}. */
    private static final AtomicInteger RECEIVED_PACKET_COUNTER = new AtomicInteger(0);

    /** The listener thread created by {@link #startAsync()}, if any. */
    private static volatile Thread serverThread;

    /**
     * Starts the UDP listener on a dedicated non-daemon thread. If a listener thread started by a
     * previous call is still alive, that thread is returned and no new one is created.
     *
     * @return the running listener thread
     */
    public static synchronized Thread startAsync() {
        Thread existing = serverThread;
        if (existing != null && existing.isAlive()) {
            return existing;
        }
        Thread thread = new Thread(UDPServer::runServerLoop, "UDPServer-Listener");
        thread.setDaemon(false); // non-daemon so the JVM stays alive while listening
        thread.start();
        serverThread = thread;
        return thread;
    }

    /**
     * Runs the UDP listener on the calling thread. Returns when the calling thread is interrupted,
     * the socket is closed, or the socket could not be opened.
     */
    public static void startSync() {
        runServerLoop();
    }

    /**
     * Receives datagrams on {@link #PORT} and hands each payload to the worker pool until the
     * current thread is interrupted or the socket is closed.
     */
    private static void runServerLoop() {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        try (DatagramSocket socket = new DatagramSocket(PORT)) {
            // Without a timeout, receive() blocks indefinitely and an interrupt is only noticed
            // after the next datagram arrives.
            socket.setSoTimeout(RECEIVE_TIMEOUT_MILLIS);
            System.out.println("UDPServer started on port " + PORT);

            // One receive buffer is reused; only the decoded payload is handed to the workers, so
            // queued tasks do not each pin a full-size buffer.
            byte[] buffer = new byte[BUFFER_SIZE];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

            while (!Thread.currentThread().isInterrupted()) {
                // receive() shrinks the packet length to the received size; restore it first.
                packet.setLength(buffer.length);
                try {
                    socket.receive(packet);
                } catch (SocketTimeoutException ignored) {
                    // No data within the timeout: loop around to re-check the interrupt flag.
                    continue;
                } catch (IOException e) {
                    if (socket.isClosed()) {
                        break; // a closed socket can never receive again; avoid spinning
                    }
                    System.err.println("Error receiving packet: " + e.getMessage());
                    continue;
                }

                // Explicit charset so decoding does not depend on the platform default.
                // NOTE(review): assumes senders emit ASCII/UTF-8 text - confirm.
                String payload = new String(
                        packet.getData(), packet.getOffset(), packet.getLength(),
                        StandardCharsets.UTF_8);
                executor.execute(new PacketProcessor(payload));
            }
        } catch (SocketException e) {
            System.err.println("Could not start server: " + e.getMessage());
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Validates a message received over UDP and forwards it to {@link Coordinate} and
     * {@link Device}. The message is rejected (with a line on {@code System.err}) when it is
     * {@code null}, does not split into exactly {@value #UDP_FIELD_COUNT} comma-separated fields,
     * or does not start with the {@code $GNGGA} header.
     *
     * <p>NOTE(review): this is invoked concurrently from the worker pool; whether
     * {@code Coordinate} and {@code Device} tolerate concurrent calls is not visible here.
     *
     * @param message the full message text, including the leading {@code $}
     */
    public static void processIncomingMessage(String message) {
        if (message == null) {
            System.err.println("Invalid message: null");
            return;
        }
        String[] fields = message.split(",");

        // Field count must match exactly.
        if (fields.length != UDP_FIELD_COUNT) {
            System.err.println("Invalid message format, expected 21 fields but got " + fields.length);
            return;
        }

        // Header must be correct.
        if (!GNGGA_HEADER.equals(fields[0])) {
            System.err.println("Invalid message header: " + fields[0]);
            return;
        }

        int sequence = incrementReceivedPacketCounter();
        System.out.println("收到了差分数据(" + sequence + "):" + message);
        Coordinate.parseGNGGAToCoordinateList(message);
        int count = Coordinate.coordinates.size();
        System.out.println("savenum:" + count);
        // Field index 15 is passed through as-is; its meaning is defined by Device.updateFromGNGGA.
        Device.updateFromGNGGA(message, fields[15]);
    }

    /**
     * Validates a message received from the serial port and forwards it to {@link Coordinate} and
     * {@link Device}. The message is rejected (with a line on {@code System.err}) when it is
     * {@code null}, splits into fewer than {@value #SERIAL_MIN_FIELD_COUNT} comma-separated
     * fields, or does not start with the {@code $GNGGA} header.
     *
     * @param message the full message text, including the leading {@code $}
     */
    public static void processSerialData(String message) {
        if (message == null) {
            System.err.println("Invalid message: null");
            return;
        }
        String[] fields = message.split(",");

        // Field count must reach the minimum.
        if (fields.length < SERIAL_MIN_FIELD_COUNT) {
            System.err.println("Invalid serial GNGGA format, expected at least 15 fields but got " + fields.length);
            return;
        }

        // Header must be correct.
        if (!GNGGA_HEADER.equals(fields[0])) {
            System.err.println("Invalid message header: " + fields[0]);
            return;
        }

        int sequence = incrementReceivedPacketCounter();
        System.out.println("收到了串口数据(" + sequence + "):" + message);
        Coordinate.dellchuankougngga(message);
        int count = Coordinate.coordinates.size();
        System.out.println("savenum:" + count);
        Device.updateFromSerialGNGGA(message);
    }

    /**
     * Atomically advances the shared sequence counter and returns the new value. The counter
     * restarts at 1 once it would exceed {@value #MAX_SEQUENCE} (or would become non-positive).
     *
     * @return the new sequence number, in the range 1..{@value #MAX_SEQUENCE}
     */
    private static int incrementReceivedPacketCounter() {
        return RECEIVED_PACKET_COUNTER.updateAndGet(current -> {
            int next = current + 1;
            return (next > MAX_SEQUENCE || next <= 0) ? 1 : next;
        });
    }

    /**
     * Returns the current value of the sequence counter: 0 before any message has been accepted,
     * otherwise the sequence number given to the most recently accepted message.
     *
     * @return the current counter value
     */
    public static int getReceivedPacketCount() {
        return RECEIVED_PACKET_COUNTER.get();
    }

    /** Worker task that splits one datagram payload into messages and processes each of them. */
    private static class PacketProcessor implements Runnable {

        /** Decoded text of one received datagram. */
        private final String payload;

        /**
         * @param payload decoded text of one received datagram
         */
        PacketProcessor(String payload) {
            this.payload = payload;
        }

        @Override
        public void run() {
            // A single datagram may carry several concatenated messages; split on the '$' that
            // starts each one.
            for (String fragment : payload.split("\\$")) {
                if (fragment.isEmpty()) {
                    continue;
                }
                try {
                    // Re-attach the '$' removed by the split so every message is handled uniformly.
                    processIncomingMessage("$" + fragment);
                } catch (RuntimeException e) {
                    // Report the failure and keep going so one bad message does not discard the
                    // remaining messages in the same datagram.
                    System.err.println("Error processing message: " + e);
                }
            }
        }
    }
}