package Mqttmessage; import Mqttmessage.Util.DeviceMessageParser; import gecaoji.gecaojistatus; import Mqttmessage.Entity.GPSData; import Mqttmessage.Entity.GPSData.StatusInfo; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import udpdell.UDPServer; public class PushCallback implements MqttCallback { private Client clientInstance; // 保存Client实例引用,用于重新订阅 private String topic; // 保存订阅的主题 private int qos; // 保存QoS等级 /** * 默认构造函数 */ public PushCallback() { } /** * 带参数的构造函数,用于保存Client实例和订阅信息 * @param clientInstance Client实例 * @param topic 订阅的主题 * @param qos QoS等级 */ public PushCallback(Client clientInstance, String topic, int qos) { this.clientInstance = clientInstance; this.topic = topic; this.qos = qos; } public void connectionLost(Throwable cause) { // 连接丢失后,记录日志并触发重连机制 if (cause != null) { System.err.println("MQTT连接断开,原因: " + cause.getMessage()); } else { System.err.println("MQTT连接断开,将自动重连..."); } // 启动重连线程,在自动重连成功后重新订阅 if (clientInstance != null && topic != null) { new Thread(() -> { reconnectAndResubscribe(); }).start(); } } /** * 重连并重新订阅 */ private void reconnectAndResubscribe() { int maxRetries = 30; // 最多重试30次 int retryCount = 0; long retryInterval = 2000; // 每次重试间隔2秒 while (retryCount < maxRetries && clientInstance != null) { try { Thread.sleep(retryInterval); // 检查是否已重连 if (clientInstance.isConnected()) { try { // 重新订阅主题 clientInstance.getClient().subscribe(topic, qos); System.out.println("MQTT重连成功,已重新订阅主题: " + topic); return; // 重连成功,退出循环 } catch (Exception e) { System.err.println("重新订阅主题失败: " + e.getMessage()); } } retryCount++; if (retryCount % 5 == 0) { System.out.println("等待MQTT自动重连... (" + retryCount + "/" + maxRetries + ")"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("重连线程被中断"); return; } } if (retryCount >= maxRetries) { System.err.println("MQTT自动重连超时,尝试手动重连..."); // 如果自动重连失败,尝试手动重连 try { if (clientInstance != null) { clientInstance.connect(); if (clientInstance.isConnected()) { clientInstance.subscribe(qos); System.out.println("手动重连成功,已重新订阅主题: " + topic); } } } catch (Exception e) { System.err.println("手动重连失败: " + e.getMessage()); } } } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 // System.out.println(message); GPSData gpsData = DeviceMessageParser.parseGPSData(new String(message.getPayload()));//解析GNSS数据 //ResponseData responseData = DeviceMessageParser.parseResponseData(new String(message.getPayload()));//解析响应数据 String gpsRaw = gpsData.getGps_raw(); UDPServer.processSerialData(gpsRaw); gecaojistatus.parseStatus(gpsData.getStatus()); } }