package Mqttmessage;
|
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
import set.Setsys;
|
import user.Usrdell;
|
|
/**
|
* MQTT客户端工具类
|
* 用于连接MQTT服务器并订阅主题
|
*/
|
public class Client {
|
|
private String host;
|
private String topic;
|
private String clientId;
|
private MqttClient client;
|
private MqttConnectOptions options;
|
private int qos = 0; // 默认QoS等级
|
private Thread connectionMonitorThread; // 连接监控线程
|
private volatile boolean monitoring = false; // 监控标志
|
|
// 静态变量用于存储客户端实例
|
private static Client gpsClient;
|
private static Client responseClient;
|
// 重连标志,防止重复重连
|
private static volatile boolean isReconnecting = false;
|
|
/**
|
* 构造函数
|
* @param host MQTT服务器地址,格式:tcp://ip:port
|
* @param topic 订阅的主题
|
* @param clientId 客户端ID,不能重复
|
*/
|
public Client(String host, String topic, String clientId) {
|
this.host = host;
|
this.topic = topic;
|
this.clientId = clientId;
|
this.options = new MqttConnectOptions();
|
this.options.setCleanSession(true);
|
// 设置连接超时时间(秒)
|
this.options.setConnectionTimeout(30);
|
// 设置KeepAlive间隔(秒),用于保持连接活跃
|
this.options.setKeepAliveInterval(60);
|
// 设置自动重连
|
this.options.setAutomaticReconnect(true);
|
// 设置MQTT版本,使用3.1.1
|
this.options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
|
}
|
|
/**
|
* 连接到MQTT服务器
|
* @throws MqttException 连接失败时抛出异常
|
*/
|
public void connect() throws MqttException {
|
if (client != null && client.isConnected()) {
|
System.out.println("MQTT客户端已连接,ClientId: " + clientId);
|
return;
|
}
|
|
// 如果客户端已存在但未连接,先关闭
|
if (client != null) {
|
try {
|
client.close();
|
} catch (Exception e) {
|
// 忽略关闭时的异常
|
}
|
client = null;
|
}
|
|
// 使用内存持久化,避免文件锁定问题
|
client = new MqttClient(host, clientId, new MemoryPersistence());
|
// 先设置回调,再连接(传入Client实例和订阅信息,用于自动重连后重新订阅)
|
client.setCallback(new PushCallback(this, topic, qos));
|
|
// 执行连接
|
client.connect(options);
|
System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic);
|
|
// 启动连接监控线程
|
startConnectionMonitor();
|
}
|
|
/**
|
* 订阅主题
|
* @param qos 服务质量等级,0-2
|
* @throws MqttException 订阅失败时抛出异常
|
*/
|
public void subscribe(int qos) throws MqttException {
|
this.qos = qos; // 保存QoS值,用于重连后重新订阅
|
if (client == null || !client.isConnected()) {
|
connect();
|
}
|
client.subscribe(topic, qos);
|
System.out.println("已订阅主题: " + topic + ", QoS: " + qos);
|
}
|
|
/**
|
* 订阅主题(默认QoS为2)
|
* @throws MqttException 订阅失败时抛出异常
|
*/
|
public void subscribe() throws MqttException {
|
// 使用QoS 0确保消息可靠传递(至少一次,且仅一次)
|
subscribe(0);
|
}
|
|
/**
|
* 断开连接
|
* @throws MqttException 断开连接失败时抛出异常
|
*/
|
public void disconnect() throws MqttException {
|
if (client != null && client.isConnected()) {
|
client.disconnect();
|
}
|
}
|
|
/**
|
* 启动连接监控线程,定期检查连接状态并在断开时尝试重连
|
*/
|
private void startConnectionMonitor() {
|
if (monitoring) {
|
return; // 已经在监控中
|
}
|
|
monitoring = true;
|
connectionMonitorThread = new Thread(() -> {
|
while (monitoring && client != null) {
|
try {
|
Thread.sleep(5000); // 每5秒检查一次
|
|
if (client != null && !client.isConnected()) {
|
System.out.println("检测到MQTT连接断开,尝试重连... ClientId: " + clientId);
|
try {
|
// 尝试重新连接
|
if (!client.isConnected()) {
|
client.reconnect();
|
// 重连成功后重新订阅
|
if (client.isConnected()) {
|
client.subscribe(topic, qos);
|
System.out.println("连接监控:重连成功并重新订阅主题: " + topic);
|
}
|
}
|
} catch (Exception e) {
|
System.err.println("连接监控:重连失败: " + e.getMessage());
|
// 如果自动重连失败,尝试完全重新连接
|
try {
|
connect();
|
if (isConnected()) {
|
subscribe(qos);
|
}
|
} catch (Exception ex) {
|
System.err.println("连接监控:完全重连失败: " + ex.getMessage());
|
}
|
}
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
break;
|
} catch (Exception e) {
|
System.err.println("连接监控线程异常: " + e.getMessage());
|
}
|
}
|
});
|
connectionMonitorThread.setDaemon(true);
|
connectionMonitorThread.setName("MQTT-ConnectionMonitor-" + clientId);
|
connectionMonitorThread.start();
|
System.out.println("已启动MQTT连接监控线程: " + clientId);
|
}
|
|
/**
|
* 停止连接监控线程
|
*/
|
private void stopConnectionMonitor() {
|
monitoring = false;
|
if (connectionMonitorThread != null) {
|
connectionMonitorThread.interrupt();
|
try {
|
connectionMonitorThread.join(1000);
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
}
|
connectionMonitorThread = null;
|
}
|
}
|
|
/**
|
* 关闭客户端并释放资源
|
*/
|
public void close() {
|
stopConnectionMonitor(); // 停止监控线程
|
try {
|
if (client != null) {
|
if (client.isConnected()) {
|
client.disconnect();
|
}
|
client.close();
|
client = null;
|
}
|
} catch (Exception e) {
|
// 忽略关闭时的异常
|
}
|
}
|
|
/**
|
* 检查是否已连接
|
* @return true表示已连接,false表示未连接
|
*/
|
public boolean isConnected() {
|
return client != null && client.isConnected();
|
}
|
|
/**
|
* 获取MQTT客户端实例
|
* @return MqttClient实例
|
*/
|
public MqttClient getClient() {
|
return client;
|
}
|
|
/**
|
* 连接MQTT服务器的工具方法
|
* 供其他类直接调用,连接GPS主题和响应主题
|
* @return true表示连接成功,false表示连接失败
|
*/
|
public static boolean connectMQTT() {
|
// 防止重复重连
|
if (isReconnecting) {
|
System.out.println("MQTT正在重连中,跳过本次重连请求");
|
return false;
|
}
|
|
// 检查是否已经连接
|
if (areClientsConnected()) {
|
System.out.println("MQTT客户端已连接,无需重复连接");
|
return true;
|
}
|
|
isReconnecting = true;
|
try {
|
// 先断开之前的连接
|
disconnectAll();
|
|
boolean gpsSuccess = false;
|
boolean responseSuccess = false;
|
|
try {
|
String host = "tcp://39.99.43.227:1883";
|
String deiveID = Setsys.getMowerIdValue();
|
if (deiveID == null || deiveID.isEmpty()) {
|
System.err.println("设备ID为空,无法连接MQTT");
|
return false;
|
}
|
// 添加时间戳确保客户端ID唯一
|
long timestamp = System.currentTimeMillis();
|
String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp;
|
String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp;
|
String topic = "mower/" + deiveID + "/gps";
|
String topic2 = "mower/" + deiveID + "/response";
|
|
// 连接GPS主题
|
try {
|
gpsClient = new Client(host, topic, clientId);
|
gpsClient.connect();
|
// 稍作延迟,确保连接稳定
|
Thread.sleep(100);
|
if (gpsClient != null && gpsClient.isConnected()) {
|
gpsClient.subscribe();
|
gpsSuccess = true;
|
System.out.println("GPS主题MQTT连接并订阅成功");
|
}
|
} catch (MqttException e) {
|
System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
|
if (e.getCause() != null) {
|
System.err.println("失败原因: " + e.getCause().getMessage());
|
}
|
e.printStackTrace();
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
System.err.println("连接过程被中断");
|
}
|
|
// 连接响应主题
|
try {
|
responseClient = new Client(host, topic2, clientId2);
|
responseClient.connect();
|
// 稍作延迟,确保连接稳定
|
Thread.sleep(100);
|
if (responseClient != null && responseClient.isConnected()) {
|
responseClient.subscribe();
|
responseSuccess = true;
|
System.out.println("响应主题MQTT连接并订阅成功");
|
}
|
} catch (MqttException e) {
|
System.err.println("响应主题MQTT连接失败: " + e.getMessage());
|
if (e.getCause() != null) {
|
System.err.println("失败原因: " + e.getCause().getMessage());
|
}
|
e.printStackTrace();
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
System.err.println("连接过程被中断");
|
}
|
|
if (gpsSuccess && responseSuccess) {
|
System.out.println("所有MQTT主题连接并订阅成功!");
|
return true;
|
} else if (gpsSuccess || responseSuccess) {
|
System.out.println("部分MQTT主题连接成功");
|
return true;
|
} else {
|
System.err.println("所有MQTT主题连接失败");
|
return false;
|
}
|
} catch (Exception e) {
|
System.err.println("MQTT连接过程发生异常: " + e.getMessage());
|
if (e.getCause() != null) {
|
System.err.println("异常原因: " + e.getCause().getMessage());
|
}
|
e.printStackTrace();
|
return false;
|
}
|
} finally {
|
isReconnecting = false;
|
}
|
}
|
|
/**
|
* 连接MQTT服务器的工具方法(带参数版本)
|
* @param host MQTT服务器地址,格式:tcp://ip:port
|
* @param deviceId 设备ID
|
* @param userEmail 用户邮箱
|
* @return true表示连接成功,false表示连接失败
|
*/
|
public static boolean connectMQTT(String host, String deviceId, String userEmail) {
|
// 防止重复重连
|
if (isReconnecting) {
|
System.out.println("MQTT正在重连中,跳过本次重连请求");
|
return false;
|
}
|
|
// 检查是否已经连接
|
if (areClientsConnected()) {
|
System.out.println("MQTT客户端已连接,无需重复连接");
|
return true;
|
}
|
|
isReconnecting = true;
|
try {
|
// 先断开之前的连接
|
disconnectAll();
|
|
boolean gpsSuccess = false;
|
boolean responseSuccess = false;
|
|
try {
|
if (deviceId == null || deviceId.isEmpty()) {
|
System.err.println("设备ID为空,无法连接MQTT");
|
return false;
|
}
|
// 添加时间戳确保客户端ID唯一
|
long timestamp = System.currentTimeMillis();
|
String clientId = userEmail + "mower" + "_" + timestamp;
|
String clientId2 = userEmail + "response" + "_" + timestamp;
|
String topic = "mower/" + deviceId + "/gps";
|
String topic2 = "mower/" + deviceId + "/response";
|
|
// 连接GPS主题
|
try {
|
gpsClient = new Client(host, topic, clientId);
|
gpsClient.connect();
|
// 稍作延迟,确保连接稳定
|
Thread.sleep(100);
|
if (gpsClient != null && gpsClient.isConnected()) {
|
gpsClient.subscribe();
|
gpsSuccess = true;
|
System.out.println("GPS主题MQTT连接并订阅成功");
|
}
|
} catch (MqttException e) {
|
System.err.println("GPS主题MQTT连接失败: " + e.getMessage());
|
if (e.getCause() != null) {
|
System.err.println("失败原因: " + e.getCause().getMessage());
|
}
|
e.printStackTrace();
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
System.err.println("连接过程被中断");
|
}
|
|
// 连接响应主题
|
try {
|
responseClient = new Client(host, topic2, clientId2);
|
responseClient.connect();
|
// 稍作延迟,确保连接稳定
|
Thread.sleep(100);
|
if (responseClient != null && responseClient.isConnected()) {
|
responseClient.subscribe();
|
responseSuccess = true;
|
System.out.println("响应主题MQTT连接并订阅成功");
|
}
|
} catch (MqttException e) {
|
System.err.println("响应主题MQTT连接失败: " + e.getMessage());
|
if (e.getCause() != null) {
|
System.err.println("失败原因: " + e.getCause().getMessage());
|
}
|
e.printStackTrace();
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
System.err.println("连接过程被中断");
|
}
|
|
if (gpsSuccess && responseSuccess) {
|
System.out.println("所有MQTT主题连接并订阅成功!");
|
return true;
|
} else if (gpsSuccess || responseSuccess) {
|
System.out.println("部分MQTT主题连接成功");
|
return true;
|
} else {
|
System.err.println("所有MQTT主题连接失败");
|
return false;
|
}
|
} catch (Exception e) {
|
System.err.println("MQTT连接过程发生异常: " + e.getMessage());
|
if (e.getCause() != null) {
|
System.err.println("异常原因: " + e.getCause().getMessage());
|
}
|
e.printStackTrace();
|
return false;
|
}
|
} finally {
|
isReconnecting = false;
|
}
|
}
|
|
/**
|
* 创建并连接MQTT客户端的工具方法
|
* @param host MQTT服务器地址
|
* @param topic 订阅主题
|
* @param clientId 客户端ID
|
* @param qos 服务质量等级,默认2
|
* @return Client实例,连接失败返回null
|
*/
|
public static Client createAndConnect(String host, String topic, String clientId, int qos) {
|
try {
|
Client mqttClient = new Client(host, topic, clientId);
|
mqttClient.connect();
|
mqttClient.subscribe(qos);
|
System.out.println("MQTT客户端创建并订阅成功,主题: " + topic + ", ClientId: " + clientId);
|
return mqttClient;
|
} catch (MqttException e) {
|
System.err.println("MQTT客户端创建失败: " + e.getMessage() + ", 主题: " + topic);
|
e.printStackTrace();
|
return null;
|
}
|
}
|
|
/**
|
* 创建并连接MQTT客户端的工具方法(默认QoS为2)
|
* @param host MQTT服务器地址
|
* @param topic 订阅主题
|
* @param clientId 客户端ID
|
* @return Client实例,连接失败返回null
|
*/
|
public static Client createAndConnect(String host, String topic, String clientId) {
|
return createAndConnect(host, topic, clientId, 2);
|
}
|
|
/**
|
* 断开所有MQTT连接
|
*/
|
public static void disconnectAll() {
|
try {
|
if (gpsClient != null) {
|
gpsClient.close();
|
System.out.println("GPS主题MQTT连接已断开");
|
gpsClient = null;
|
}
|
if (responseClient != null) {
|
responseClient.close();
|
System.out.println("响应主题MQTT连接已断开");
|
responseClient = null;
|
}
|
} catch (Exception e) {
|
System.err.println("断开MQTT连接失败: " + e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 获取GPS客户端实例
|
* @return GPS客户端实例
|
*/
|
public static Client getGpsClient() {
|
return gpsClient;
|
}
|
|
/**
|
* 获取响应客户端实例
|
* @return 响应客户端实例
|
*/
|
public static Client getResponseClient() {
|
return responseClient;
|
}
|
|
/**
|
* 检查MQTT连接状态(静态方法)
|
* @return true表示已连接,false表示未连接
|
*/
|
public static boolean areClientsConnected() {
|
boolean gpsConnected = gpsClient != null && gpsClient.isConnected();
|
boolean responseConnected = responseClient != null && responseClient.isConnected();
|
return gpsConnected || responseConnected;
|
}
|
|
/**
|
* 示例用法(保留向后兼容)
|
* @deprecated 请使用 connectMQTT() 方法替代
|
*/
|
@Deprecated
|
public static void lianjiemqqt() {
|
connectMQTT();
|
}
|
}
|