package udpdell; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import gecaoji.Device; import zhuye.Coordinate; public class UDPServer { private static final int PORT = 7000; // 默认UDP监听端口 private static final int BUFFER_SIZE = 65507; // UDP最大包大小 private static final int THREAD_POOL_SIZE = 100; // 线程池大小 private static volatile Thread serverThread; /** * 启动 UDP 服务端监听线程,线程会常驻运行直至进程结束或手动中断。 * * @return 正在运行的监听线程 */ public static synchronized Thread startAsync() { if (serverThread != null && serverThread.isAlive()) { return serverThread; } Thread thread = new Thread(UDPServer::runServerLoop, "UDPServer-Listener"); thread.setDaemon(false); // 保持 JVM 持续存活 thread.start(); serverThread = thread; return thread; } /** * 在当前线程中直接运行 UDP 服务端监听。 */ public static void startSync() { runServerLoop(); } /** * 监听并处理 UDP 数据包。 */ private static void runServerLoop() { ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); try (DatagramSocket socket = new DatagramSocket(PORT)) { System.out.println("UDPServer started on port " + PORT); while (!Thread.currentThread().isInterrupted()) { byte[] buffer = new byte[BUFFER_SIZE]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); try { socket.receive(packet); executor.submit(new PacketProcessor(packet)); } catch (IOException e) { if (!socket.isClosed()) { System.err.println("Error receiving packet: " + e.getMessage()); } } } } catch (SocketException e) { System.err.println("Could not start server: " + e.getMessage()); } finally { executor.shutdownNow(); } } private static class PacketProcessor implements Runnable { private final DatagramPacket packet; public PacketProcessor(DatagramPacket packet) { this.packet = packet; } public void run() { String receivedData = new String(packet.getData(), 0, packet.getLength()); // 处理可能的连包情况 String[] messages = receivedData.split("\\$"); for (String message : messages) { if (message.isEmpty()) continue; // 重新添加$符号以便统一处理 String fullMessage = "$" + message; processMessage(fullMessage); } } private void processMessage(String message) { String[] fields = message.split(","); // 检查字段数量是否完整 if (fields.length != 21) { System.err.println("Invalid message format, expected 21 fields but got " + fields.length); return; } // 检查包头是否正确 if (!fields[0].equals("$GNGGA")) { System.err.println("Invalid message header: " + fields[0]); return; } System.out.println("收到了差分数据:"+message); Coordinate.parseGNGGAToCoordinateList(message); int count= Coordinate.coordinates.size(); System.out.println("savenum:"+count); Device.updateFromGNGGA(message, fields[15]); // 打印解析后的数据 //System.out.println("UTC时间: " + fields[1].toUpperCase()); //System.out.println("纬度: " + fields[2].toUpperCase()); //System.out.println("纬度半球: " + fields[3].toUpperCase()); //System.out.println("经度: " + fields[4].toUpperCase()); //System.out.println("经度半球: " + fields[5].toUpperCase()); //System.out.println("定位质量指示: " + fields[6].toUpperCase()); //System.out.println("卫星数量: " + fields[7].toUpperCase()); //System.out.println("水平精度因子: " + fields[8].toUpperCase()); //System.out.println("海拔高度: " + fields[9].toUpperCase()); //System.out.println("海拔高度单位: " + fields[10].toUpperCase()); //System.out.println("大地水准面高度: " + fields[11].toUpperCase()); //System.out.println("大地水准面高度单位: " + fields[12].toUpperCase()); //System.out.println("差分时间: " + fields[13].toUpperCase()); //System.out.println("校验和: " + fields[14].toUpperCase()); //System.out.println("设备编号: " + fields[15].toUpperCase()); //System.out.println("设备电量: " + fields[16].toUpperCase()); //System.out.println("卫星信号强度: " + fields[17].toUpperCase()); //System.out.println("保留位1: " + fields[18].toUpperCase()); //System.out.println("保留位2: " + fields[19].toUpperCase()); //System.out.println("保留位3: " + fields[20].toUpperCase()); //System.out.println("----------------------------------------"); } } }