package chuankou; import udpdell.UDPServer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; /** * 串口实时数据调度中心。 *

* 负责接收 {@link SerialPortService} 捕获的原始字节流、 * 聚合为完整的文本行并向已注册的监听器分发,同时提供对 * GNGGA 数据的内置解析支持,复用 UDP 处理逻辑。 */ public final class dellmessage { private static final CopyOnWriteArrayList> RAW_CONSUMERS = new CopyOnWriteArrayList<>(); private static final CopyOnWriteArrayList> LINE_CONSUMERS = new CopyOnWriteArrayList<>(); private static final StringBuilder LINE_BUFFER = new StringBuilder(512); private static final AtomicInteger LINE_COUNTER = new AtomicInteger(0); private static final AtomicReference LAST_LINE = new AtomicReference<>(""); private dellmessage() { // utility } /** * 注册原始字节监听器。 * * @param consumer 接收完整数据帧的监听器 */ public static void registerRawListener(Consumer consumer) { // 用法:在需要直接处理原始串口字节流的模块启动时调用,传入回调处理数据帧。 if (consumer != null) { RAW_CONSUMERS.addIfAbsent(consumer); } } /** * 注销原始字节监听器。 */ public static void unregisterRawListener(Consumer consumer) { // 用法:模块销毁或不再需要接收原始数据时调用,避免内存泄漏。 if (consumer != null) { RAW_CONSUMERS.remove(consumer); } } /** * 注册文本行监听器。 *

* 每一条经由换行符截断的完整文本行将触发一次回调。 */ public static void registerLineListener(Consumer consumer) { // 用法:需要按行读取串口文本(如 NMEA 报文)时调用,回调拿到完整文本行。 if (consumer != null) { LINE_CONSUMERS.addIfAbsent(consumer); } } /** * 注销文本行监听器。 */ public static void unregisterLineListener(Consumer consumer) { // 用法:对应 registerLineListener 的反注册操作,通常在窗口关闭或服务停止时调用。 if (consumer != null) { LINE_CONSUMERS.remove(consumer); } } /** * 串口捕获线程收到数据后调用此方法。 * * @param data 完整的串口数据帧 */ public static void handleIncomingBytes(byte[] data) { // 用法:由串口读取线程(SerialPortService)在获取完整数据帧后直接调用。 if (data == null || data.length == 0) { return; } notifyRawConsumers(data); String ascii = new String(data, StandardCharsets.UTF_8); if (ascii.isEmpty()) { return; } synchronized (LINE_BUFFER) { LINE_BUFFER.append(ascii); String line; while ((line = pollNextLine()) != null) { dispatchLine(line); } } } /** * 获取当前累计处理的文本行数量。 */ public static int getProcessedLineCount() { // 用法:用于显示或监控当前已处理文本行的数量,例如调试界面统计信息。 return LINE_COUNTER.get(); } /** * 获取最近一次解析出的完整行。 */ public static String getLastLine() { // 用法:便捷获取最近一次解析的完整文本行,可用于 UI 显示或调试记录。 return LAST_LINE.get(); } /** * 清空内部缓存。 */ public static void clearBuffer() { // 用法:在需要重置解析状态时调用,例如断开串口或重新连接前清除缓存。 synchronized (LINE_BUFFER) { LINE_BUFFER.setLength(0); } LAST_LINE.set(""); LINE_COUNTER.set(0); } private static void notifyRawConsumers(byte[] data) { for (Consumer consumer : RAW_CONSUMERS) { try { consumer.accept(data); } catch (Exception ex) { logConsumerFailure(ex); } } } private static String pollNextLine() { while (true) { int length = LINE_BUFFER.length(); if (length == 0) { return null; } int lineEnd = findLineBreakIndex(); if (lineEnd >= 0) { String line = LINE_BUFFER.substring(0, lineEnd); int skip = calculateSkipLength(lineEnd, length); LINE_BUFFER.delete(0, lineEnd + skip); return line; } int firstDollar = LINE_BUFFER.indexOf("$"); if (firstDollar >= 0) { if (firstDollar > 0) { String prefix = LINE_BUFFER.substring(0, firstDollar); LINE_BUFFER.delete(0, firstDollar); if (!prefix.trim().isEmpty()) { return prefix; } continue; } int nextDollar = LINE_BUFFER.indexOf("$", firstDollar + 1); if (nextDollar > 0) { String message = LINE_BUFFER.substring(firstDollar, nextDollar); LINE_BUFFER.delete(0, nextDollar); return message; } } ensureBufferCapacity(); return null; } } private static int findLineBreakIndex() { for (int i = 0; i < LINE_BUFFER.length(); i++) { char ch = LINE_BUFFER.charAt(i); if (ch == '\n' || ch == '\r') { return i; } } return -1; } private static int calculateSkipLength(int breakIndex, int currentLength) { if (breakIndex >= currentLength) { return 0; } char separator = LINE_BUFFER.charAt(breakIndex); if (separator == '\r') { if (breakIndex + 1 < currentLength && LINE_BUFFER.charAt(breakIndex + 1) == '\n') { return 2; } return 1; } // separator == '\n' if (breakIndex + 1 < currentLength && LINE_BUFFER.charAt(breakIndex + 1) == '\r') { return 2; } return 1; } private static void ensureBufferCapacity() { final int maxCapacity = 4096; if (LINE_BUFFER.length() > maxCapacity) { LINE_BUFFER.delete(0, LINE_BUFFER.length() - maxCapacity); } } private static void dispatchLine(String rawLine) { // System.out.println("处理收到的数据: " + rawLine); if (rawLine == null) { return; } String line = rawLine.trim(); if (line.isEmpty()) { return; } LAST_LINE.set(line); LINE_COUNTER.updateAndGet(count -> count >= 10000 ? 1 : count + 1); for (Consumer consumer : LINE_CONSUMERS) { try { consumer.accept(line); } catch (Exception ex) { logConsumerFailure(ex); } } if (line.startsWith("$GNGGA")) { try { UDPServer.processSerialData(line); } catch (Exception ex) { System.err.println("dellmessage GNGGA parse error: " + ex.getMessage()); } } } private static void logConsumerFailure(Exception ex) { System.err.println("dellmessage listener exception: " + ex.getMessage()); } }