package Mqttmessage;
|
import Mqttmessage.Util.DeviceMessageParser;
|
import gecaoji.gecaojistatus;
|
import Mqttmessage.Entity.GPSData;
|
import Mqttmessage.Entity.GPSData.StatusInfo;
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import udpdell.UDPServer;
|
public class PushCallback implements MqttCallback {
|
|
private Client clientInstance; // 保存Client实例引用,用于重新订阅
|
private String topic; // 保存订阅的主题
|
private int qos; // 保存QoS等级
|
|
/**
|
* 默认构造函数
|
*/
|
public PushCallback() {
|
}
|
|
/**
|
* 带参数的构造函数,用于保存Client实例和订阅信息
|
* @param clientInstance Client实例
|
* @param topic 订阅的主题
|
* @param qos QoS等级
|
*/
|
public PushCallback(Client clientInstance, String topic, int qos) {
|
this.clientInstance = clientInstance;
|
this.topic = topic;
|
this.qos = qos;
|
}
|
|
public void connectionLost(Throwable cause) {
|
// 连接丢失后,记录日志并触发重连机制
|
if (cause != null) {
|
System.err.println("MQTT连接断开,原因: " + cause.getMessage());
|
} else {
|
System.err.println("MQTT连接断开,将自动重连...");
|
}
|
|
// 启动重连线程,在自动重连成功后重新订阅
|
if (clientInstance != null && topic != null) {
|
new Thread(() -> {
|
reconnectAndResubscribe();
|
}).start();
|
}
|
}
|
|
/**
|
* 重连并重新订阅
|
*/
|
private void reconnectAndResubscribe() {
|
int maxRetries = 30; // 最多重试30次
|
int retryCount = 0;
|
long retryInterval = 2000; // 每次重试间隔2秒
|
|
while (retryCount < maxRetries && clientInstance != null) {
|
try {
|
Thread.sleep(retryInterval);
|
|
// 检查是否已重连
|
if (clientInstance.isConnected()) {
|
try {
|
// 重新订阅主题
|
clientInstance.getClient().subscribe(topic, qos);
|
System.out.println("MQTT重连成功,已重新订阅主题: " + topic);
|
return; // 重连成功,退出循环
|
} catch (Exception e) {
|
System.err.println("重新订阅主题失败: " + e.getMessage());
|
}
|
}
|
|
retryCount++;
|
if (retryCount % 5 == 0) {
|
System.out.println("等待MQTT自动重连... (" + retryCount + "/" + maxRetries + ")");
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
System.err.println("重连线程被中断");
|
return;
|
}
|
}
|
|
if (retryCount >= maxRetries) {
|
System.err.println("MQTT自动重连超时,尝试手动重连...");
|
// 如果自动重连失败,尝试手动重连
|
try {
|
if (clientInstance != null) {
|
clientInstance.connect();
|
if (clientInstance.isConnected()) {
|
clientInstance.subscribe(qos);
|
System.out.println("手动重连成功,已重新订阅主题: " + topic);
|
}
|
}
|
} catch (Exception e) {
|
System.err.println("手动重连失败: " + e.getMessage());
|
}
|
}
|
}
|
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
System.out.println("deliveryComplete---------" + token.isComplete());
|
}
|
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
// subscribe后得到的消息会执行到这里面
|
// System.out.println(message);
|
GPSData gpsData = DeviceMessageParser.parseGPSData(new String(message.getPayload()));//解析GNSS数据
|
//ResponseData responseData = DeviceMessageParser.parseResponseData(new String(message.getPayload()));//解析响应数据
|
String gpsRaw = gpsData.getGps_raw();
|
UDPServer.processSerialData(gpsRaw);
|
gecaojistatus.parseStatus(gpsData.getStatus());
|
}
|
|
}
|