张世豪
15 小时以前 2eea735fd4ddf0ae047687780271ef3962d256cc
src/Mqttmessage/Client.java
@@ -3,6 +3,10 @@
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import set.Setsys;
import user.Usrdell;
/**
 * MQTT客户端工具类
@@ -15,6 +19,15 @@
    private String clientId;
    private MqttClient client;
    private MqttConnectOptions options;
    private int qos = 0;  // 默认QoS等级
    private Thread connectionMonitorThread;  // 连接监控线程
    private volatile boolean monitoring = false;  // 监控标志
    // 静态变量用于存储客户端实例
    private static Client gpsClient;
    private static Client responseClient;
    // 重连标志,防止重复重连
    private static volatile boolean isReconnecting = false;
    
    /**
     * 构造函数
@@ -23,12 +36,19 @@
     * @param clientId 客户端ID,不能重复
     */
    public Client(String host, String topic, String clientId) {
        this.host = host;
        this.topic = topic;
        this.clientId = clientId;
        this.options = new MqttConnectOptions();
        this.options.setCleanSession(true);
        // 设置连接超时时间(秒)
        this.options.setConnectionTimeout(30);
        // 设置KeepAlive间隔(秒),用于保持连接活跃
        this.options.setKeepAliveInterval(60);
        // 设置自动重连
        this.options.setAutomaticReconnect(true);
        // 设置MQTT版本,使用3.1.1
        this.options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
    }
    
    /**
@@ -37,11 +57,31 @@
     */
    public void connect() throws MqttException {
        if (client != null && client.isConnected()) {
            System.out.println("MQTT客户端已连接,ClientId: " + clientId);
            return;
        }
        client = new MqttClient(host, clientId);
        // 如果客户端已存在但未连接,先关闭
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                // 忽略关闭时的异常
            }
            client = null;
        }
        // 使用内存持久化,避免文件锁定问题
        client = new MqttClient(host, clientId, new MemoryPersistence());
        // 先设置回调,再连接(传入Client实例和订阅信息,用于自动重连后重新订阅)
        client.setCallback(new PushCallback(this, topic, qos));
        // 执行连接
        client.connect(options);
        client.setCallback(new PushCallback());
        System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic);
        // 启动连接监控线程
        startConnectionMonitor();
    }
    
    /**
@@ -50,10 +90,12 @@
     * @throws MqttException 订阅失败时抛出异常
     */
    public void subscribe(int qos) throws MqttException {
        this.qos = qos;  // 保存QoS值,用于重连后重新订阅
        if (client == null || !client.isConnected()) {
            connect();
        }
        client.subscribe(topic, qos);
        System.out.println("已订阅主题: " + topic + ", QoS: " + qos);
    }
    
    /**
@@ -61,7 +103,8 @@
     * @throws MqttException 订阅失败时抛出异常
     */
    public void subscribe() throws MqttException {
        subscribe(2);
        // 使用QoS 0确保消息可靠传递(至少一次,且仅一次)
        subscribe(0);
    }
    
    /**
@@ -75,6 +118,93 @@
    }
    
    /**
     * 启动连接监控线程,定期检查连接状态并在断开时尝试重连
     */
    private void startConnectionMonitor() {
        if (monitoring) {
            return;  // 已经在监控中
        }
        monitoring = true;
        connectionMonitorThread = new Thread(() -> {
            while (monitoring && client != null) {
                try {
                    Thread.sleep(5000);  // 每5秒检查一次
                    if (client != null && !client.isConnected()) {
                        System.out.println("检测到MQTT连接断开,尝试重连... ClientId: " + clientId);
                        try {
                            // 尝试重新连接
                            if (!client.isConnected()) {
                                client.reconnect();
                                // 重连成功后重新订阅
                                if (client.isConnected()) {
                                    client.subscribe(topic, qos);
                                    System.out.println("连接监控:重连成功并重新订阅主题: " + topic);
                                }
                            }
                        } catch (Exception e) {
                            System.err.println("连接监控:重连失败: " + e.getMessage());
                            // 如果自动重连失败,尝试完全重新连接
                            try {
                                connect();
                                if (isConnected()) {
                                    subscribe(qos);
                                }
                            } catch (Exception ex) {
                                System.err.println("连接监控:完全重连失败: " + ex.getMessage());
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    System.err.println("连接监控线程异常: " + e.getMessage());
                }
            }
        });
        connectionMonitorThread.setDaemon(true);
        connectionMonitorThread.setName("MQTT-ConnectionMonitor-" + clientId);
        connectionMonitorThread.start();
        System.out.println("已启动MQTT连接监控线程: " + clientId);
    }
    /**
     * 停止连接监控线程
     */
    private void stopConnectionMonitor() {
        monitoring = false;
        if (connectionMonitorThread != null) {
            connectionMonitorThread.interrupt();
            try {
                connectionMonitorThread.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            connectionMonitorThread = null;
        }
    }
    /**
     * 关闭客户端并释放资源
     */
    public void close() {
        stopConnectionMonitor();  // 停止监控线程
        try {
            if (client != null) {
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
                client = null;
            }
        } catch (Exception e) {
            // 忽略关闭时的异常
        }
    }
    /**
     * 检查是否已连接
     * @return true表示已连接,false表示未连接
     */
@@ -91,29 +221,305 @@
    }
    
    /**
     * 示例用法
     * 连接MQTT服务器的工具方法
     * 供其他类直接调用,连接GPS主题和响应主题
     * @return true表示连接成功,false表示连接失败
     */
    public static void test()  {
        try {
            String host = "tcp://39.99.43.227:1883";
            String deiveID="6258";
            String clientId = "hxzkMQTT";
            String clientId2 = "hxzkMQTT2";
            String topic = "mower/"+deiveID+"/gps";
            String topic2 = "mower/"+deiveID+"/response";
            Client mqttClient = new Client(host, topic, clientId);
            Client mqttClient1 = new Client(host, topic2, clientId2);
            mqttClient.connect();
            mqttClient.subscribe();
            mqttClient1.connect();
            mqttClient1.subscribe();
            // 保持程序运行
           //Thread.sleep(Long.MAX_VALUE);
        } catch (MqttException e) {
            throw new RuntimeException(e);
    public static boolean connectMQTT() {
        // 防止重复重连
        if (isReconnecting) {
            System.out.println("MQTT正在重连中,跳过本次重连请求");
            return false;
        }
        // 检查是否已经连接
        if (areClientsConnected()) {
            System.out.println("MQTT客户端已连接,无需重复连接");
            return true;
        }
        isReconnecting = true;
        try {
            // 先断开之前的连接
            disconnectAll();
            boolean gpsSuccess = false;
            boolean responseSuccess = false;
            try {
                String host = "tcp://39.99.43.227:1883";
                String deiveID = Setsys.getMowerIdValue();
                if (deiveID == null || deiveID.isEmpty()) {
                    System.err.println("设备ID为空,无法连接MQTT");
                    return false;
                }
                // 添加时间戳确保客户端ID唯一
                long timestamp = System.currentTimeMillis();
                String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp;
                String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp;
                String topic = "mower/" + deiveID + "/gps";
                String topic2 = "mower/" + deiveID + "/response";
                // 连接GPS主题
                try {
                    gpsClient = new Client(host, topic, clientId);
                    gpsClient.connect();
                    // 稍作延迟,确保连接稳定
                    Thread.sleep(100);
                    if (gpsClient != null && gpsClient.isConnected()) {
                        gpsClient.subscribe();
                        gpsSuccess = true;
                        System.out.println("GPS主题MQTT连接并订阅成功");
                    }
                } catch (MqttException e) {
                    System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                    if (e.getCause() != null) {
                        System.err.println("失败原因: " + e.getCause().getMessage());
                    }
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("连接过程被中断");
                }
                // 连接响应主题
                try {
                    responseClient = new Client(host, topic2, clientId2);
                    responseClient.connect();
                    // 稍作延迟,确保连接稳定
                    Thread.sleep(100);
                    if (responseClient != null && responseClient.isConnected()) {
                        responseClient.subscribe();
                        responseSuccess = true;
                        System.out.println("响应主题MQTT连接并订阅成功");
                    }
                } catch (MqttException e) {
                    System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                    if (e.getCause() != null) {
                        System.err.println("失败原因: " + e.getCause().getMessage());
                    }
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("连接过程被中断");
                }
                if (gpsSuccess && responseSuccess) {
                    System.out.println("所有MQTT主题连接并订阅成功!");
                    return true;
                } else if (gpsSuccess || responseSuccess) {
                    System.out.println("部分MQTT主题连接成功");
                    return true;
                } else {
                    System.err.println("所有MQTT主题连接失败");
                    return false;
                }
            } catch (Exception e) {
                System.err.println("MQTT连接过程发生异常: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("异常原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
                return false;
            }
        } finally {
            isReconnecting = false;
        }
    }
    /**
     * 连接MQTT服务器的工具方法(带参数版本)
     * @param host MQTT服务器地址,格式:tcp://ip:port
     * @param deviceId 设备ID
     * @param userEmail 用户邮箱
     * @return true表示连接成功,false表示连接失败
     */
    public static boolean connectMQTT(String host, String deviceId, String userEmail) {
        // 防止重复重连
        if (isReconnecting) {
            System.out.println("MQTT正在重连中,跳过本次重连请求");
            return false;
        }
        // 检查是否已经连接
        if (areClientsConnected()) {
            System.out.println("MQTT客户端已连接,无需重复连接");
            return true;
        }
        isReconnecting = true;
        try {
            // 先断开之前的连接
            disconnectAll();
            boolean gpsSuccess = false;
            boolean responseSuccess = false;
            try {
                if (deviceId == null || deviceId.isEmpty()) {
                    System.err.println("设备ID为空,无法连接MQTT");
                    return false;
                }
                // 添加时间戳确保客户端ID唯一
                long timestamp = System.currentTimeMillis();
                String clientId = userEmail + "mower" + "_" + timestamp;
                String clientId2 = userEmail + "response" + "_" + timestamp;
                String topic = "mower/" + deviceId + "/gps";
                String topic2 = "mower/" + deviceId + "/response";
                // 连接GPS主题
                try {
                    gpsClient = new Client(host, topic, clientId);
                    gpsClient.connect();
                    // 稍作延迟,确保连接稳定
                    Thread.sleep(100);
                    if (gpsClient != null && gpsClient.isConnected()) {
                        gpsClient.subscribe();
                        gpsSuccess = true;
                        System.out.println("GPS主题MQTT连接并订阅成功");
                    }
                } catch (MqttException e) {
                    System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                    if (e.getCause() != null) {
                        System.err.println("失败原因: " + e.getCause().getMessage());
                    }
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("连接过程被中断");
                }
                // 连接响应主题
                try {
                    responseClient = new Client(host, topic2, clientId2);
                    responseClient.connect();
                    // 稍作延迟,确保连接稳定
                    Thread.sleep(100);
                    if (responseClient != null && responseClient.isConnected()) {
                        responseClient.subscribe();
                        responseSuccess = true;
                        System.out.println("响应主题MQTT连接并订阅成功");
                    }
                } catch (MqttException e) {
                    System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                    if (e.getCause() != null) {
                        System.err.println("失败原因: " + e.getCause().getMessage());
                    }
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("连接过程被中断");
                }
                if (gpsSuccess && responseSuccess) {
                    System.out.println("所有MQTT主题连接并订阅成功!");
                    return true;
                } else if (gpsSuccess || responseSuccess) {
                    System.out.println("部分MQTT主题连接成功");
                    return true;
                } else {
                    System.err.println("所有MQTT主题连接失败");
                    return false;
                }
            } catch (Exception e) {
                System.err.println("MQTT连接过程发生异常: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("异常原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
                return false;
            }
        } finally {
            isReconnecting = false;
        }
    }
    /**
     * 创建并连接MQTT客户端的工具方法
     * @param host MQTT服务器地址
     * @param topic 订阅主题
     * @param clientId 客户端ID
     * @param qos 服务质量等级,默认2
     * @return Client实例,连接失败返回null
     */
    public static Client createAndConnect(String host, String topic, String clientId, int qos) {
        try {
            Client mqttClient = new Client(host, topic, clientId);
            mqttClient.connect();
            mqttClient.subscribe(qos);
            System.out.println("MQTT客户端创建并订阅成功,主题: " + topic + ", ClientId: " + clientId);
            return mqttClient;
        } catch (MqttException e) {
            System.err.println("MQTT客户端创建失败: " + e.getMessage() + ", 主题: " + topic);
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 创建并连接MQTT客户端的工具方法(默认QoS为2)
     * @param host MQTT服务器地址
     * @param topic 订阅主题
     * @param clientId 客户端ID
     * @return Client实例,连接失败返回null
     */
    public static Client createAndConnect(String host, String topic, String clientId) {
        return createAndConnect(host, topic, clientId, 2);
    }
    /**
     * 断开所有MQTT连接
     */
    public static void disconnectAll() {
        try {
            if (gpsClient != null) {
                gpsClient.close();
                System.out.println("GPS主题MQTT连接已断开");
                gpsClient = null;
            }
            if (responseClient != null) {
                responseClient.close();
                System.out.println("响应主题MQTT连接已断开");
                responseClient = null;
            }
        } catch (Exception e) {
            System.err.println("断开MQTT连接失败: " + e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 获取GPS客户端实例
     * @return GPS客户端实例
     */
    public static Client getGpsClient() {
        return gpsClient;
    }
    /**
     * 获取响应客户端实例
     * @return 响应客户端实例
     */
    public static Client getResponseClient() {
        return responseClient;
    }
    /**
     * 检查MQTT连接状态(静态方法)
     * @return true表示已连接,false表示未连接
     */
    public static boolean areClientsConnected() {
        boolean gpsConnected = gpsClient != null && gpsClient.isConnected();
        boolean responseConnected = responseClient != null && responseClient.isConnected();
        return gpsConnected || responseConnected;
    }
    /**
     * 示例用法(保留向后兼容)
     * @deprecated 请使用 connectMQTT() 方法替代
     */
    @Deprecated
    public static void lianjiemqqt() {
        connectMQTT();
    }
}