张世豪
2025-12-02 6799351be12deb2f713f2c0a2b4c467a6d1098c3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package udpdell;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import gecaoji.Device;
import zhuye.Coordinate;
 
public class UDPServer {
    private static final int PORT = 7000; // 默认UDP监听端口
    private static final int BUFFER_SIZE = 65507; // UDP最大包大小
    private static final int THREAD_POOL_SIZE = 100; // 线程池大小
 
    private static volatile Thread serverThread;
 
    /**
     * 启动 UDP 服务端监听线程,线程会常驻运行直至进程结束或手动中断。
     *
     * @return 正在运行的监听线程
     */
    public static synchronized Thread startAsync() {
        if (serverThread != null && serverThread.isAlive()) {
            return serverThread;
        }
 
        Thread thread = new Thread(UDPServer::runServerLoop, "UDPServer-Listener");
        thread.setDaemon(false); // 保持 JVM 持续存活
        thread.start();
        serverThread = thread;
        return thread;
    }
 
    /**
     * 在当前线程中直接运行 UDP 服务端监听。
     */
    public static void startSync() {
        runServerLoop();
    }
 
    /**
     * 监听并处理 UDP 数据包。
     */
    private static void runServerLoop() {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
 
        try (DatagramSocket socket = new DatagramSocket(PORT)) {
            System.out.println("UDPServer started on port " + PORT);
 
            while (!Thread.currentThread().isInterrupted()) {
                byte[] buffer = new byte[BUFFER_SIZE];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
 
                try {
                    socket.receive(packet);
                    executor.submit(new PacketProcessor(packet));
                } catch (IOException e) {
                    if (!socket.isClosed()) {
                        System.err.println("Error receiving packet: " + e.getMessage());
                    }
                }
            }
        } catch (SocketException e) {
            System.err.println("Could not start server: " + e.getMessage());
        } finally {
            executor.shutdownNow();
        }
    }
 
    public static void processIncomingMessage(String message) {
        String[] fields = message.split(",");
        // 检查字段数量是否完整
        if (fields.length != 21) {
            System.err.println("Invalid message format, expected 21 fields but got " + fields.length);
            return;
        }
 
        // 检查包头是否正确
        if (!fields[0].equals("$GNGGA")) {
            System.err.println("Invalid message header: " + fields[0]);
            return;
        }
        System.out.println("收到了差分数据:" + message);
        Coordinate.parseGNGGAToCoordinateList(message);
        int count = Coordinate.coordinates.size();
        System.out.println("savenum:" + count);
 
        Device.updateFromGNGGA(message, fields[15]);
    }
 
    private static class PacketProcessor implements Runnable {
        private final DatagramPacket packet;
 
        public PacketProcessor(DatagramPacket packet) {
            this.packet = packet;
        }
 
        @Override
        public void run() {
            String receivedData = new String(packet.getData(), 0, packet.getLength());
            // 处理可能的连包情况
            String[] messages = receivedData.split("\\$");
 
            for (String message : messages) {
                if (message.isEmpty()) {
                    continue;
                }
                // 重新添加$符号以便统一处理
                String fullMessage = "$" + message;
                processIncomingMessage(fullMessage);
            }
        }
    }
}