张世豪
15 小时以前 2eea735fd4ddf0ae047687780271ef3962d256cc
src/Mqttmessage/PushCallback.java
@@ -1,22 +1,103 @@
package Mqttmessage;
import Mqttmessage.Util.DeviceMessageParser;
import gecaoji.gecaojistatus;
import Mqttmessage.Entity.GPSData;
import Mqttmessage.Entity.GPSData.StatusInfo;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import udpdell.UDPServer;
public class PushCallback implements MqttCallback {
    private Client clientInstance;  // 保存Client实例引用,用于重新订阅
    private String topic;           // 保存订阅的主题
    private int qos;               // 保存QoS等级
    /**
     * 默认构造函数
     */
    public PushCallback() {
    }
    /**
     * 带参数的构造函数,用于保存Client实例和订阅信息
     * @param clientInstance Client实例
     * @param topic 订阅的主题
     * @param qos QoS等级
     */
    public PushCallback(Client clientInstance, String topic, int qos) {
        this.clientInstance = clientInstance;
        this.topic = topic;
        this.qos = qos;
    }
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
        // 连接丢失后,记录日志并触发重连机制
        if (cause != null) {
            System.err.println("MQTT连接断开,原因: " + cause.getMessage());
        } else {
            System.err.println("MQTT连接断开,将自动重连...");
        }
        // 启动重连线程,在自动重连成功后重新订阅
        if (clientInstance != null && topic != null) {
            new Thread(() -> {
                reconnectAndResubscribe();
            }).start();
        }
    }
    /**
     * 重连并重新订阅
     */
    private void reconnectAndResubscribe() {
        int maxRetries = 30;  // 最多重试30次
        int retryCount = 0;
        long retryInterval = 2000;  // 每次重试间隔2秒
        while (retryCount < maxRetries && clientInstance != null) {
            try {
                Thread.sleep(retryInterval);
                // 检查是否已重连
                if (clientInstance.isConnected()) {
                    try {
                        // 重新订阅主题
                        clientInstance.getClient().subscribe(topic, qos);
                        System.out.println("MQTT重连成功,已重新订阅主题: " + topic);
                        return;  // 重连成功,退出循环
                    } catch (Exception e) {
                        System.err.println("重新订阅主题失败: " + e.getMessage());
                    }
                }
                retryCount++;
                if (retryCount % 5 == 0) {
                    System.out.println("等待MQTT自动重连... (" + retryCount + "/" + maxRetries + ")");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("重连线程被中断");
                return;
            }
        }
        if (retryCount >= maxRetries) {
            System.err.println("MQTT自动重连超时,尝试手动重连...");
            // 如果自动重连失败,尝试手动重连
            try {
                if (clientInstance != null) {
                    clientInstance.connect();
                    if (clientInstance.isConnected()) {
                        clientInstance.subscribe(qos);
                        System.out.println("手动重连成功,已重新订阅主题: " + topic);
                    }
                }
            } catch (Exception e) {
                System.err.println("手动重连失败: " + e.getMessage());
            }
        }
    }
    public void deliveryComplete(IMqttDeliveryToken token) {
@@ -30,6 +111,7 @@
        //ResponseData responseData = DeviceMessageParser.parseResponseData(new String(message.getPayload()));//解析响应数据
        String gpsRaw = gpsData.getGps_raw();
        UDPServer.processSerialData(gpsRaw);
        gecaojistatus.parseStatus(gpsData.getStatus());
    }
}