package Mqttmessage; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import set.Setsys; import user.Usrdell; /** * MQTT客户端工具类 * 用于连接MQTT服务器并订阅主题 */ public class Client { private String host; private String topic; private String clientId; private MqttClient client; private MqttConnectOptions options; // 静态变量用于存储客户端实例 private static Client gpsClient; private static Client responseClient; /** * 构造函数 * @param host MQTT服务器地址,格式:tcp://ip:port * @param topic 订阅的主题 * @param clientId 客户端ID,不能重复 */ public Client(String host, String topic, String clientId) { this.host = host; this.topic = topic; this.clientId = clientId; this.options = new MqttConnectOptions(); this.options.setCleanSession(true); // 设置连接超时时间(秒) this.options.setConnectionTimeout(30); // 设置KeepAlive间隔(秒),用于保持连接活跃 this.options.setKeepAliveInterval(60); // 设置自动重连 this.options.setAutomaticReconnect(true); // 设置MQTT版本,使用3.1.1 this.options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); } /** * 连接到MQTT服务器 * @throws MqttException 连接失败时抛出异常 */ public void connect() throws MqttException { if (client != null && client.isConnected()) { System.out.println("MQTT客户端已连接,ClientId: " + clientId); return; } // 如果客户端已存在但未连接,先关闭 if (client != null) { try { client.close(); } catch (Exception e) { // 忽略关闭时的异常 } client = null; } // 使用内存持久化,避免文件锁定问题 client = new MqttClient(host, clientId, new MemoryPersistence()); // 先设置回调,再连接 client.setCallback(new PushCallback()); // 执行连接 client.connect(options); System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic); } /** * 订阅主题 * @param qos 服务质量等级,0-2 * @throws MqttException 订阅失败时抛出异常 */ public void subscribe(int qos) throws MqttException { if (client == null || !client.isConnected()) { connect(); } client.subscribe(topic, qos); } /** * 订阅主题(默认QoS为2) * @throws MqttException 订阅失败时抛出异常 */ public void subscribe() throws MqttException { subscribe(2); } /** * 断开连接 * @throws MqttException 断开连接失败时抛出异常 */ public void disconnect() throws MqttException { if (client != null && client.isConnected()) { client.disconnect(); } } /** * 关闭客户端并释放资源 */ public void close() { try { if (client != null) { if (client.isConnected()) { client.disconnect(); } client.close(); client = null; } } catch (Exception e) { // 忽略关闭时的异常 } } /** * 检查是否已连接 * @return true表示已连接,false表示未连接 */ public boolean isConnected() { return client != null && client.isConnected(); } /** * 获取MQTT客户端实例 * @return MqttClient实例 */ public MqttClient getClient() { return client; } /** * 连接MQTT服务器的工具方法 * 供其他类直接调用,连接GPS主题和响应主题 * @return true表示连接成功,false表示连接失败 */ public static boolean connectMQTT() { // 先断开之前的连接 disconnectAll(); boolean gpsSuccess = false; boolean responseSuccess = false; try { String host = "tcp://39.99.43.227:1883"; String deiveID = Setsys.getMowerIdValue(); // 添加时间戳确保客户端ID唯一 long timestamp = System.currentTimeMillis(); String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp; String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp; String topic = "mower/" + deiveID + "/gps"; String topic2 = "mower/" + deiveID + "/response"; // 连接GPS主题 try { gpsClient = new Client(host, topic, clientId); gpsClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); gpsClient.subscribe(); gpsSuccess = true; System.out.println("GPS主题MQTT连接并订阅成功"); } catch (MqttException e) { System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { System.err.println("失败原因: " + e.getCause().getMessage()); } e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("连接过程被中断"); } // 连接响应主题 try { responseClient = new Client(host, topic2, clientId2); responseClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); responseClient.subscribe(); responseSuccess = true; System.out.println("响应主题MQTT连接并订阅成功"); } catch (MqttException e) { System.err.println("响应主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { System.err.println("失败原因: " + e.getCause().getMessage()); } e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("连接过程被中断"); } if (gpsSuccess && responseSuccess) { System.out.println("所有MQTT主题连接并订阅成功!"); return true; } else if (gpsSuccess || responseSuccess) { System.out.println("部分MQTT主题连接成功"); return true; } else { System.err.println("所有MQTT主题连接失败"); return false; } } catch (Exception e) { System.err.println("MQTT连接过程发生异常: " + e.getMessage()); e.printStackTrace(); return false; } } /** * 连接MQTT服务器的工具方法(带参数版本) * @param host MQTT服务器地址,格式:tcp://ip:port * @param deviceId 设备ID * @param userEmail 用户邮箱 * @return true表示连接成功,false表示连接失败 */ public static boolean connectMQTT(String host, String deviceId, String userEmail) { // 先断开之前的连接 disconnectAll(); boolean gpsSuccess = false; boolean responseSuccess = false; try { // 添加时间戳确保客户端ID唯一 long timestamp = System.currentTimeMillis(); String clientId = userEmail + "mower" + "_" + timestamp; String clientId2 = userEmail + "response" + "_" + timestamp; String topic = "mower/" + deviceId + "/gps"; String topic2 = "mower/" + deviceId + "/response"; // 连接GPS主题 try { gpsClient = new Client(host, topic, clientId); gpsClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); gpsClient.subscribe(); gpsSuccess = true; System.out.println("GPS主题MQTT连接并订阅成功"); } catch (MqttException e) { System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { System.err.println("失败原因: " + e.getCause().getMessage()); } e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("连接过程被中断"); } // 连接响应主题 try { responseClient = new Client(host, topic2, clientId2); responseClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); responseClient.subscribe(); responseSuccess = true; System.out.println("响应主题MQTT连接并订阅成功"); } catch (MqttException e) { System.err.println("响应主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { System.err.println("失败原因: " + e.getCause().getMessage()); } e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("连接过程被中断"); } if (gpsSuccess && responseSuccess) { System.out.println("所有MQTT主题连接并订阅成功!"); return true; } else if (gpsSuccess || responseSuccess) { System.out.println("部分MQTT主题连接成功"); return true; } else { System.err.println("所有MQTT主题连接失败"); return false; } } catch (Exception e) { System.err.println("MQTT连接过程发生异常: " + e.getMessage()); e.printStackTrace(); return false; } } /** * 创建并连接MQTT客户端的工具方法 * @param host MQTT服务器地址 * @param topic 订阅主题 * @param clientId 客户端ID * @param qos 服务质量等级,默认2 * @return Client实例,连接失败返回null */ public static Client createAndConnect(String host, String topic, String clientId, int qos) { try { Client mqttClient = new Client(host, topic, clientId); mqttClient.connect(); mqttClient.subscribe(qos); System.out.println("MQTT客户端创建并订阅成功,主题: " + topic + ", ClientId: " + clientId); return mqttClient; } catch (MqttException e) { System.err.println("MQTT客户端创建失败: " + e.getMessage() + ", 主题: " + topic); e.printStackTrace(); return null; } } /** * 创建并连接MQTT客户端的工具方法(默认QoS为2) * @param host MQTT服务器地址 * @param topic 订阅主题 * @param clientId 客户端ID * @return Client实例,连接失败返回null */ public static Client createAndConnect(String host, String topic, String clientId) { return createAndConnect(host, topic, clientId, 2); } /** * 断开所有MQTT连接 */ public static void disconnectAll() { try { if (gpsClient != null) { gpsClient.close(); System.out.println("GPS主题MQTT连接已断开"); gpsClient = null; } if (responseClient != null) { responseClient.close(); System.out.println("响应主题MQTT连接已断开"); responseClient = null; } } catch (Exception e) { System.err.println("断开MQTT连接失败: " + e.getMessage()); e.printStackTrace(); } } /** * 获取GPS客户端实例 * @return GPS客户端实例 */ public static Client getGpsClient() { return gpsClient; } /** * 获取响应客户端实例 * @return 响应客户端实例 */ public static Client getResponseClient() { return responseClient; } /** * 检查MQTT连接状态(静态方法) * @return true表示已连接,false表示未连接 */ public static boolean areClientsConnected() { boolean gpsConnected = gpsClient != null && gpsClient.isConnected(); boolean responseConnected = responseClient != null && responseClient.isConnected(); return gpsConnected || responseConnected; } /** * 示例用法(保留向后兼容) * @deprecated 请使用 connectMQTT() 方法替代 */ @Deprecated public static void lianjiemqqt() { connectMQTT(); } }