张世豪
7 小时以前 5ae9bbe3583384afab8eb95a134ccb74aee6487a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
package Mqttmessage;
 
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
import set.Setsys;
import user.Usrdell;
 
/**
 * MQTT客户端工具类
 * 用于连接MQTT服务器并订阅主题
 */
public class Client {
    
    private String host;
    private String topic;
    private String clientId;
    private MqttClient client;
    private MqttConnectOptions options;
    
    // 静态变量用于存储客户端实例
    private static Client gpsClient;
    private static Client responseClient;
    
    /**
     * 构造函数
     * @param host MQTT服务器地址,格式:tcp://ip:port
     * @param topic 订阅的主题
     * @param clientId 客户端ID,不能重复
     */
    public Client(String host, String topic, String clientId) {
        this.host = host;
        this.topic = topic;
        this.clientId = clientId;
        this.options = new MqttConnectOptions();
        this.options.setCleanSession(true);
        // 设置连接超时时间(秒)
        this.options.setConnectionTimeout(30);
        // 设置KeepAlive间隔(秒),用于保持连接活跃
        this.options.setKeepAliveInterval(60);
        // 设置自动重连
        this.options.setAutomaticReconnect(true);
        // 设置MQTT版本,使用3.1.1
        this.options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
    }
    
    /**
     * 连接到MQTT服务器
     * @throws MqttException 连接失败时抛出异常
     */
    public void connect() throws MqttException {
        if (client != null && client.isConnected()) {
            System.out.println("MQTT客户端已连接,ClientId: " + clientId);
            return;
        }
        
        // 如果客户端已存在但未连接,先关闭
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                // 忽略关闭时的异常
            }
            client = null;
        }
        
        // 使用内存持久化,避免文件锁定问题
        client = new MqttClient(host, clientId, new MemoryPersistence());
        // 先设置回调,再连接
        client.setCallback(new PushCallback());
        
        // 执行连接
        client.connect(options);
        System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic);
    }
    
    /**
     * 订阅主题
     * @param qos 服务质量等级,0-2
     * @throws MqttException 订阅失败时抛出异常
     */
    public void subscribe(int qos) throws MqttException {
        if (client == null || !client.isConnected()) {
            connect();
        }
        client.subscribe(topic, qos);
    }
    
    /**
     * 订阅主题(默认QoS为2)
     * @throws MqttException 订阅失败时抛出异常
     */
    public void subscribe() throws MqttException {
        subscribe(2);
    }
    
    /**
     * 断开连接
     * @throws MqttException 断开连接失败时抛出异常
     */
    public void disconnect() throws MqttException {
        if (client != null && client.isConnected()) {
            client.disconnect();
        }
    }
    
    /**
     * 关闭客户端并释放资源
     */
    public void close() {
        try {
            if (client != null) {
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
                client = null;
            }
        } catch (Exception e) {
            // 忽略关闭时的异常
        }
    }
    
    /**
     * 检查是否已连接
     * @return true表示已连接,false表示未连接
     */
    public boolean isConnected() {
        return client != null && client.isConnected();
    }
    
    /**
     * 获取MQTT客户端实例
     * @return MqttClient实例
     */
    public MqttClient getClient() {
        return client;
    }
    
    /**
     * 连接MQTT服务器的工具方法
     * 供其他类直接调用,连接GPS主题和响应主题
     * @return true表示连接成功,false表示连接失败
     */
    public static boolean connectMQTT() {
        // 先断开之前的连接
        disconnectAll();
        
        boolean gpsSuccess = false;
        boolean responseSuccess = false;
        
        try {
            String host = "tcp://39.99.43.227:1883";
            String deiveID = Setsys.getMowerIdValue();
            // 添加时间戳确保客户端ID唯一
            long timestamp = System.currentTimeMillis();
            String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp;
            String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp;
            String topic = "mower/" + deiveID + "/gps";
            String topic2 = "mower/" + deiveID + "/response";
            
            // 连接GPS主题
            try {
                gpsClient = new Client(host, topic, clientId);
                gpsClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                gpsClient.subscribe();
                gpsSuccess = true;
                System.out.println("GPS主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
 
            // 连接响应主题
            try {
                responseClient = new Client(host, topic2, clientId2);
                responseClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                responseClient.subscribe();
                responseSuccess = true;
                System.out.println("响应主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
 
            if (gpsSuccess && responseSuccess) {
                System.out.println("所有MQTT主题连接并订阅成功!");
                return true;
            } else if (gpsSuccess || responseSuccess) {
                System.out.println("部分MQTT主题连接成功");
                return true;
            } else {
                System.err.println("所有MQTT主题连接失败");
                return false;
            }
        } catch (Exception e) {
            System.err.println("MQTT连接过程发生异常: " + e.getMessage());
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 连接MQTT服务器的工具方法(带参数版本)
     * @param host MQTT服务器地址,格式:tcp://ip:port
     * @param deviceId 设备ID
     * @param userEmail 用户邮箱
     * @return true表示连接成功,false表示连接失败
     */
    public static boolean connectMQTT(String host, String deviceId, String userEmail) {
        // 先断开之前的连接
        disconnectAll();
        
        boolean gpsSuccess = false;
        boolean responseSuccess = false;
        
        try {
            // 添加时间戳确保客户端ID唯一
            long timestamp = System.currentTimeMillis();
            String clientId = userEmail + "mower" + "_" + timestamp;
            String clientId2 = userEmail + "response" + "_" + timestamp;
            String topic = "mower/" + deviceId + "/gps";
            String topic2 = "mower/" + deviceId + "/response";
            
            // 连接GPS主题
            try {
                gpsClient = new Client(host, topic, clientId);
                gpsClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                gpsClient.subscribe();
                gpsSuccess = true;
                System.out.println("GPS主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
 
            // 连接响应主题
            try {
                responseClient = new Client(host, topic2, clientId2);
                responseClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                responseClient.subscribe();
                responseSuccess = true;
                System.out.println("响应主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
 
            if (gpsSuccess && responseSuccess) {
                System.out.println("所有MQTT主题连接并订阅成功!");
                return true;
            } else if (gpsSuccess || responseSuccess) {
                System.out.println("部分MQTT主题连接成功");
                return true;
            } else {
                System.err.println("所有MQTT主题连接失败");
                return false;
            }
        } catch (Exception e) {
            System.err.println("MQTT连接过程发生异常: " + e.getMessage());
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 创建并连接MQTT客户端的工具方法
     * @param host MQTT服务器地址
     * @param topic 订阅主题
     * @param clientId 客户端ID
     * @param qos 服务质量等级,默认2
     * @return Client实例,连接失败返回null
     */
    public static Client createAndConnect(String host, String topic, String clientId, int qos) {
        try {
            Client mqttClient = new Client(host, topic, clientId);
            mqttClient.connect();
            mqttClient.subscribe(qos);
            System.out.println("MQTT客户端创建并订阅成功,主题: " + topic + ", ClientId: " + clientId);
            return mqttClient;
        } catch (MqttException e) {
            System.err.println("MQTT客户端创建失败: " + e.getMessage() + ", 主题: " + topic);
            e.printStackTrace();
            return null;
        }
    }
    
    /**
     * 创建并连接MQTT客户端的工具方法(默认QoS为2)
     * @param host MQTT服务器地址
     * @param topic 订阅主题
     * @param clientId 客户端ID
     * @return Client实例,连接失败返回null
     */
    public static Client createAndConnect(String host, String topic, String clientId) {
        return createAndConnect(host, topic, clientId, 2);
    }
    
    /**
     * 断开所有MQTT连接
     */
    public static void disconnectAll() {
        try {
            if (gpsClient != null) {
                gpsClient.close();
                System.out.println("GPS主题MQTT连接已断开");
                gpsClient = null;
            }
            if (responseClient != null) {
                responseClient.close();
                System.out.println("响应主题MQTT连接已断开");
                responseClient = null;
            }
        } catch (Exception e) {
            System.err.println("断开MQTT连接失败: " + e.getMessage());
            e.printStackTrace();
        }
    }
    
    /**
     * 获取GPS客户端实例
     * @return GPS客户端实例
     */
    public static Client getGpsClient() {
        return gpsClient;
    }
    
    /**
     * 获取响应客户端实例
     * @return 响应客户端实例
     */
    public static Client getResponseClient() {
        return responseClient;
    }
    
    /**
     * 检查MQTT连接状态(静态方法)
     * @return true表示已连接,false表示未连接
     */
    public static boolean areClientsConnected() {
        boolean gpsConnected = gpsClient != null && gpsClient.isConnected();
        boolean responseConnected = responseClient != null && responseClient.isConnected();
        return gpsConnected || responseConnected;
    }
    
    /**
     * 示例用法(保留向后兼容)
     * @deprecated 请使用 connectMQTT() 方法替代
     */
    @Deprecated
    public static void lianjiemqqt() {
        connectMQTT();
    }
}