826220679@qq.com
3 天以前 96f9630247478ee09dace5786ebfe46a54a6f2c0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
package chuankou;
 
import udpdell.UDPServer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
 
/**
 * 串口实时数据调度中心。
 * <p>
 * 负责接收 {@link SerialPortService} 捕获的原始字节流、
 * 聚合为完整的文本行并向已注册的监听器分发,同时提供对
 * GNGGA 数据的内置解析支持,复用 UDP 处理逻辑。
 */
public final class dellmessage {
    private static final CopyOnWriteArrayList<Consumer<byte[]>> RAW_CONSUMERS = new CopyOnWriteArrayList<>();
    private static final CopyOnWriteArrayList<Consumer<String>> LINE_CONSUMERS = new CopyOnWriteArrayList<>();
    private static final StringBuilder LINE_BUFFER = new StringBuilder(512);
    private static final AtomicInteger LINE_COUNTER = new AtomicInteger(0);
    private static final AtomicReference<String> LAST_LINE = new AtomicReference<>("");
 
    private dellmessage() {
        // utility
    }
 
    /**
     * 注册原始字节监听器。
     *
     * @param consumer 接收完整数据帧的监听器
     */
    public static void registerRawListener(Consumer<byte[]> consumer) {
        // 用法:在需要直接处理原始串口字节流的模块启动时调用,传入回调处理数据帧。
        if (consumer != null) {
            RAW_CONSUMERS.addIfAbsent(consumer);
        }
    }
 
    /**
     * 注销原始字节监听器。
     */
    public static void unregisterRawListener(Consumer<byte[]> consumer) {
        // 用法:模块销毁或不再需要接收原始数据时调用,避免内存泄漏。
        if (consumer != null) {
            RAW_CONSUMERS.remove(consumer);
        }
    }
 
    /**
     * 注册文本行监听器。
     * <p>
     * 每一条经由换行符截断的完整文本行将触发一次回调。
     */
    public static void registerLineListener(Consumer<String> consumer) {
        // 用法:需要按行读取串口文本(如 NMEA 报文)时调用,回调拿到完整文本行。
        if (consumer != null) {
            LINE_CONSUMERS.addIfAbsent(consumer);
        }
    }
 
    /**
     * 注销文本行监听器。
     */
    public static void unregisterLineListener(Consumer<String> consumer) {
        // 用法:对应 registerLineListener 的反注册操作,通常在窗口关闭或服务停止时调用。
        if (consumer != null) {
            LINE_CONSUMERS.remove(consumer);
        }
    }
 
    /**
     * 串口捕获线程收到数据后调用此方法。
     *
     * @param data 完整的串口数据帧
     */
    public static void handleIncomingBytes(byte[] data) {
        // 用法:由串口读取线程(SerialPortService)在获取完整数据帧后直接调用。
        if (data == null || data.length == 0) {
            return;
        }
 
        notifyRawConsumers(data);
 
        String ascii = new String(data, StandardCharsets.UTF_8);
        if (ascii.isEmpty()) {
            return;
        }
 
        synchronized (LINE_BUFFER) {
            LINE_BUFFER.append(ascii);
            String line;
            while ((line = pollNextLine()) != null) {
                dispatchLine(line);
            }
        }
    }
 
    /**
     * 获取当前累计处理的文本行数量。
     */
    public static int getProcessedLineCount() {
        // 用法:用于显示或监控当前已处理文本行的数量,例如调试界面统计信息。
        return LINE_COUNTER.get();
    }
 
    /**
     * 获取最近一次解析出的完整行。
     */
    public static String getLastLine() {
        // 用法:便捷获取最近一次解析的完整文本行,可用于 UI 显示或调试记录。
        return LAST_LINE.get();
    }
 
    /**
     * 清空内部缓存。
     */
    public static void clearBuffer() {
        // 用法:在需要重置解析状态时调用,例如断开串口或重新连接前清除缓存。
        synchronized (LINE_BUFFER) {
            LINE_BUFFER.setLength(0);
        }
        LAST_LINE.set("");
        LINE_COUNTER.set(0);
    }
 
    private static void notifyRawConsumers(byte[] data) {
        for (Consumer<byte[]> consumer : RAW_CONSUMERS) {
            try {
                consumer.accept(data);
            } catch (Exception ex) {
                logConsumerFailure(ex);
            }
        }
    }
 
    private static String pollNextLine() {
        while (true) {
            int length = LINE_BUFFER.length();
            if (length == 0) {
                return null;
            }
 
            int lineEnd = findLineBreakIndex();
            if (lineEnd >= 0) {
                String line = LINE_BUFFER.substring(0, lineEnd);
                int skip = calculateSkipLength(lineEnd, length);
                LINE_BUFFER.delete(0, lineEnd + skip);
                return line;
            }
 
            int firstDollar = LINE_BUFFER.indexOf("$");
            if (firstDollar >= 0) {
                if (firstDollar > 0) {
                    String prefix = LINE_BUFFER.substring(0, firstDollar);
                    LINE_BUFFER.delete(0, firstDollar);
                    if (!prefix.trim().isEmpty()) {
                        return prefix;
                    }
                    continue;
                }
 
                int nextDollar = LINE_BUFFER.indexOf("$", firstDollar + 1);
                if (nextDollar > 0) {
                    String message = LINE_BUFFER.substring(firstDollar, nextDollar);
                    LINE_BUFFER.delete(0, nextDollar);
                    return message;
                }
            }
 
            ensureBufferCapacity();
            return null;
        }
    }
 
    private static int findLineBreakIndex() {
        for (int i = 0; i < LINE_BUFFER.length(); i++) {
            char ch = LINE_BUFFER.charAt(i);
            if (ch == '\n' || ch == '\r') {
                return i;
            }
        }
        return -1;
    }
 
    private static int calculateSkipLength(int breakIndex, int currentLength) {
        if (breakIndex >= currentLength) {
            return 0;
        }
        char separator = LINE_BUFFER.charAt(breakIndex);
        if (separator == '\r') {
            if (breakIndex + 1 < currentLength && LINE_BUFFER.charAt(breakIndex + 1) == '\n') {
                return 2;
            }
            return 1;
        }
        // separator == '\n'
        if (breakIndex + 1 < currentLength && LINE_BUFFER.charAt(breakIndex + 1) == '\r') {
            return 2;
        }
        return 1;
    }
 
    private static void ensureBufferCapacity() {
        final int maxCapacity = 4096;
        if (LINE_BUFFER.length() > maxCapacity) {
            LINE_BUFFER.delete(0, LINE_BUFFER.length() - maxCapacity);
        }
    }
 
    private static void dispatchLine(String rawLine) {
//       System.out.println("处理收到的数据: " + rawLine);
        
        if (rawLine == null) {
            return;
        }
        String line = rawLine.trim();
        if (line.isEmpty()) {
            return;
        }
 
        LAST_LINE.set(line);
        LINE_COUNTER.updateAndGet(count -> count >= 10000 ? 1 : count + 1);
 
        for (Consumer<String> consumer : LINE_CONSUMERS) {
            try {
                consumer.accept(line);
            } catch (Exception ex) {
                logConsumerFailure(ex);
            }
        }
 
        if (line.startsWith("$GNGGA")) {
            try {
                UDPServer.processSerialData(line);
            } catch (Exception ex) {
                System.err.println("dellmessage GNGGA parse error: " + ex.getMessage());
            }
        }
    }
 
    private static void logConsumerFailure(Exception ex) {
        System.err.println("dellmessage listener exception: " + ex.getMessage());
    }
}