package Mqttmessage; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import set.Setsys; import user.Usrdell; /** * MQTT客户端工具类 * 用于连接MQTT服务器并订阅主题 */ public class Client { private String host; private String topic; private String clientId; private MqttClient client; private MqttConnectOptions options; private int qos = 0; // 默认QoS等级 private Thread connectionMonitorThread; // 连接监控线程 private volatile boolean monitoring = false; // 监控标志 // 静态变量用于存储客户端实例 private static Client gpsClient; private static Client responseClient; // 重连标志,防止重复重连 private static volatile boolean isReconnecting = false; /** * 构造函数 * @param host MQTT服务器地址,格式:tcp://ip:port * @param topic 订阅的主题 * @param clientId 客户端ID,不能重复 */ public Client(String host, String topic, String clientId) { this.host = host; this.topic = topic; this.clientId = clientId; this.options = new MqttConnectOptions(); this.options.setCleanSession(true); // 设置连接超时时间(秒) this.options.setConnectionTimeout(30); // 设置KeepAlive间隔(秒),用于保持连接活跃 this.options.setKeepAliveInterval(60); // 设置自动重连 this.options.setAutomaticReconnect(true); // 设置MQTT版本,使用3.1.1 this.options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); } /** * 连接到MQTT服务器 * @throws MqttException 连接失败时抛出异常 */ public void connect() throws MqttException { if (client != null && client.isConnected()) { System.out.println("MQTT客户端已连接,ClientId: " + clientId); return; } // 如果客户端已存在但未连接,先关闭 if (client != null) { try { client.close(); } catch (Exception e) { // 忽略关闭时的异常 } client = null; } // 使用内存持久化,避免文件锁定问题 client = new MqttClient(host, clientId, new MemoryPersistence()); // 先设置回调,再连接(传入Client实例和订阅信息,用于自动重连后重新订阅) client.setCallback(new PushCallback(this, topic, qos)); // 执行连接 client.connect(options); System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic); // 启动连接监控线程 startConnectionMonitor(); } /** * 订阅主题 * @param qos 服务质量等级,0-2 * @throws MqttException 订阅失败时抛出异常 */ public void subscribe(int qos) throws MqttException { this.qos = qos; // 保存QoS值,用于重连后重新订阅 if (client == null || !client.isConnected()) { connect(); } client.subscribe(topic, qos); System.out.println("已订阅主题: " + topic + ", QoS: " + qos); } /** * 订阅主题(默认QoS为2) * @throws MqttException 订阅失败时抛出异常 */ public void subscribe() throws MqttException { // 使用QoS 0确保消息可靠传递(至少一次,且仅一次) subscribe(0); } /** * 断开连接 * @throws MqttException 断开连接失败时抛出异常 */ public void disconnect() throws MqttException { if (client != null && client.isConnected()) { client.disconnect(); } } /** * 启动连接监控线程,定期检查连接状态并在断开时尝试重连 */ private void startConnectionMonitor() { if (monitoring) { return; // 已经在监控中 } monitoring = true; connectionMonitorThread = new Thread(() -> { while (monitoring && client != null) { try { Thread.sleep(5000); // 每5秒检查一次 if (client != null && !client.isConnected()) { System.out.println("检测到MQTT连接断开,尝试重连... ClientId: " + clientId); try { // 尝试重新连接 if (!client.isConnected()) { client.reconnect(); // 重连成功后重新订阅 if (client.isConnected()) { client.subscribe(topic, qos); System.out.println("连接监控:重连成功并重新订阅主题: " + topic); } } } catch (Exception e) { System.err.println("连接监控:重连失败: " + e.getMessage()); // 如果自动重连失败,尝试完全重新连接 try { connect(); if (isConnected()) { subscribe(qos); } } catch (Exception ex) { System.err.println("连接监控:完全重连失败: " + ex.getMessage()); } } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { System.err.println("连接监控线程异常: " + e.getMessage()); } } }); connectionMonitorThread.setDaemon(true); connectionMonitorThread.setName("MQTT-ConnectionMonitor-" + clientId); connectionMonitorThread.start(); System.out.println("已启动MQTT连接监控线程: " + clientId); } /** * 停止连接监控线程 */ private void stopConnectionMonitor() { monitoring = false; if (connectionMonitorThread != null) { connectionMonitorThread.interrupt(); try { connectionMonitorThread.join(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } connectionMonitorThread = null; } } /** * 关闭客户端并释放资源 */ public void close() { stopConnectionMonitor(); // 停止监控线程 try { if (client != null) { if (client.isConnected()) { client.disconnect(); } client.close(); client = null; } } catch (Exception e) { // 忽略关闭时的异常 } } /** * 检查是否已连接 * @return true表示已连接,false表示未连接 */ public boolean isConnected() { return client != null && client.isConnected(); } /** * 获取MQTT客户端实例 * @return MqttClient实例 */ public MqttClient getClient() { return client; } /** * 连接MQTT服务器的工具方法 * 供其他类直接调用,连接GPS主题和响应主题 * @return true表示连接成功,false表示连接失败 */ public static boolean connectMQTT() { // 防止重复重连 if (isReconnecting) { System.out.println("MQTT正在重连中,跳过本次重连请求"); return false; } // 检查是否已经连接 if (areClientsConnected()) { System.out.println("MQTT客户端已连接,无需重复连接"); return true; } isReconnecting = true; try { // 先断开之前的连接 disconnectAll(); boolean gpsSuccess = false; boolean responseSuccess = false; try { String host = "tcp://39.99.43.227:1883"; String deiveID = Setsys.getMowerIdValue(); if (deiveID == null || deiveID.isEmpty()) { System.err.println("设备ID为空,无法连接MQTT"); return false; } // 添加时间戳确保客户端ID唯一 long timestamp = System.currentTimeMillis(); String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp; String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp; String topic = "mower/" + deiveID + "/gps"; String topic2 = "mower/" + deiveID + "/response"; // 连接GPS主题 try { gpsClient = new Client(host, topic, clientId); gpsClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); if (gpsClient != null && gpsClient.isConnected()) { gpsClient.subscribe(); gpsSuccess = true; System.out.println("GPS主题MQTT连接并订阅成功"); } } catch (MqttException e) { System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { System.err.println("失败原因: " + e.getCause().getMessage()); } e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("连接过程被中断"); } // 连接响应主题 try { responseClient = new Client(host, topic2, clientId2); responseClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); if (responseClient != null && responseClient.isConnected()) { responseClient.subscribe(); responseSuccess = true; System.out.println("响应主题MQTT连接并订阅成功"); } } catch (MqttException e) { System.err.println("响应主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { System.err.println("失败原因: " + e.getCause().getMessage()); } e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("连接过程被中断"); } if (gpsSuccess && responseSuccess) { System.out.println("所有MQTT主题连接并订阅成功!"); return true; } else if (gpsSuccess || responseSuccess) { System.out.println("部分MQTT主题连接成功"); return true; } else { System.err.println("所有MQTT主题连接失败"); return false; } } catch (Exception e) { System.err.println("MQTT连接过程发生异常: " + e.getMessage()); if (e.getCause() != null) { System.err.println("异常原因: " + e.getCause().getMessage()); } e.printStackTrace(); return false; } } finally { isReconnecting = false; } } /** * 连接MQTT服务器的工具方法(带参数版本) * @param host MQTT服务器地址,格式:tcp://ip:port * @param deviceId 设备ID * @param userEmail 用户邮箱 * @return true表示连接成功,false表示连接失败 */ public static boolean connectMQTT(String host, String deviceId, String userEmail) { // 防止重复重连 if (isReconnecting) { System.out.println("MQTT正在重连中,跳过本次重连请求"); return false; } // 检查是否已经连接 if (areClientsConnected()) { System.out.println("MQTT客户端已连接,无需重复连接"); return true; } isReconnecting = true; try { // 先断开之前的连接 disconnectAll(); boolean gpsSuccess = false; boolean responseSuccess = false; try { if (deviceId == null || deviceId.isEmpty()) { System.err.println("设备ID为空,无法连接MQTT"); return false; } // 添加时间戳确保客户端ID唯一 long timestamp = System.currentTimeMillis(); String clientId = userEmail + "mower" + "_" + timestamp; String clientId2 = userEmail + "response" + "_" + timestamp; String topic = "mower/" + deviceId + "/gps"; String topic2 = "mower/" + deviceId + "/response"; // 连接GPS主题 try { gpsClient = new Client(host, topic, clientId); gpsClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); if (gpsClient != null && gpsClient.isConnected()) { gpsClient.subscribe(); gpsSuccess = true; System.out.println("GPS主题MQTT连接并订阅成功"); } } catch (MqttException e) { System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { System.err.println("失败原因: " + e.getCause().getMessage()); } e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("连接过程被中断"); } // 连接响应主题 try { responseClient = new Client(host, topic2, clientId2); responseClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); if (responseClient != null && responseClient.isConnected()) { responseClient.subscribe(); responseSuccess = true; System.out.println("响应主题MQTT连接并订阅成功"); } } catch (MqttException e) { System.err.println("响应主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { System.err.println("失败原因: " + e.getCause().getMessage()); } e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("连接过程被中断"); } if (gpsSuccess && responseSuccess) { System.out.println("所有MQTT主题连接并订阅成功!"); return true; } else if (gpsSuccess || responseSuccess) { System.out.println("部分MQTT主题连接成功"); return true; } else { System.err.println("所有MQTT主题连接失败"); return false; } } catch (Exception e) { System.err.println("MQTT连接过程发生异常: " + e.getMessage()); if (e.getCause() != null) { System.err.println("异常原因: " + e.getCause().getMessage()); } e.printStackTrace(); return false; } } finally { isReconnecting = false; } } /** * 创建并连接MQTT客户端的工具方法 * @param host MQTT服务器地址 * @param topic 订阅主题 * @param clientId 客户端ID * @param qos 服务质量等级,默认2 * @return Client实例,连接失败返回null */ public static Client createAndConnect(String host, String topic, String clientId, int qos) { try { Client mqttClient = new Client(host, topic, clientId); mqttClient.connect(); mqttClient.subscribe(qos); System.out.println("MQTT客户端创建并订阅成功,主题: " + topic + ", ClientId: " + clientId); return mqttClient; } catch (MqttException e) { System.err.println("MQTT客户端创建失败: " + e.getMessage() + ", 主题: " + topic); e.printStackTrace(); return null; } } /** * 创建并连接MQTT客户端的工具方法(默认QoS为2) * @param host MQTT服务器地址 * @param topic 订阅主题 * @param clientId 客户端ID * @return Client实例,连接失败返回null */ public static Client createAndConnect(String host, String topic, String clientId) { return createAndConnect(host, topic, clientId, 2); } /** * 断开所有MQTT连接 */ public static void disconnectAll() { try { if (gpsClient != null) { gpsClient.close(); System.out.println("GPS主题MQTT连接已断开"); gpsClient = null; } if (responseClient != null) { responseClient.close(); System.out.println("响应主题MQTT连接已断开"); responseClient = null; } } catch (Exception e) { System.err.println("断开MQTT连接失败: " + e.getMessage()); e.printStackTrace(); } } /** * 获取GPS客户端实例 * @return GPS客户端实例 */ public static Client getGpsClient() { return gpsClient; } /** * 获取响应客户端实例 * @return 响应客户端实例 */ public static Client getResponseClient() { return responseClient; } /** * 检查MQTT连接状态(静态方法) * @return true表示已连接,false表示未连接 */ public static boolean areClientsConnected() { boolean gpsConnected = gpsClient != null && gpsClient.isConnected(); boolean responseConnected = responseClient != null && responseClient.isConnected(); return gpsConnected || responseConnected; } /** * 示例用法(保留向后兼容) * @deprecated 请使用 connectMQTT() 方法替代 */ @Deprecated public static void lianjiemqqt() { connectMQTT(); } }