| | |
| | | private String clientId; |
| | | private MqttClient client; |
| | | private MqttConnectOptions options; |
| | | private int qos = 0; // 默认QoS等级 |
| | | private Thread connectionMonitorThread; // 连接监控线程 |
| | | private volatile boolean monitoring = false; // 监控标志 |
| | | |
| | | // 静态变量用于存储客户端实例 |
| | | private static Client gpsClient; |
| | | private static Client responseClient; |
| | | // 重连标志,防止重复重连 |
| | | private static volatile boolean isReconnecting = false; |
| | | |
| | | /** |
| | | * 构造函数 |
| | |
| | | |
| | | // 使用内存持久化,避免文件锁定问题 |
| | | client = new MqttClient(host, clientId, new MemoryPersistence()); |
| | | // 先设置回调,再连接 |
| | | client.setCallback(new PushCallback()); |
| | | // 先设置回调,再连接(传入Client实例和订阅信息,用于自动重连后重新订阅) |
| | | client.setCallback(new PushCallback(this, topic, qos)); |
| | | |
| | | // 执行连接 |
| | | client.connect(options); |
| | | System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic); |
| | | |
| | | // 启动连接监控线程 |
| | | startConnectionMonitor(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws MqttException 订阅失败时抛出异常 |
| | | */ |
| | | public void subscribe(int qos) throws MqttException { |
| | | this.qos = qos; // 保存QoS值,用于重连后重新订阅 |
| | | if (client == null || !client.isConnected()) { |
| | | connect(); |
| | | } |
| | | client.subscribe(topic, qos); |
| | | System.out.println("已订阅主题: " + topic + ", QoS: " + qos); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws MqttException 订阅失败时抛出异常 |
| | | */ |
| | | public void subscribe() throws MqttException { |
| | | subscribe(2); |
| | | // 使用QoS 0确保消息可靠传递(至少一次,且仅一次) |
| | | subscribe(0); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * 启动连接监控线程,定期检查连接状态并在断开时尝试重连 |
| | | */ |
| | | private void startConnectionMonitor() { |
| | | if (monitoring) { |
| | | return; // 已经在监控中 |
| | | } |
| | | |
| | | monitoring = true; |
| | | connectionMonitorThread = new Thread(() -> { |
| | | while (monitoring && client != null) { |
| | | try { |
| | | Thread.sleep(5000); // 每5秒检查一次 |
| | | |
| | | if (client != null && !client.isConnected()) { |
| | | System.out.println("检测到MQTT连接断开,尝试重连... ClientId: " + clientId); |
| | | try { |
| | | // 尝试重新连接 |
| | | if (!client.isConnected()) { |
| | | client.reconnect(); |
| | | // 重连成功后重新订阅 |
| | | if (client.isConnected()) { |
| | | client.subscribe(topic, qos); |
| | | System.out.println("连接监控:重连成功并重新订阅主题: " + topic); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("连接监控:重连失败: " + e.getMessage()); |
| | | // 如果自动重连失败,尝试完全重新连接 |
| | | try { |
| | | connect(); |
| | | if (isConnected()) { |
| | | subscribe(qos); |
| | | } |
| | | } catch (Exception ex) { |
| | | System.err.println("连接监控:完全重连失败: " + ex.getMessage()); |
| | | } |
| | | } |
| | | } |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | break; |
| | | } catch (Exception e) { |
| | | System.err.println("连接监控线程异常: " + e.getMessage()); |
| | | } |
| | | } |
| | | }); |
| | | connectionMonitorThread.setDaemon(true); |
| | | connectionMonitorThread.setName("MQTT-ConnectionMonitor-" + clientId); |
| | | connectionMonitorThread.start(); |
| | | System.out.println("已启动MQTT连接监控线程: " + clientId); |
| | | } |
| | | |
| | | /** |
| | | * 停止连接监控线程 |
| | | */ |
| | | private void stopConnectionMonitor() { |
| | | monitoring = false; |
| | | if (connectionMonitorThread != null) { |
| | | connectionMonitorThread.interrupt(); |
| | | try { |
| | | connectionMonitorThread.join(1000); |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | connectionMonitorThread = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 关闭客户端并释放资源 |
| | | */ |
| | | public void close() { |
| | | stopConnectionMonitor(); // 停止监控线程 |
| | | try { |
| | | if (client != null) { |
| | | if (client.isConnected()) { |
| | |
| | | * @return true表示连接成功,false表示连接失败 |
| | | */ |
| | | public static boolean connectMQTT() { |
| | | // 防止重复重连 |
| | | if (isReconnecting) { |
| | | System.out.println("MQTT正在重连中,跳过本次重连请求"); |
| | | return false; |
| | | } |
| | | |
| | | // 检查是否已经连接 |
| | | if (areClientsConnected()) { |
| | | System.out.println("MQTT客户端已连接,无需重复连接"); |
| | | return true; |
| | | } |
| | | |
| | | isReconnecting = true; |
| | | try { |
| | | // 先断开之前的连接 |
| | | disconnectAll(); |
| | | |
| | |
| | | try { |
| | | String host = "tcp://39.99.43.227:1883"; |
| | | String deiveID = Setsys.getMowerIdValue(); |
| | | if (deiveID == null || deiveID.isEmpty()) { |
| | | System.err.println("设备ID为空,无法连接MQTT"); |
| | | return false; |
| | | } |
| | | // 添加时间戳确保客户端ID唯一 |
| | | long timestamp = System.currentTimeMillis(); |
| | | String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp; |
| | |
| | | gpsClient.connect(); |
| | | // 稍作延迟,确保连接稳定 |
| | | Thread.sleep(100); |
| | | if (gpsClient != null && gpsClient.isConnected()) { |
| | | gpsClient.subscribe(); |
| | | gpsSuccess = true; |
| | | System.out.println("GPS主题MQTT连接并订阅成功"); |
| | | } |
| | | } catch (MqttException e) { |
| | | System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | |
| | | responseClient.connect(); |
| | | // 稍作延迟,确保连接稳定 |
| | | Thread.sleep(100); |
| | | if (responseClient != null && responseClient.isConnected()) { |
| | | responseClient.subscribe(); |
| | | responseSuccess = true; |
| | | System.out.println("响应主题MQTT连接并订阅成功"); |
| | | } |
| | | } catch (MqttException e) { |
| | | System.err.println("响应主题MQTT连接失败: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("MQTT连接过程发生异常: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | | System.err.println("异常原因: " + e.getCause().getMessage()); |
| | | } |
| | | e.printStackTrace(); |
| | | return false; |
| | | } |
| | | } finally { |
| | | isReconnecting = false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return true表示连接成功,false表示连接失败 |
| | | */ |
| | | public static boolean connectMQTT(String host, String deviceId, String userEmail) { |
| | | // 防止重复重连 |
| | | if (isReconnecting) { |
| | | System.out.println("MQTT正在重连中,跳过本次重连请求"); |
| | | return false; |
| | | } |
| | | |
| | | // 检查是否已经连接 |
| | | if (areClientsConnected()) { |
| | | System.out.println("MQTT客户端已连接,无需重复连接"); |
| | | return true; |
| | | } |
| | | |
| | | isReconnecting = true; |
| | | try { |
| | | // 先断开之前的连接 |
| | | disconnectAll(); |
| | | |
| | |
| | | boolean responseSuccess = false; |
| | | |
| | | try { |
| | | if (deviceId == null || deviceId.isEmpty()) { |
| | | System.err.println("设备ID为空,无法连接MQTT"); |
| | | return false; |
| | | } |
| | | // 添加时间戳确保客户端ID唯一 |
| | | long timestamp = System.currentTimeMillis(); |
| | | String clientId = userEmail + "mower" + "_" + timestamp; |
| | |
| | | gpsClient.connect(); |
| | | // 稍作延迟,确保连接稳定 |
| | | Thread.sleep(100); |
| | | if (gpsClient != null && gpsClient.isConnected()) { |
| | | gpsClient.subscribe(); |
| | | gpsSuccess = true; |
| | | System.out.println("GPS主题MQTT连接并订阅成功"); |
| | | } |
| | | } catch (MqttException e) { |
| | | System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | |
| | | responseClient.connect(); |
| | | // 稍作延迟,确保连接稳定 |
| | | Thread.sleep(100); |
| | | if (responseClient != null && responseClient.isConnected()) { |
| | | responseClient.subscribe(); |
| | | responseSuccess = true; |
| | | System.out.println("响应主题MQTT连接并订阅成功"); |
| | | } |
| | | } catch (MqttException e) { |
| | | System.err.println("响应主题MQTT连接失败: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("MQTT连接过程发生异常: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | | System.err.println("异常原因: " + e.getCause().getMessage()); |
| | | } |
| | | e.printStackTrace(); |
| | | return false; |
| | | } |
| | | } finally { |
| | | isReconnecting = false; |
| | | } |
| | | } |
| | | |
| | | /** |