package chuankou;
|
|
import udpdell.UDPServer;
|
import java.nio.charset.StandardCharsets;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.function.Consumer;
|
|
/**
|
* 串口实时数据调度中心。
|
* <p>
|
* 负责接收 {@link SerialPortService} 捕获的原始字节流、
|
* 聚合为完整的文本行并向已注册的监听器分发,同时提供对
|
* GNGGA 数据的内置解析支持,复用 UDP 处理逻辑。
|
*/
|
public final class dellmessage {
|
private static final CopyOnWriteArrayList<Consumer<byte[]>> RAW_CONSUMERS = new CopyOnWriteArrayList<>();
|
private static final CopyOnWriteArrayList<Consumer<String>> LINE_CONSUMERS = new CopyOnWriteArrayList<>();
|
private static final StringBuilder LINE_BUFFER = new StringBuilder(512);
|
private static final AtomicInteger LINE_COUNTER = new AtomicInteger(0);
|
private static final AtomicReference<String> LAST_LINE = new AtomicReference<>("");
|
|
private dellmessage() {
|
// utility
|
}
|
|
/**
|
* 注册原始字节监听器。
|
*
|
* @param consumer 接收完整数据帧的监听器
|
*/
|
public static void registerRawListener(Consumer<byte[]> consumer) {
|
// 用法:在需要直接处理原始串口字节流的模块启动时调用,传入回调处理数据帧。
|
if (consumer != null) {
|
RAW_CONSUMERS.addIfAbsent(consumer);
|
}
|
}
|
|
/**
|
* 注销原始字节监听器。
|
*/
|
public static void unregisterRawListener(Consumer<byte[]> consumer) {
|
// 用法:模块销毁或不再需要接收原始数据时调用,避免内存泄漏。
|
if (consumer != null) {
|
RAW_CONSUMERS.remove(consumer);
|
}
|
}
|
|
/**
|
* 注册文本行监听器。
|
* <p>
|
* 每一条经由换行符截断的完整文本行将触发一次回调。
|
*/
|
public static void registerLineListener(Consumer<String> consumer) {
|
// 用法:需要按行读取串口文本(如 NMEA 报文)时调用,回调拿到完整文本行。
|
if (consumer != null) {
|
LINE_CONSUMERS.addIfAbsent(consumer);
|
}
|
}
|
|
/**
|
* 注销文本行监听器。
|
*/
|
public static void unregisterLineListener(Consumer<String> consumer) {
|
// 用法:对应 registerLineListener 的反注册操作,通常在窗口关闭或服务停止时调用。
|
if (consumer != null) {
|
LINE_CONSUMERS.remove(consumer);
|
}
|
}
|
|
/**
|
* 串口捕获线程收到数据后调用此方法。
|
*
|
* @param data 完整的串口数据帧
|
*/
|
public static void handleIncomingBytes(byte[] data) {
|
// 用法:由串口读取线程(SerialPortService)在获取完整数据帧后直接调用。
|
if (data == null || data.length == 0) {
|
return;
|
}
|
|
notifyRawConsumers(data);
|
|
String ascii = new String(data, StandardCharsets.UTF_8);
|
if (ascii.isEmpty()) {
|
return;
|
}
|
|
synchronized (LINE_BUFFER) {
|
LINE_BUFFER.append(ascii);
|
String line;
|
while ((line = pollNextLine()) != null) {
|
dispatchLine(line);
|
}
|
}
|
}
|
|
/**
|
* 获取当前累计处理的文本行数量。
|
*/
|
public static int getProcessedLineCount() {
|
// 用法:用于显示或监控当前已处理文本行的数量,例如调试界面统计信息。
|
return LINE_COUNTER.get();
|
}
|
|
/**
|
* 获取最近一次解析出的完整行。
|
*/
|
public static String getLastLine() {
|
// 用法:便捷获取最近一次解析的完整文本行,可用于 UI 显示或调试记录。
|
return LAST_LINE.get();
|
}
|
|
/**
|
* 清空内部缓存。
|
*/
|
public static void clearBuffer() {
|
// 用法:在需要重置解析状态时调用,例如断开串口或重新连接前清除缓存。
|
synchronized (LINE_BUFFER) {
|
LINE_BUFFER.setLength(0);
|
}
|
LAST_LINE.set("");
|
LINE_COUNTER.set(0);
|
}
|
|
private static void notifyRawConsumers(byte[] data) {
|
for (Consumer<byte[]> consumer : RAW_CONSUMERS) {
|
try {
|
consumer.accept(data);
|
} catch (Exception ex) {
|
logConsumerFailure(ex);
|
}
|
}
|
}
|
|
private static String pollNextLine() {
|
while (true) {
|
int length = LINE_BUFFER.length();
|
if (length == 0) {
|
return null;
|
}
|
|
int lineEnd = findLineBreakIndex();
|
if (lineEnd >= 0) {
|
String line = LINE_BUFFER.substring(0, lineEnd);
|
int skip = calculateSkipLength(lineEnd, length);
|
LINE_BUFFER.delete(0, lineEnd + skip);
|
return line;
|
}
|
|
int firstDollar = LINE_BUFFER.indexOf("$");
|
if (firstDollar >= 0) {
|
if (firstDollar > 0) {
|
String prefix = LINE_BUFFER.substring(0, firstDollar);
|
LINE_BUFFER.delete(0, firstDollar);
|
if (!prefix.trim().isEmpty()) {
|
return prefix;
|
}
|
continue;
|
}
|
|
int nextDollar = LINE_BUFFER.indexOf("$", firstDollar + 1);
|
if (nextDollar > 0) {
|
String message = LINE_BUFFER.substring(firstDollar, nextDollar);
|
LINE_BUFFER.delete(0, nextDollar);
|
return message;
|
}
|
}
|
|
ensureBufferCapacity();
|
return null;
|
}
|
}
|
|
private static int findLineBreakIndex() {
|
for (int i = 0; i < LINE_BUFFER.length(); i++) {
|
char ch = LINE_BUFFER.charAt(i);
|
if (ch == '\n' || ch == '\r') {
|
return i;
|
}
|
}
|
return -1;
|
}
|
|
private static int calculateSkipLength(int breakIndex, int currentLength) {
|
if (breakIndex >= currentLength) {
|
return 0;
|
}
|
char separator = LINE_BUFFER.charAt(breakIndex);
|
if (separator == '\r') {
|
if (breakIndex + 1 < currentLength && LINE_BUFFER.charAt(breakIndex + 1) == '\n') {
|
return 2;
|
}
|
return 1;
|
}
|
// separator == '\n'
|
if (breakIndex + 1 < currentLength && LINE_BUFFER.charAt(breakIndex + 1) == '\r') {
|
return 2;
|
}
|
return 1;
|
}
|
|
private static void ensureBufferCapacity() {
|
final int maxCapacity = 4096;
|
if (LINE_BUFFER.length() > maxCapacity) {
|
LINE_BUFFER.delete(0, LINE_BUFFER.length() - maxCapacity);
|
}
|
}
|
|
private static void dispatchLine(String rawLine) {
|
// System.out.println("处理收到的数据: " + rawLine);
|
|
if (rawLine == null) {
|
return;
|
}
|
String line = rawLine.trim();
|
if (line.isEmpty()) {
|
return;
|
}
|
|
LAST_LINE.set(line);
|
LINE_COUNTER.updateAndGet(count -> count >= 10000 ? 1 : count + 1);
|
|
for (Consumer<String> consumer : LINE_CONSUMERS) {
|
try {
|
consumer.accept(line);
|
} catch (Exception ex) {
|
logConsumerFailure(ex);
|
}
|
}
|
|
if (line.startsWith("$GNGGA")) {
|
try {
|
UDPServer.processSerialData(line);
|
} catch (Exception ex) {
|
System.err.println("dellmessage GNGGA parse error: " + ex.getMessage());
|
}
|
}
|
}
|
|
private static void logConsumerFailure(Exception ex) {
|
System.err.println("dellmessage listener exception: " + ex.getMessage());
|
}
|
}
|