package chuankou;
import udpdell.UDPServer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* 串口实时数据调度中心。
*
* 负责接收 {@link SerialPortService} 捕获的原始字节流、
* 聚合为完整的文本行并向已注册的监听器分发,同时提供对
* GNGGA 数据的内置解析支持,复用 UDP 处理逻辑。
*/
public final class dellmessage {
private static final CopyOnWriteArrayList> RAW_CONSUMERS = new CopyOnWriteArrayList<>();
private static final CopyOnWriteArrayList> LINE_CONSUMERS = new CopyOnWriteArrayList<>();
private static final StringBuilder LINE_BUFFER = new StringBuilder(512);
private static final AtomicInteger LINE_COUNTER = new AtomicInteger(0);
private static final AtomicReference LAST_LINE = new AtomicReference<>("");
private dellmessage() {
// utility
}
/**
* 注册原始字节监听器。
*
* @param consumer 接收完整数据帧的监听器
*/
public static void registerRawListener(Consumer consumer) {
// 用法:在需要直接处理原始串口字节流的模块启动时调用,传入回调处理数据帧。
if (consumer != null) {
RAW_CONSUMERS.addIfAbsent(consumer);
}
}
/**
* 注销原始字节监听器。
*/
public static void unregisterRawListener(Consumer consumer) {
// 用法:模块销毁或不再需要接收原始数据时调用,避免内存泄漏。
if (consumer != null) {
RAW_CONSUMERS.remove(consumer);
}
}
/**
* 注册文本行监听器。
*
* 每一条经由换行符截断的完整文本行将触发一次回调。
*/
public static void registerLineListener(Consumer consumer) {
// 用法:需要按行读取串口文本(如 NMEA 报文)时调用,回调拿到完整文本行。
if (consumer != null) {
LINE_CONSUMERS.addIfAbsent(consumer);
}
}
/**
* 注销文本行监听器。
*/
public static void unregisterLineListener(Consumer consumer) {
// 用法:对应 registerLineListener 的反注册操作,通常在窗口关闭或服务停止时调用。
if (consumer != null) {
LINE_CONSUMERS.remove(consumer);
}
}
/**
* 串口捕获线程收到数据后调用此方法。
*
* @param data 完整的串口数据帧
*/
public static void handleIncomingBytes(byte[] data) {
// 用法:由串口读取线程(SerialPortService)在获取完整数据帧后直接调用。
if (data == null || data.length == 0) {
return;
}
notifyRawConsumers(data);
String ascii = new String(data, StandardCharsets.UTF_8);
if (ascii.isEmpty()) {
return;
}
synchronized (LINE_BUFFER) {
LINE_BUFFER.append(ascii);
String line;
while ((line = pollNextLine()) != null) {
dispatchLine(line);
}
}
}
/**
* 获取当前累计处理的文本行数量。
*/
public static int getProcessedLineCount() {
// 用法:用于显示或监控当前已处理文本行的数量,例如调试界面统计信息。
return LINE_COUNTER.get();
}
/**
* 获取最近一次解析出的完整行。
*/
public static String getLastLine() {
// 用法:便捷获取最近一次解析的完整文本行,可用于 UI 显示或调试记录。
return LAST_LINE.get();
}
/**
* 清空内部缓存。
*/
public static void clearBuffer() {
// 用法:在需要重置解析状态时调用,例如断开串口或重新连接前清除缓存。
synchronized (LINE_BUFFER) {
LINE_BUFFER.setLength(0);
}
LAST_LINE.set("");
LINE_COUNTER.set(0);
}
private static void notifyRawConsumers(byte[] data) {
for (Consumer consumer : RAW_CONSUMERS) {
try {
consumer.accept(data);
} catch (Exception ex) {
logConsumerFailure(ex);
}
}
}
private static String pollNextLine() {
while (true) {
int length = LINE_BUFFER.length();
if (length == 0) {
return null;
}
int lineEnd = findLineBreakIndex();
if (lineEnd >= 0) {
String line = LINE_BUFFER.substring(0, lineEnd);
int skip = calculateSkipLength(lineEnd, length);
LINE_BUFFER.delete(0, lineEnd + skip);
return line;
}
int firstDollar = LINE_BUFFER.indexOf("$");
if (firstDollar >= 0) {
if (firstDollar > 0) {
String prefix = LINE_BUFFER.substring(0, firstDollar);
LINE_BUFFER.delete(0, firstDollar);
if (!prefix.trim().isEmpty()) {
return prefix;
}
continue;
}
int nextDollar = LINE_BUFFER.indexOf("$", firstDollar + 1);
if (nextDollar > 0) {
String message = LINE_BUFFER.substring(firstDollar, nextDollar);
LINE_BUFFER.delete(0, nextDollar);
return message;
}
}
ensureBufferCapacity();
return null;
}
}
private static int findLineBreakIndex() {
for (int i = 0; i < LINE_BUFFER.length(); i++) {
char ch = LINE_BUFFER.charAt(i);
if (ch == '\n' || ch == '\r') {
return i;
}
}
return -1;
}
private static int calculateSkipLength(int breakIndex, int currentLength) {
if (breakIndex >= currentLength) {
return 0;
}
char separator = LINE_BUFFER.charAt(breakIndex);
if (separator == '\r') {
if (breakIndex + 1 < currentLength && LINE_BUFFER.charAt(breakIndex + 1) == '\n') {
return 2;
}
return 1;
}
// separator == '\n'
if (breakIndex + 1 < currentLength && LINE_BUFFER.charAt(breakIndex + 1) == '\r') {
return 2;
}
return 1;
}
private static void ensureBufferCapacity() {
final int maxCapacity = 4096;
if (LINE_BUFFER.length() > maxCapacity) {
LINE_BUFFER.delete(0, LINE_BUFFER.length() - maxCapacity);
}
}
private static void dispatchLine(String rawLine) {
// System.out.println("处理收到的数据: " + rawLine);
if (rawLine == null) {
return;
}
String line = rawLine.trim();
if (line.isEmpty()) {
return;
}
LAST_LINE.set(line);
LINE_COUNTER.updateAndGet(count -> count >= 10000 ? 1 : count + 1);
for (Consumer consumer : LINE_CONSUMERS) {
try {
consumer.accept(line);
} catch (Exception ex) {
logConsumerFailure(ex);
}
}
if (line.startsWith("$GNGGA")) {
try {
UDPServer.processSerialData(line);
} catch (Exception ex) {
System.err.println("dellmessage GNGGA parse error: " + ex.getMessage());
}
}
}
private static void logConsumerFailure(Exception ex) {
System.err.println("dellmessage listener exception: " + ex.getMessage());
}
}