张世豪
12 小时以前 5ae9bbe3583384afab8eb95a134ccb74aee6487a
src/Mqttmessage/Client.java
@@ -3,6 +3,10 @@
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import set.Setsys;
import user.Usrdell;
/**
 * MQTT客户端工具类
@@ -16,6 +20,10 @@
    private MqttClient client;
    private MqttConnectOptions options;
    
    // 静态变量用于存储客户端实例
    private static Client gpsClient;
    private static Client responseClient;
    /**
     * 构造函数
     * @param host MQTT服务器地址,格式:tcp://ip:port
@@ -23,12 +31,19 @@
     * @param clientId 客户端ID,不能重复
     */
    public Client(String host, String topic, String clientId) {
        this.host = host;
        this.topic = topic;
        this.clientId = clientId;
        this.options = new MqttConnectOptions();
        this.options.setCleanSession(true);
        // 设置连接超时时间(秒)
        this.options.setConnectionTimeout(30);
        // 设置KeepAlive间隔(秒),用于保持连接活跃
        this.options.setKeepAliveInterval(60);
        // 设置自动重连
        this.options.setAutomaticReconnect(true);
        // 设置MQTT版本,使用3.1.1
        this.options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
    }
    
    /**
@@ -37,11 +52,28 @@
     */
    public void connect() throws MqttException {
        if (client != null && client.isConnected()) {
            System.out.println("MQTT客户端已连接,ClientId: " + clientId);
            return;
        }
        client = new MqttClient(host, clientId);
        client.connect(options);
        // 如果客户端已存在但未连接,先关闭
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                // 忽略关闭时的异常
            }
            client = null;
        }
        // 使用内存持久化,避免文件锁定问题
        client = new MqttClient(host, clientId, new MemoryPersistence());
        // 先设置回调,再连接
        client.setCallback(new PushCallback());
        // 执行连接
        client.connect(options);
        System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic);
    }
    
    /**
@@ -75,6 +107,23 @@
    }
    
    /**
     * 关闭客户端并释放资源
     */
    public void close() {
        try {
            if (client != null) {
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
                client = null;
            }
        } catch (Exception e) {
            // 忽略关闭时的异常
        }
    }
    /**
     * 检查是否已连接
     * @return true表示已连接,false表示未连接
     */
@@ -91,29 +140,249 @@
    }
    
    /**
     * 示例用法
     * 连接MQTT服务器的工具方法
     * 供其他类直接调用,连接GPS主题和响应主题
     * @return true表示连接成功,false表示连接失败
     */
    public static void test()  {
    public static boolean connectMQTT() {
        // 先断开之前的连接
        disconnectAll();
        boolean gpsSuccess = false;
        boolean responseSuccess = false;
        try {
            String host = "tcp://39.99.43.227:1883";
            String deiveID="6258";
            String clientId = "hxzkMQTT";
            String clientId2 = "hxzkMQTT2";
            String topic = "mower/"+deiveID+"/gps";
            String topic2 = "mower/"+deiveID+"/response";
            Client mqttClient = new Client(host, topic, clientId);
            Client mqttClient1 = new Client(host, topic2, clientId2);
            mqttClient.connect();
            mqttClient.subscribe();
            String deiveID = Setsys.getMowerIdValue();
            // 添加时间戳确保客户端ID唯一
            long timestamp = System.currentTimeMillis();
            String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp;
            String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp;
            String topic = "mower/" + deiveID + "/gps";
            String topic2 = "mower/" + deiveID + "/response";
            // 连接GPS主题
            try {
                gpsClient = new Client(host, topic, clientId);
                gpsClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                gpsClient.subscribe();
                gpsSuccess = true;
                System.out.println("GPS主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
            mqttClient1.connect();
            mqttClient1.subscribe();
            // 连接响应主题
            try {
                responseClient = new Client(host, topic2, clientId2);
                responseClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                responseClient.subscribe();
                responseSuccess = true;
                System.out.println("响应主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
            // 保持程序运行
           //Thread.sleep(Long.MAX_VALUE);
        } catch (MqttException e) {
            throw new RuntimeException(e);
            if (gpsSuccess && responseSuccess) {
                System.out.println("所有MQTT主题连接并订阅成功!");
                return true;
            } else if (gpsSuccess || responseSuccess) {
                System.out.println("部分MQTT主题连接成功");
                return true;
            } else {
                System.err.println("所有MQTT主题连接失败");
                return false;
            }
        } catch (Exception e) {
            System.err.println("MQTT连接过程发生异常: " + e.getMessage());
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 连接MQTT服务器的工具方法(带参数版本)
     * @param host MQTT服务器地址,格式:tcp://ip:port
     * @param deviceId 设备ID
     * @param userEmail 用户邮箱
     * @return true表示连接成功,false表示连接失败
     */
    public static boolean connectMQTT(String host, String deviceId, String userEmail) {
        // 先断开之前的连接
        disconnectAll();
        boolean gpsSuccess = false;
        boolean responseSuccess = false;
        try {
            // 添加时间戳确保客户端ID唯一
            long timestamp = System.currentTimeMillis();
            String clientId = userEmail + "mower" + "_" + timestamp;
            String clientId2 = userEmail + "response" + "_" + timestamp;
            String topic = "mower/" + deviceId + "/gps";
            String topic2 = "mower/" + deviceId + "/response";
            // 连接GPS主题
            try {
                gpsClient = new Client(host, topic, clientId);
                gpsClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                gpsClient.subscribe();
                gpsSuccess = true;
                System.out.println("GPS主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
            // 连接响应主题
            try {
                responseClient = new Client(host, topic2, clientId2);
                responseClient.connect();
                // 稍作延迟,确保连接稳定
                Thread.sleep(100);
                responseClient.subscribe();
                responseSuccess = true;
                System.out.println("响应主题MQTT连接并订阅成功");
            } catch (MqttException e) {
                System.err.println("响应主题MQTT连接失败: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("失败原因: " + e.getCause().getMessage());
                }
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("连接过程被中断");
            }
            if (gpsSuccess && responseSuccess) {
                System.out.println("所有MQTT主题连接并订阅成功!");
                return true;
            } else if (gpsSuccess || responseSuccess) {
                System.out.println("部分MQTT主题连接成功");
                return true;
            } else {
                System.err.println("所有MQTT主题连接失败");
                return false;
            }
        } catch (Exception e) {
            System.err.println("MQTT连接过程发生异常: " + e.getMessage());
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 创建并连接MQTT客户端的工具方法
     * @param host MQTT服务器地址
     * @param topic 订阅主题
     * @param clientId 客户端ID
     * @param qos 服务质量等级,默认2
     * @return Client实例,连接失败返回null
     */
    public static Client createAndConnect(String host, String topic, String clientId, int qos) {
        try {
            Client mqttClient = new Client(host, topic, clientId);
            mqttClient.connect();
            mqttClient.subscribe(qos);
            System.out.println("MQTT客户端创建并订阅成功,主题: " + topic + ", ClientId: " + clientId);
            return mqttClient;
        } catch (MqttException e) {
            System.err.println("MQTT客户端创建失败: " + e.getMessage() + ", 主题: " + topic);
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 创建并连接MQTT客户端的工具方法(默认QoS为2)
     * @param host MQTT服务器地址
     * @param topic 订阅主题
     * @param clientId 客户端ID
     * @return Client实例,连接失败返回null
     */
    public static Client createAndConnect(String host, String topic, String clientId) {
        return createAndConnect(host, topic, clientId, 2);
    }
    /**
     * 断开所有MQTT连接
     */
    public static void disconnectAll() {
        try {
            if (gpsClient != null) {
                gpsClient.close();
                System.out.println("GPS主题MQTT连接已断开");
                gpsClient = null;
            }
            if (responseClient != null) {
                responseClient.close();
                System.out.println("响应主题MQTT连接已断开");
                responseClient = null;
            }
        } catch (Exception e) {
            System.err.println("断开MQTT连接失败: " + e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 获取GPS客户端实例
     * @return GPS客户端实例
     */
    public static Client getGpsClient() {
        return gpsClient;
    }
    /**
     * 获取响应客户端实例
     * @return 响应客户端实例
     */
    public static Client getResponseClient() {
        return responseClient;
    }
    /**
     * 检查MQTT连接状态(静态方法)
     * @return true表示已连接,false表示未连接
     */
    public static boolean areClientsConnected() {
        boolean gpsConnected = gpsClient != null && gpsClient.isConnected();
        boolean responseConnected = responseClient != null && responseClient.isConnected();
        return gpsConnected || responseConnected;
    }
    /**
     * 示例用法(保留向后兼容)
     * @deprecated 请使用 connectMQTT() 方法替代
     */
    @Deprecated
    public static void lianjiemqqt() {
        connectMQTT();
    }
}