张世豪
15 小时以前 2eea735fd4ddf0ae047687780271ef3962d256cc
src/Mqttmessage/Client.java
@@ -19,10 +19,15 @@
    private String clientId;
    private MqttClient client;
    private MqttConnectOptions options;
    private int qos = 0;  // 默认QoS等级
    private Thread connectionMonitorThread;  // 连接监控线程
    private volatile boolean monitoring = false;  // 监控标志
    
    // 静态变量用于存储客户端实例
    private static Client gpsClient;
    private static Client responseClient;
    // 重连标志,防止重复重连
    private static volatile boolean isReconnecting = false;
    
    /**
     * 构造函数
@@ -68,12 +73,15 @@
        
        // 使用内存持久化,避免文件锁定问题
        client = new MqttClient(host, clientId, new MemoryPersistence());
        // 先设置回调,再连接
        client.setCallback(new PushCallback());
        // 先设置回调,再连接(传入Client实例和订阅信息,用于自动重连后重新订阅)
        client.setCallback(new PushCallback(this, topic, qos));
        
        // 执行连接
        client.connect(options);
        System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic);
        // 启动连接监控线程
        startConnectionMonitor();
    }
    
    /**
@@ -82,10 +90,12 @@
     * @throws MqttException 订阅失败时抛出异常
     */
    public void subscribe(int qos) throws MqttException {
        this.qos = qos;  // 保存QoS值,用于重连后重新订阅
        if (client == null || !client.isConnected()) {
            connect();
        }
        client.subscribe(topic, qos);
        System.out.println("已订阅主题: " + topic + ", QoS: " + qos);
    }
    
    /**
@@ -93,7 +103,8 @@
     * @throws MqttException 订阅失败时抛出异常
     */
    public void subscribe() throws MqttException {
        subscribe(2);
        // 使用QoS 0确保消息可靠传递(至少一次,且仅一次)
        subscribe(0);
    }
    
    /**
@@ -107,9 +118,79 @@
    }
    
    /**
     * 启动连接监控线程,定期检查连接状态并在断开时尝试重连
     */
    private void startConnectionMonitor() {
        if (monitoring) {
            return;  // 已经在监控中
        }
        monitoring = true;
        connectionMonitorThread = new Thread(() -> {
            while (monitoring && client != null) {
                try {
                    Thread.sleep(5000);  // 每5秒检查一次
                    if (client != null && !client.isConnected()) {
                        System.out.println("检测到MQTT连接断开,尝试重连... ClientId: " + clientId);
                        try {
                            // 尝试重新连接
                            if (!client.isConnected()) {
                                client.reconnect();
                                // 重连成功后重新订阅
                                if (client.isConnected()) {
                                    client.subscribe(topic, qos);
                                    System.out.println("连接监控:重连成功并重新订阅主题: " + topic);
                                }
                            }
                        } catch (Exception e) {
                            System.err.println("连接监控:重连失败: " + e.getMessage());
                            // 如果自动重连失败,尝试完全重新连接
                            try {
                                connect();
                                if (isConnected()) {
                                    subscribe(qos);
                                }
                            } catch (Exception ex) {
                                System.err.println("连接监控:完全重连失败: " + ex.getMessage());
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    System.err.println("连接监控线程异常: " + e.getMessage());
                }
            }
        });
        connectionMonitorThread.setDaemon(true);
        connectionMonitorThread.setName("MQTT-ConnectionMonitor-" + clientId);
        connectionMonitorThread.start();
        System.out.println("已启动MQTT连接监控线程: " + clientId);
    }
    /**
     * 停止连接监控线程
     */
    private void stopConnectionMonitor() {
        monitoring = false;
        if (connectionMonitorThread != null) {
            connectionMonitorThread.interrupt();
            try {
                connectionMonitorThread.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            connectionMonitorThread = null;
        }
    }
    /**
     * 关闭客户端并释放资源
     */
    public void close() {
        stopConnectionMonitor();  // 停止监控线程
        try {
            if (client != null) {
                if (client.isConnected()) {
@@ -145,76 +226,104 @@
     * @return true表示连接成功,false表示连接失败
     */
    public static boolean connectMQTT() {
        // 先断开之前的连接
        disconnectAll();
        // 防止重复重连
        if (isReconnecting) {
            System.out.println("MQTT正在重连中,跳过本次重连请求");
            return false;
        }
        
        boolean gpsSuccess = false;
        boolean responseSuccess = false;
        // 检查是否已经连接
        if (areClientsConnected()) {
            System.out.println("MQTT客户端已连接,无需重复连接");
            return true;
        }
        
        isReconnecting = true;
        try {
            String host = "tcp://39.99.43.227:1883";
            String deiveID = Setsys.getMowerIdValue();
            // 添加时间戳确保客户端ID唯一
            long timestamp = System.currentTimeMillis();
            String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp;
            String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp;
            String topic = "mower/" + deiveID + "/gps";
            String topic2 = "mower/" + deiveID + "/response";
            // 先断开之前的连接
            disconnectAll();
            
            // 连接GPS主题
            boolean gpsSuccess = false;
            boolean responseSuccess = false;
            try {
                gpsClient = new Client(host, topic, clientId);
                gpsClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                gpsClient.subscribe();
                gpsSuccess = true;
                System.out.println("GPS主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                String host = "tcp://39.99.43.227:1883";
                String deiveID = Setsys.getMowerIdValue();
                if (deiveID == null || deiveID.isEmpty()) {
                    System.err.println("设备ID为空,无法连接MQTT");
                    return false;
                }
                // 添加时间戳确保客户端ID唯一
                long timestamp = System.currentTimeMillis();
                String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp;
                String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp;
                String topic = "mower/" + deiveID + "/gps";
                String topic2 = "mower/" + deiveID + "/response";
                // 连接GPS主题
                try {
                    gpsClient = new Client(host, topic, clientId);
                    gpsClient.connect();
                    // 稍作延迟,确保连接稳定
                    Thread.sleep(100);
                    if (gpsClient != null && gpsClient.isConnected()) {
                        gpsClient.subscribe();
                        gpsSuccess = true;
                        System.out.println("GPS主题MQTT连接并订阅成功");
                    }
                } catch (MqttException e) {
                    System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                    if (e.getCause() != null) {
                        System.err.println("失败原因: " + e.getCause().getMessage());
                    }
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("连接过程被中断");
                }
                // 连接响应主题
                try {
                    responseClient = new Client(host, topic2, clientId2);
                    responseClient.connect();
                    // 稍作延迟,确保连接稳定
                    Thread.sleep(100);
                    if (responseClient != null && responseClient.isConnected()) {
                        responseClient.subscribe();
                        responseSuccess = true;
                        System.out.println("响应主题MQTT连接并订阅成功");
                    }
                } catch (MqttException e) {
                    System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                    if (e.getCause() != null) {
                        System.err.println("失败原因: " + e.getCause().getMessage());
                    }
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("连接过程被中断");
                }
                if (gpsSuccess && responseSuccess) {
                    System.out.println("所有MQTT主题连接并订阅成功!");
                    return true;
                } else if (gpsSuccess || responseSuccess) {
                    System.out.println("部分MQTT主题连接成功");
                    return true;
                } else {
                    System.err.println("所有MQTT主题连接失败");
                    return false;
                }
            } catch (Exception e) {
                System.err.println("MQTT连接过程发生异常: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                    System.err.println("异常原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
            // 连接响应主题
            try {
                responseClient = new Client(host, topic2, clientId2);
                responseClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                responseClient.subscribe();
                responseSuccess = true;
                System.out.println("响应主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
            if (gpsSuccess && responseSuccess) {
                System.out.println("所有MQTT主题连接并订阅成功!");
                return true;
            } else if (gpsSuccess || responseSuccess) {
                System.out.println("部分MQTT主题连接成功");
                return true;
            } else {
                System.err.println("所有MQTT主题连接失败");
                return false;
            }
        } catch (Exception e) {
            System.err.println("MQTT连接过程发生异常: " + e.getMessage());
            e.printStackTrace();
            return false;
        } finally {
            isReconnecting = false;
        }
    }
    
@@ -226,74 +335,102 @@
     * @return true表示连接成功,false表示连接失败
     */
    public static boolean connectMQTT(String host, String deviceId, String userEmail) {
        // 先断开之前的连接
        disconnectAll();
        // 防止重复重连
        if (isReconnecting) {
            System.out.println("MQTT正在重连中,跳过本次重连请求");
            return false;
        }
        
        boolean gpsSuccess = false;
        boolean responseSuccess = false;
        // 检查是否已经连接
        if (areClientsConnected()) {
            System.out.println("MQTT客户端已连接,无需重复连接");
            return true;
        }
        
        isReconnecting = true;
        try {
            // 添加时间戳确保客户端ID唯一
            long timestamp = System.currentTimeMillis();
            String clientId = userEmail + "mower" + "_" + timestamp;
            String clientId2 = userEmail + "response" + "_" + timestamp;
            String topic = "mower/" + deviceId + "/gps";
            String topic2 = "mower/" + deviceId + "/response";
            // 先断开之前的连接
            disconnectAll();
            
            // 连接GPS主题
            boolean gpsSuccess = false;
            boolean responseSuccess = false;
            try {
                gpsClient = new Client(host, topic, clientId);
                gpsClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                gpsClient.subscribe();
                gpsSuccess = true;
                System.out.println("GPS主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                if (deviceId == null || deviceId.isEmpty()) {
                    System.err.println("设备ID为空,无法连接MQTT");
                    return false;
                }
                // 添加时间戳确保客户端ID唯一
                long timestamp = System.currentTimeMillis();
                String clientId = userEmail + "mower" + "_" + timestamp;
                String clientId2 = userEmail + "response" + "_" + timestamp;
                String topic = "mower/" + deviceId + "/gps";
                String topic2 = "mower/" + deviceId + "/response";
                // 连接GPS主题
                try {
                    gpsClient = new Client(host, topic, clientId);
                    gpsClient.connect();
                    // 稍作延迟,确保连接稳定
                    Thread.sleep(100);
                    if (gpsClient != null && gpsClient.isConnected()) {
                        gpsClient.subscribe();
                        gpsSuccess = true;
                        System.out.println("GPS主题MQTT连接并订阅成功");
                    }
                } catch (MqttException e) {
                    System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                    if (e.getCause() != null) {
                        System.err.println("失败原因: " + e.getCause().getMessage());
                    }
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("连接过程被中断");
                }
                // 连接响应主题
                try {
                    responseClient = new Client(host, topic2, clientId2);
                    responseClient.connect();
                    // 稍作延迟,确保连接稳定
                    Thread.sleep(100);
                    if (responseClient != null && responseClient.isConnected()) {
                        responseClient.subscribe();
                        responseSuccess = true;
                        System.out.println("响应主题MQTT连接并订阅成功");
                    }
                } catch (MqttException e) {
                    System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                    if (e.getCause() != null) {
                        System.err.println("失败原因: " + e.getCause().getMessage());
                    }
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("连接过程被中断");
                }
                if (gpsSuccess && responseSuccess) {
                    System.out.println("所有MQTT主题连接并订阅成功!");
                    return true;
                } else if (gpsSuccess || responseSuccess) {
                    System.out.println("部分MQTT主题连接成功");
                    return true;
                } else {
                    System.err.println("所有MQTT主题连接失败");
                    return false;
                }
            } catch (Exception e) {
                System.err.println("MQTT连接过程发生异常: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                    System.err.println("异常原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
            // 连接响应主题
            try {
                responseClient = new Client(host, topic2, clientId2);
                responseClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                responseClient.subscribe();
                responseSuccess = true;
                System.out.println("响应主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
            if (gpsSuccess && responseSuccess) {
                System.out.println("所有MQTT主题连接并订阅成功!");
                return true;
            } else if (gpsSuccess || responseSuccess) {
                System.out.println("部分MQTT主题连接成功");
                return true;
            } else {
                System.err.println("所有MQTT主题连接失败");
                return false;
            }
        } catch (Exception e) {
            System.err.println("MQTT连接过程发生异常: " + e.getMessage());
            e.printStackTrace();
            return false;
        } finally {
            isReconnecting = false;
        }
    }