package udpdell;
|
import java.io.IOException;
|
import java.net.DatagramPacket;
|
import java.net.DatagramSocket;
|
import java.net.SocketException;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import gecaoji.Device;
|
import zhuye.Coordinate;
|
|
public class UDPServer {
|
private static final int PORT = 7000; // 默认UDP监听端口
|
private static final int BUFFER_SIZE = 65507; // UDP最大包大小
|
private static final int THREAD_POOL_SIZE = 100; // 线程池大小
|
|
private static final AtomicInteger RECEIVED_PACKET_COUNTER = new AtomicInteger(0);
|
|
private static volatile Thread serverThread;
|
|
/**
|
* 启动 UDP 服务端监听线程,线程会常驻运行直至进程结束或手动中断。
|
*
|
* @return 正在运行的监听线程
|
*/
|
public static synchronized Thread startAsync() {
|
if (serverThread != null && serverThread.isAlive()) {
|
return serverThread;
|
}
|
|
Thread thread = new Thread(UDPServer::runServerLoop, "UDPServer-Listener");
|
thread.setDaemon(false); // 保持 JVM 持续存活
|
thread.start();
|
serverThread = thread;
|
return thread;
|
}
|
|
/**
|
* 在当前线程中直接运行 UDP 服务端监听。
|
*/
|
public static void startSync() {
|
runServerLoop();
|
}
|
|
/**
|
* 监听并处理 UDP 数据包。
|
*/
|
private static void runServerLoop() {
|
ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
|
|
try (DatagramSocket socket = new DatagramSocket(PORT)) {
|
System.out.println("UDPServer started on port " + PORT);
|
|
while (!Thread.currentThread().isInterrupted()) {
|
byte[] buffer = new byte[BUFFER_SIZE];
|
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
|
try {
|
socket.receive(packet);
|
executor.submit(new PacketProcessor(packet));
|
} catch (IOException e) {
|
if (!socket.isClosed()) {
|
System.err.println("Error receiving packet: " + e.getMessage());
|
}
|
}
|
}
|
} catch (SocketException e) {
|
System.err.println("Could not start server: " + e.getMessage());
|
} finally {
|
executor.shutdownNow();
|
}
|
}
|
|
public static void processIncomingMessage(String message) {
|
String[] fields = message.split(",");
|
// 检查字段数量是否完整
|
if (fields.length != 21) {
|
System.err.println("Invalid message format, expected 21 fields but got " + fields.length);
|
return;
|
}
|
|
// 检查包头是否正确
|
if (!fields[0].equals("$GNGGA")) {
|
System.err.println("Invalid message header: " + fields[0]);
|
return;
|
}
|
int sequence = incrementReceivedPacketCounter();
|
System.out.println("收到了差分数据(" + sequence + "):" + message);
|
Coordinate.parseGNGGAToCoordinateList(message);
|
int count = Coordinate.coordinates.size();
|
System.out.println("savenum:" + count);
|
|
Device.updateFromGNGGA(message, fields[15]);
|
}
|
|
/**处理串口接收到的数据*/
|
public static void processSerialData(String message) {
|
String[] fields = message.split(",");
|
// 检查字段数量是否完整
|
if (fields.length < 15) {
|
System.err.println("Invalid serial GNGGA format, expected at least 15 fields but got " + fields.length);
|
return;
|
}
|
|
// 检查包头是否正确
|
if (!fields[0].equals("$GNGGA")) {
|
System.err.println("Invalid message header: " + fields[0]);
|
return;
|
}
|
int sequence = incrementReceivedPacketCounter();
|
System.out.println("收到了串口数据(" + sequence + "):" + message);
|
Coordinate.dellchuankougngga(message);
|
int count = Coordinate.coordinates.size();
|
System.out.println("savenum:" + count);
|
|
Device.updateFromSerialGNGGA(message);
|
}
|
|
private static int incrementReceivedPacketCounter() {
|
return RECEIVED_PACKET_COUNTER.updateAndGet(current -> {
|
int next = current + 1;
|
if (next > 10000 || next <= 0) {
|
next = 1;
|
}
|
return next;
|
});
|
}
|
|
public static int getReceivedPacketCount() {
|
return RECEIVED_PACKET_COUNTER.get();
|
}
|
|
private static class PacketProcessor implements Runnable {
|
private final DatagramPacket packet;
|
|
public PacketProcessor(DatagramPacket packet) {
|
this.packet = packet;
|
}
|
|
@Override
|
public void run() {
|
String receivedData = new String(packet.getData(), 0, packet.getLength());
|
// 处理可能的连包情况
|
String[] messages = receivedData.split("\\$");
|
|
for (String message : messages) {
|
if (message.isEmpty()) {
|
continue;
|
}
|
// 重新添加$符号以便统一处理
|
String fullMessage = "$" + message;
|
processIncomingMessage(fullMessage);
|
}
|
}
|
}
|
}
|