| | |
| | | package Mqttmessage; |
| | | |
| | | |
| | | |
| | | import Mqttmessage.Util.DeviceMessageParser; |
| | | import gecaoji.gecaojistatus; |
| | | import Mqttmessage.Entity.GPSData; |
| | | import Mqttmessage.Entity.GPSData.StatusInfo; |
| | | |
| | | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
| | | import org.eclipse.paho.client.mqttv3.MqttCallback; |
| | | import org.eclipse.paho.client.mqttv3.MqttMessage; |
| | | import udpdell.UDPServer; |
| | | |
| | | |
| | | public class PushCallback implements MqttCallback { |
| | | |
| | | |
| | | private Client clientInstance; // 保存Client实例引用,用于重新订阅 |
| | | private String topic; // 保存订阅的主题 |
| | | private int qos; // 保存QoS等级 |
| | | |
| | | /** |
| | | * 默认构造函数 |
| | | */ |
| | | public PushCallback() { |
| | | } |
| | | |
| | | /** |
| | | * 带参数的构造函数,用于保存Client实例和订阅信息 |
| | | * @param clientInstance Client实例 |
| | | * @param topic 订阅的主题 |
| | | * @param qos QoS等级 |
| | | */ |
| | | public PushCallback(Client clientInstance, String topic, int qos) { |
| | | this.clientInstance = clientInstance; |
| | | this.topic = topic; |
| | | this.qos = qos; |
| | | } |
| | | |
| | | public void connectionLost(Throwable cause) { |
| | | // 连接丢失后,一般在这里面进行重连 |
| | | System.out.println("连接断开,可以做重连"); |
| | | |
| | | // 连接丢失后,记录日志并触发重连机制 |
| | | if (cause != null) { |
| | | System.err.println("MQTT连接断开,原因: " + cause.getMessage()); |
| | | } else { |
| | | System.err.println("MQTT连接断开,将自动重连..."); |
| | | } |
| | | |
| | | // 启动重连线程,在自动重连成功后重新订阅 |
| | | if (clientInstance != null && topic != null) { |
| | | new Thread(() -> { |
| | | reconnectAndResubscribe(); |
| | | }).start(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 重连并重新订阅 |
| | | */ |
| | | private void reconnectAndResubscribe() { |
| | | int maxRetries = 30; // 最多重试30次 |
| | | int retryCount = 0; |
| | | long retryInterval = 2000; // 每次重试间隔2秒 |
| | | |
| | | while (retryCount < maxRetries && clientInstance != null) { |
| | | try { |
| | | Thread.sleep(retryInterval); |
| | | |
| | | // 检查是否已重连 |
| | | if (clientInstance.isConnected()) { |
| | | try { |
| | | // 重新订阅主题 |
| | | clientInstance.getClient().subscribe(topic, qos); |
| | | System.out.println("MQTT重连成功,已重新订阅主题: " + topic); |
| | | return; // 重连成功,退出循环 |
| | | } catch (Exception e) { |
| | | System.err.println("重新订阅主题失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | retryCount++; |
| | | if (retryCount % 5 == 0) { |
| | | System.out.println("等待MQTT自动重连... (" + retryCount + "/" + maxRetries + ")"); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | System.err.println("重连线程被中断"); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | if (retryCount >= maxRetries) { |
| | | System.err.println("MQTT自动重连超时,尝试手动重连..."); |
| | | // 如果自动重连失败,尝试手动重连 |
| | | try { |
| | | if (clientInstance != null) { |
| | | clientInstance.connect(); |
| | | if (clientInstance.isConnected()) { |
| | | clientInstance.subscribe(qos); |
| | | System.out.println("手动重连成功,已重新订阅主题: " + topic); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("手动重连失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | public void deliveryComplete(IMqttDeliveryToken token) { |
| | |
| | | //ResponseData responseData = DeviceMessageParser.parseResponseData(new String(message.getPayload()));//解析响应数据 |
| | | String gpsRaw = gpsData.getGps_raw(); |
| | | UDPServer.processSerialData(gpsRaw); |
| | | gecaojistatus.parseStatus(gpsData.getStatus()); |
| | | } |
| | | |
| | | } |