| src/Mqttmessage/Client.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/Mqttmessage/Entity/BasestationData.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/Mqttmessage/Entity/CoordinatePoint.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/Mqttmessage/Entity/GPSData.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/Mqttmessage/Entity/HxzkPathMessage.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/Mqttmessage/Entity/PathData.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/Mqttmessage/Entity/ResponseData.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/Mqttmessage/PushCallback.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/Mqttmessage/Util/DeviceMessageParser.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/Mqttmessage/Client.java
@@ -19,10 +19,15 @@ private String clientId; private MqttClient client; private MqttConnectOptions options; private int qos = 0; // 默认QoS等级 private Thread connectionMonitorThread; // 连接监控线程 private volatile boolean monitoring = false; // 监控标志 // 静态变量用于存储客户端实例 private static Client gpsClient; private static Client responseClient; // 重连标志,防止重复重连 private static volatile boolean isReconnecting = false; /** * 构造函数 @@ -68,12 +73,15 @@ // 使用内存持久化,避免文件锁定问题 client = new MqttClient(host, clientId, new MemoryPersistence()); // 先设置回调,再连接 client.setCallback(new PushCallback()); // 先设置回调,再连接(传入Client实例和订阅信息,用于自动重连后重新订阅) client.setCallback(new PushCallback(this, topic, qos)); // 执行连接 client.connect(options); System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic); // 启动连接监控线程 startConnectionMonitor(); } /** @@ -82,10 +90,12 @@ * @throws MqttException 订阅失败时抛出异常 */ public void subscribe(int qos) throws MqttException { this.qos = qos; // 保存QoS值,用于重连后重新订阅 if (client == null || !client.isConnected()) { connect(); } client.subscribe(topic, qos); System.out.println("已订阅主题: " + topic + ", QoS: " + qos); } /** @@ -93,7 +103,8 @@ * @throws MqttException 订阅失败时抛出异常 */ public void subscribe() throws MqttException { subscribe(2); // 使用QoS 0确保消息可靠传递(至少一次,且仅一次) subscribe(0); } /** @@ -107,9 +118,79 @@ } /** * 启动连接监控线程,定期检查连接状态并在断开时尝试重连 */ private void startConnectionMonitor() { if (monitoring) { return; // 已经在监控中 } monitoring = true; connectionMonitorThread = new Thread(() -> { while (monitoring && client != null) { try { Thread.sleep(5000); // 每5秒检查一次 if (client != null && !client.isConnected()) { System.out.println("检测到MQTT连接断开,尝试重连... ClientId: " + clientId); try { // 尝试重新连接 if (!client.isConnected()) { client.reconnect(); // 重连成功后重新订阅 if (client.isConnected()) { client.subscribe(topic, qos); System.out.println("连接监控:重连成功并重新订阅主题: " + topic); } } } catch (Exception e) { System.err.println("连接监控:重连失败: " + e.getMessage()); // 如果自动重连失败,尝试完全重新连接 try { connect(); if (isConnected()) { subscribe(qos); } } catch (Exception ex) { System.err.println("连接监控:完全重连失败: " + ex.getMessage()); } } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { System.err.println("连接监控线程异常: " + e.getMessage()); } } }); connectionMonitorThread.setDaemon(true); connectionMonitorThread.setName("MQTT-ConnectionMonitor-" + clientId); connectionMonitorThread.start(); System.out.println("已启动MQTT连接监控线程: " + clientId); } /** * 停止连接监控线程 */ private void stopConnectionMonitor() { monitoring = false; if (connectionMonitorThread != null) { connectionMonitorThread.interrupt(); try { connectionMonitorThread.join(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } connectionMonitorThread = null; } } /** * 关闭客户端并释放资源 */ public void close() { stopConnectionMonitor(); // 停止监控线程 try { if (client != null) { if (client.isConnected()) { @@ -145,6 +226,20 @@ * @return true表示连接成功,false表示连接失败 */ public static boolean connectMQTT() { // 防止重复重连 if (isReconnecting) { System.out.println("MQTT正在重连中,跳过本次重连请求"); return false; } // 检查是否已经连接 if (areClientsConnected()) { System.out.println("MQTT客户端已连接,无需重复连接"); return true; } isReconnecting = true; try { // 先断开之前的连接 disconnectAll(); @@ -154,6 +249,10 @@ try { String host = "tcp://39.99.43.227:1883"; String deiveID = Setsys.getMowerIdValue(); if (deiveID == null || deiveID.isEmpty()) { System.err.println("设备ID为空,无法连接MQTT"); return false; } // 添加时间戳确保客户端ID唯一 long timestamp = System.currentTimeMillis(); String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp; @@ -167,9 +266,11 @@ gpsClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); if (gpsClient != null && gpsClient.isConnected()) { gpsClient.subscribe(); gpsSuccess = true; System.out.println("GPS主题MQTT连接并订阅成功"); } } catch (MqttException e) { System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { @@ -187,9 +288,11 @@ responseClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); if (responseClient != null && responseClient.isConnected()) { responseClient.subscribe(); responseSuccess = true; System.out.println("响应主题MQTT连接并订阅成功"); } } catch (MqttException e) { System.err.println("响应主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { @@ -213,9 +316,15 @@ } } catch (Exception e) { System.err.println("MQTT连接过程发生异常: " + e.getMessage()); if (e.getCause() != null) { System.err.println("异常原因: " + e.getCause().getMessage()); } e.printStackTrace(); return false; } } finally { isReconnecting = false; } } /** @@ -226,6 +335,20 @@ * @return true表示连接成功,false表示连接失败 */ public static boolean connectMQTT(String host, String deviceId, String userEmail) { // 防止重复重连 if (isReconnecting) { System.out.println("MQTT正在重连中,跳过本次重连请求"); return false; } // 检查是否已经连接 if (areClientsConnected()) { System.out.println("MQTT客户端已连接,无需重复连接"); return true; } isReconnecting = true; try { // 先断开之前的连接 disconnectAll(); @@ -233,6 +356,10 @@ boolean responseSuccess = false; try { if (deviceId == null || deviceId.isEmpty()) { System.err.println("设备ID为空,无法连接MQTT"); return false; } // 添加时间戳确保客户端ID唯一 long timestamp = System.currentTimeMillis(); String clientId = userEmail + "mower" + "_" + timestamp; @@ -246,9 +373,11 @@ gpsClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); if (gpsClient != null && gpsClient.isConnected()) { gpsClient.subscribe(); gpsSuccess = true; System.out.println("GPS主题MQTT连接并订阅成功"); } } catch (MqttException e) { System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { @@ -266,9 +395,11 @@ responseClient.connect(); // 稍作延迟,确保连接稳定 Thread.sleep(100); if (responseClient != null && responseClient.isConnected()) { responseClient.subscribe(); responseSuccess = true; System.out.println("响应主题MQTT连接并订阅成功"); } } catch (MqttException e) { System.err.println("响应主题MQTT连接失败: " + e.getMessage()); if (e.getCause() != null) { @@ -292,9 +423,15 @@ } } catch (Exception e) { System.err.println("MQTT连接过程发生异常: " + e.getMessage()); if (e.getCause() != null) { System.err.println("异常原因: " + e.getCause().getMessage()); } e.printStackTrace(); return false; } } finally { isReconnecting = false; } } /** src/Mqttmessage/Entity/BasestationData.java
@@ -1,9 +1,9 @@ package Mqttmessage.Entity; import lombok.Data; @Data public class BasestationData { private String latitude_dm; @@ -22,4 +22,44 @@ this.longitude_hemisphere = longitude_hemisphere; this.altitude = altitude; } public String getLatitude_dm() { return latitude_dm; } public void setLatitude_dm(String latitude_dm) { this.latitude_dm = latitude_dm; } public String getLatitude_hemisphere() { return latitude_hemisphere; } public void setLatitude_hemisphere(String latitude_hemisphere) { this.latitude_hemisphere = latitude_hemisphere; } public String getLongitude_dm() { return longitude_dm; } public void setLongitude_dm(String longitude_dm) { this.longitude_dm = longitude_dm; } public String getLongitude_hemisphere() { return longitude_hemisphere; } public void setLongitude_hemisphere(String longitude_hemisphere) { this.longitude_hemisphere = longitude_hemisphere; } public double getAltitude() { return altitude; } public void setAltitude(double altitude) { this.altitude = altitude; } } src/Mqttmessage/Entity/CoordinatePoint.java
@@ -1,9 +1,8 @@ package Mqttmessage.Entity; import lombok.Data; @Data public class CoordinatePoint { private String x; // X坐标 private String y; // Y坐标 @@ -15,4 +14,20 @@ this.x = x; this.y = y; } public String getX() { return x; } public void setX(String x) { this.x = x; } public String getY() { return y; } public void setY(String y) { this.y = y; } } src/Mqttmessage/Entity/GPSData.java
@@ -1,13 +1,5 @@ package Mqttmessage.Entity; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class GPSData{ // JSON中的原始字段 private String msg_id; // 消息唯一标识 @@ -26,6 +18,27 @@ private GGAData ggaData; /** * 默认构造函数 */ public GPSData() { } /** * 全参构造函数 */ public GPSData(String msg_id, Long timestamp, String device_id, String data_type, String gps_raw, IMUData imu_data, StatusInfo status, GGAData ggaData) { this.msg_id = msg_id; this.timestamp = timestamp; this.device_id = device_id; this.data_type = data_type; this.gps_raw = gps_raw; this.imu_data = imu_data; this.status = status; this.ggaData = ggaData; } /** * 获取原始GPS数据 * @return 原始GPS数据字符串 */ @@ -50,15 +63,19 @@ /** * IMU数据内部类(简化版,只包含角度信息) */ @Data @NoArgsConstructor public static class IMUData { private Double roll; // 横滚角 角度 private Double pitch; // 俯仰角 角度 private Double yaw; // 偏航角 角度 /** * 带参数的构造函数 * 默认构造函数 */ public IMUData() { } /** * 全参构造函数 * @param roll 横滚角 角度 * @param pitch 俯仰角 角度 * @param yaw 偏航角 角度 @@ -68,13 +85,35 @@ this.pitch = pitch; this.yaw = yaw; } public Double getRoll() { return roll; } public void setRoll(Double roll) { this.roll = roll; } public Double getPitch() { return pitch; } public void setPitch(Double pitch) { this.pitch = pitch; } public Double getYaw() { return yaw; } public void setYaw(Double yaw) { this.yaw = yaw; } } /** * 状态信息内部类(与StatusData中的StatusInfo保持一致) */ @Data @NoArgsConstructor public static class StatusInfo { private Integer battery_level; // 电池电量百分比 private Double battery_voltage; // 电池电压 @@ -88,6 +127,12 @@ private String path_id_saved; // 存储的路径ID /** * 默认构造函数 */ public StatusInfo() { } /** * 带参数的构造函数 * @param battery_level 电池电量百分比 * @param battery_voltage 电池电压 @@ -199,9 +244,6 @@ /** * GGA数据解析类(可选,用于存储解析后的GGA数据) */ @Data @NoArgsConstructor @AllArgsConstructor public static class GGAData { private String utcTime; // UTC时间 private String latitude; // 纬度(原始度分格式) @@ -217,6 +259,147 @@ private String geoidSepUnit;// 大地水准面分离单位 private String age; // 差分GPS数据期限 private String stationId; // 差分参考基站标号 /** * 默认构造函数 */ public GGAData() { } /** * 全参构造函数 */ public GGAData(String utcTime, String latitude, String latitudeDir, String longitude, String longitudeDir, Integer gpsQuality, Integer satellites, Double hdop, Double altitude, String altitudeUnit, Double geoidSep, String geoidSepUnit, String age, String stationId) { this.utcTime = utcTime; this.latitude = latitude; this.latitudeDir = latitudeDir; this.longitude = longitude; this.longitudeDir = longitudeDir; this.gpsQuality = gpsQuality; this.satellites = satellites; this.hdop = hdop; this.altitude = altitude; this.altitudeUnit = altitudeUnit; this.geoidSep = geoidSep; this.geoidSepUnit = geoidSepUnit; this.age = age; this.stationId = stationId; } public String getUtcTime() { return utcTime; } public void setUtcTime(String utcTime) { this.utcTime = utcTime; } public String getLatitude() { return latitude; } public void setLatitude(String latitude) { this.latitude = latitude; } public String getLatitudeDir() { return latitudeDir; } public void setLatitudeDir(String latitudeDir) { this.latitudeDir = latitudeDir; } public String getLongitude() { return longitude; } public void setLongitude(String longitude) { this.longitude = longitude; } public String getLongitudeDir() { return longitudeDir; } public void setLongitudeDir(String longitudeDir) { this.longitudeDir = longitudeDir; } public Integer getGpsQuality() { return gpsQuality; } public void setGpsQuality(Integer gpsQuality) { this.gpsQuality = gpsQuality; } public Integer getSatellites() { return satellites; } public void setSatellites(Integer satellites) { this.satellites = satellites; } public Double getHdop() { return hdop; } public void setHdop(Double hdop) { this.hdop = hdop; } public Double getAltitude() { return altitude; } public void setAltitude(Double altitude) { this.altitude = altitude; } public String getAltitudeUnit() { return altitudeUnit; } public void setAltitudeUnit(String altitudeUnit) { this.altitudeUnit = altitudeUnit; } public Double getGeoidSep() { return geoidSep; } public void setGeoidSep(Double geoidSep) { this.geoidSep = geoidSep; } public String getGeoidSepUnit() { return geoidSepUnit; } public void setGeoidSepUnit(String geoidSepUnit) { this.geoidSepUnit = geoidSepUnit; } public String getAge() { return age; } public void setAge(String age) { this.age = age; } public String getStationId() { return stationId; } public void setStationId(String stationId) { this.stationId = stationId; } } public String getMsg_id() { src/Mqttmessage/Entity/HxzkPathMessage.java
@@ -1,9 +1,9 @@ package Mqttmessage.Entity; import lombok.Data; @Data public class HxzkPathMessage { private String msg_id; private long timestamp; @@ -25,4 +25,59 @@ this.path_data = path_data; } public String getMsg_id() { return msg_id; } public void setMsg_id(String msg_id) { this.msg_id = msg_id; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public String getUser_id() { return user_id; } public void setUser_id(String user_id) { this.user_id = user_id; } public String getDevice_id() { return device_id; } public void setDevice_id(String device_id) { this.device_id = device_id; } public String getCommand() { return command; } public void setCommand(String command) { this.command = command; } public BasestationData getBasestation_data() { return basestation_data; } public void setBasestation_data(BasestationData basestation_data) { this.basestation_data = basestation_data; } public PathData getPath_data() { return path_data; } public void setPath_data(PathData path_data) { this.path_data = path_data; } } src/Mqttmessage/Entity/PathData.java
@@ -5,7 +5,7 @@ import java.util.List; @Data public class PathData { private String path_id; src/Mqttmessage/Entity/ResponseData.java
@@ -2,14 +2,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; import java.util.Map; /** * 设备响应消息实体类 */ @Data public class ResponseData { @JsonProperty("msg_id") @@ -37,10 +37,50 @@ this.response = response; } public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this.msgId = msgId; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public String getDeviceId() { return deviceId; } public void setDeviceId(String deviceId) { this.deviceId = deviceId; } public String getOriginalMsgId() { return originalMsgId; } public void setOriginalMsgId(String originalMsgId) { this.originalMsgId = originalMsgId; } public ResponseInfo getResponse() { return response; } public void setResponse(ResponseInfo response) { this.response = response; } /** * 响应信息内部类 */ @Data public static class ResponseInfo { private String status; private String command; @@ -66,6 +106,44 @@ this.additionalInfo = additionalInfo; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getCommand() { return command; } public void setCommand(String command) { this.command = command; } public int getErrorCode() { return errorCode; } public void setErrorCode(int errorCode) { this.errorCode = errorCode; } public String getErrorMessage() { return errorMessage; } public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; } public Map<String, Object> getAdditionalInfo() { return additionalInfo; } public void setAdditionalInfo(Map<String, Object> additionalInfo) { this.additionalInfo = additionalInfo; } } } src/Mqttmessage/PushCallback.java
@@ -10,11 +10,94 @@ import udpdell.UDPServer; public class PushCallback implements MqttCallback { private Client clientInstance; // 保存Client实例引用,用于重新订阅 private String topic; // 保存订阅的主题 private int qos; // 保存QoS等级 /** * 默认构造函数 */ public PushCallback() { } /** * 带参数的构造函数,用于保存Client实例和订阅信息 * @param clientInstance Client实例 * @param topic 订阅的主题 * @param qos QoS等级 */ public PushCallback(Client clientInstance, String topic, int qos) { this.clientInstance = clientInstance; this.topic = topic; this.qos = qos; } public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 Client.lianjiemqqt(); // 连接丢失后,记录日志并触发重连机制 if (cause != null) { System.err.println("MQTT连接断开,原因: " + cause.getMessage()); } else { System.err.println("MQTT连接断开,将自动重连..."); } // 启动重连线程,在自动重连成功后重新订阅 if (clientInstance != null && topic != null) { new Thread(() -> { reconnectAndResubscribe(); }).start(); } } /** * 重连并重新订阅 */ private void reconnectAndResubscribe() { int maxRetries = 30; // 最多重试30次 int retryCount = 0; long retryInterval = 2000; // 每次重试间隔2秒 while (retryCount < maxRetries && clientInstance != null) { try { Thread.sleep(retryInterval); // 检查是否已重连 if (clientInstance.isConnected()) { try { // 重新订阅主题 clientInstance.getClient().subscribe(topic, qos); System.out.println("MQTT重连成功,已重新订阅主题: " + topic); return; // 重连成功,退出循环 } catch (Exception e) { System.err.println("重新订阅主题失败: " + e.getMessage()); } } retryCount++; if (retryCount % 5 == 0) { System.out.println("等待MQTT自动重连... (" + retryCount + "/" + maxRetries + ")"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("重连线程被中断"); return; } } if (retryCount >= maxRetries) { System.err.println("MQTT自动重连超时,尝试手动重连..."); // 如果自动重连失败,尝试手动重连 try { if (clientInstance != null) { clientInstance.connect(); if (clientInstance.isConnected()) { clientInstance.subscribe(qos); System.out.println("手动重连成功,已重新订阅主题: " + topic); } } } catch (Exception e) { System.err.println("手动重连失败: " + e.getMessage()); } } } public void deliveryComplete(IMqttDeliveryToken token) { src/Mqttmessage/Util/DeviceMessageParser.java
@@ -50,68 +50,7 @@ return objectMapper.readValue(jsonStr, ResponseData.class); } /** * 解析GGA格式的GPS数据(可选功能) * @param ggaString GGA格式字符串 * @return GGAData对象 */ /*public static GPSData.GGAData parseGGA(String ggaString) { // 移除可能的校验和部分 String cleanString = ggaString.split("\\*")[0]; String[] fields = cleanString.split(","); if (fields.length < 15) { return null; } GPSData.GGAData ggaData = new GPSData.GGAData(); try { ggaData.setUtcTime(fields[1]); ggaData.setLatitude(fields[2]); ggaData.setLatitudeDir(fields[3]); ggaData.setLongitude(fields[4]); ggaData.setLongitudeDir(fields[5]); if (!fields[6].isEmpty()) { ggaData.setGpsQuality(Integer.parseInt(fields[6])); } if (!fields[7].isEmpty()) { ggaData.setSatellites(Integer.parseInt(fields[7])); } if (!fields[8].isEmpty()) { ggaData.setHdop(Double.parseDouble(fields[8])); } if (!fields[9].isEmpty()) { ggaData.setAltitude(Double.parseDouble(fields[9])); } ggaData.setAltitudeUnit(fields[10]); if (!fields[11].isEmpty()) { ggaData.setGeoidSep(Double.parseDouble(fields[11])); } ggaData.setGeoidSepUnit(fields[12]); if (fields.length > 13) { ggaData.setAge(fields[13]); } if (fields.length > 14) { ggaData.setStationId(fields[14]); } } catch (Exception e) { // 解析异常,返回部分解析的数据或null System.err.println("解析GGA数据时出错: " + e.getMessage()); } return ggaData; }*/ /** * 将消息对象转换为JSON字符串