| | |
| | | import org.eclipse.paho.client.mqttv3.MqttClient; |
| | | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
| | | import org.eclipse.paho.client.mqttv3.MqttException; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | |
| | | import set.Setsys; |
| | | import user.Usrdell; |
| | |
| | | private MqttClient client; |
| | | private MqttConnectOptions options; |
| | | |
| | | // 静态变量用于存储客户端实例 |
| | | private static Client gpsClient; |
| | | private static Client responseClient; |
| | | |
| | | /** |
| | | * 构造函数 |
| | | * @param host MQTT服务器地址,格式:tcp://ip:port |
| | |
| | | * @param clientId 客户端ID,不能重复 |
| | | */ |
| | | public Client(String host, String topic, String clientId) { |
| | | |
| | | this.host = host; |
| | | this.topic = topic; |
| | | this.clientId = clientId; |
| | | this.options = new MqttConnectOptions(); |
| | | this.options.setCleanSession(true); |
| | | // 设置连接超时时间(秒) |
| | | this.options.setConnectionTimeout(30); |
| | | // 设置KeepAlive间隔(秒),用于保持连接活跃 |
| | | this.options.setKeepAliveInterval(60); |
| | | // 设置自动重连 |
| | | this.options.setAutomaticReconnect(true); |
| | | // 设置MQTT版本,使用3.1.1 |
| | | this.options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void connect() throws MqttException { |
| | | if (client != null && client.isConnected()) { |
| | | System.out.println("MQTT客户端已连接,ClientId: " + clientId); |
| | | return; |
| | | } |
| | | client = new MqttClient(host, clientId); |
| | | client.connect(options); |
| | | |
| | | // 如果客户端已存在但未连接,先关闭 |
| | | if (client != null) { |
| | | try { |
| | | client.close(); |
| | | } catch (Exception e) { |
| | | // 忽略关闭时的异常 |
| | | } |
| | | client = null; |
| | | } |
| | | |
| | | // 使用内存持久化,避免文件锁定问题 |
| | | client = new MqttClient(host, clientId, new MemoryPersistence()); |
| | | // 先设置回调,再连接 |
| | | client.setCallback(new PushCallback()); |
| | | |
| | | // 执行连接 |
| | | client.connect(options); |
| | | System.out.println("MQTT连接成功!ClientId: " + clientId + ", 服务器: " + host + ", 主题: " + topic); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * 关闭客户端并释放资源 |
| | | */ |
| | | public void close() { |
| | | try { |
| | | if (client != null) { |
| | | if (client.isConnected()) { |
| | | client.disconnect(); |
| | | } |
| | | client.close(); |
| | | client = null; |
| | | } |
| | | } catch (Exception e) { |
| | | // 忽略关闭时的异常 |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 检查是否已连接 |
| | | * @return true表示已连接,false表示未连接 |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * 示例用法 |
| | | * 连接MQTT服务器的工具方法 |
| | | * 供其他类直接调用,连接GPS主题和响应主题 |
| | | * @return true表示连接成功,false表示连接失败 |
| | | */ |
| | | public static boolean connectMQTT() { |
| | | // 先断开之前的连接 |
| | | disconnectAll(); |
| | | |
| | | public static void lianjiemqqt() { |
| | | boolean gpsSuccess = false; |
| | | boolean responseSuccess = false; |
| | | |
| | | try { |
| | | String host = "tcp://39.99.43.227:1883"; |
| | | String deiveID=Setsys.getMowerIdValue(); |
| | | String clientId =Usrdell.getUserEmail()+"mower"; |
| | | String clientId2 =Usrdell.getUserEmail()+"response"; |
| | | // 添加时间戳确保客户端ID唯一 |
| | | long timestamp = System.currentTimeMillis(); |
| | | String clientId = Usrdell.getUserEmail() + "mower" + "_" + timestamp; |
| | | String clientId2 = Usrdell.getUserEmail() + "response" + "_" + timestamp; |
| | | String topic = "mower/"+deiveID+"/gps"; |
| | | String topic2 = "mower/"+deiveID+"/response"; |
| | | Client mqttClient = new Client(host, topic, clientId); |
| | | Client mqttClient1 = new Client(host, topic2, clientId2); |
| | | mqttClient.connect(); |
| | | mqttClient.subscribe(); |
| | | |
| | | mqttClient1.connect(); |
| | | mqttClient1.subscribe(); |
| | | |
| | | // 保持程序运行 |
| | | // Thread.sleep(Long.MAX_VALUE); |
| | | // 连接GPS主题 |
| | | try { |
| | | gpsClient = new Client(host, topic, clientId); |
| | | gpsClient.connect(); |
| | | // 稍作延迟,确保连接稳定 |
| | | Thread.sleep(100); |
| | | gpsClient.subscribe(); |
| | | gpsSuccess = true; |
| | | System.out.println("GPS主题MQTT连接并订阅成功"); |
| | | } catch (MqttException e) { |
| | | throw new RuntimeException(e); |
| | | System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | | System.err.println("失败原因: " + e.getCause().getMessage()); |
| | | } |
| | | e.printStackTrace(); |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | System.err.println("连接过程被中断"); |
| | | } |
| | | |
| | | // 连接响应主题 |
| | | try { |
| | | responseClient = new Client(host, topic2, clientId2); |
| | | responseClient.connect(); |
| | | // 稍作延迟,确保连接稳定 |
| | | Thread.sleep(100); |
| | | responseClient.subscribe(); |
| | | responseSuccess = true; |
| | | System.out.println("响应主题MQTT连接并订阅成功"); |
| | | } catch (MqttException e) { |
| | | System.err.println("响应主题MQTT连接失败: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | | System.err.println("失败原因: " + e.getCause().getMessage()); |
| | | } |
| | | e.printStackTrace(); |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | System.err.println("连接过程被中断"); |
| | | } |
| | | |
| | | if (gpsSuccess && responseSuccess) { |
| | | System.out.println("所有MQTT主题连接并订阅成功!"); |
| | | return true; |
| | | } else if (gpsSuccess || responseSuccess) { |
| | | System.out.println("部分MQTT主题连接成功"); |
| | | return true; |
| | | } else { |
| | | System.err.println("所有MQTT主题连接失败"); |
| | | return false; |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("MQTT连接过程发生异常: " + e.getMessage()); |
| | | e.printStackTrace(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 连接MQTT服务器的工具方法(带参数版本) |
| | | * @param host MQTT服务器地址,格式:tcp://ip:port |
| | | * @param deviceId 设备ID |
| | | * @param userEmail 用户邮箱 |
| | | * @return true表示连接成功,false表示连接失败 |
| | | */ |
| | | public static boolean connectMQTT(String host, String deviceId, String userEmail) { |
| | | // 先断开之前的连接 |
| | | disconnectAll(); |
| | | |
| | | boolean gpsSuccess = false; |
| | | boolean responseSuccess = false; |
| | | |
| | | try { |
| | | // 添加时间戳确保客户端ID唯一 |
| | | long timestamp = System.currentTimeMillis(); |
| | | String clientId = userEmail + "mower" + "_" + timestamp; |
| | | String clientId2 = userEmail + "response" + "_" + timestamp; |
| | | String topic = "mower/" + deviceId + "/gps"; |
| | | String topic2 = "mower/" + deviceId + "/response"; |
| | | |
| | | // 连接GPS主题 |
| | | try { |
| | | gpsClient = new Client(host, topic, clientId); |
| | | gpsClient.connect(); |
| | | // 稍作延迟,确保连接稳定 |
| | | Thread.sleep(100); |
| | | gpsClient.subscribe(); |
| | | gpsSuccess = true; |
| | | System.out.println("GPS主题MQTT连接并订阅成功"); |
| | | } catch (MqttException e) { |
| | | System.err.println("GPS主题MQTT连接失败: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | | System.err.println("失败原因: " + e.getCause().getMessage()); |
| | | } |
| | | e.printStackTrace(); |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | System.err.println("连接过程被中断"); |
| | | } |
| | | |
| | | // 连接响应主题 |
| | | try { |
| | | responseClient = new Client(host, topic2, clientId2); |
| | | responseClient.connect(); |
| | | // 稍作延迟,确保连接稳定 |
| | | Thread.sleep(100); |
| | | responseClient.subscribe(); |
| | | responseSuccess = true; |
| | | System.out.println("响应主题MQTT连接并订阅成功"); |
| | | } catch (MqttException e) { |
| | | System.err.println("响应主题MQTT连接失败: " + e.getMessage()); |
| | | if (e.getCause() != null) { |
| | | System.err.println("失败原因: " + e.getCause().getMessage()); |
| | | } |
| | | e.printStackTrace(); |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | System.err.println("连接过程被中断"); |
| | | } |
| | | |
| | | if (gpsSuccess && responseSuccess) { |
| | | System.out.println("所有MQTT主题连接并订阅成功!"); |
| | | return true; |
| | | } else if (gpsSuccess || responseSuccess) { |
| | | System.out.println("部分MQTT主题连接成功"); |
| | | return true; |
| | | } else { |
| | | System.err.println("所有MQTT主题连接失败"); |
| | | return false; |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("MQTT连接过程发生异常: " + e.getMessage()); |
| | | e.printStackTrace(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 创建并连接MQTT客户端的工具方法 |
| | | * @param host MQTT服务器地址 |
| | | * @param topic 订阅主题 |
| | | * @param clientId 客户端ID |
| | | * @param qos 服务质量等级,默认2 |
| | | * @return Client实例,连接失败返回null |
| | | */ |
| | | public static Client createAndConnect(String host, String topic, String clientId, int qos) { |
| | | try { |
| | | Client mqttClient = new Client(host, topic, clientId); |
| | | mqttClient.connect(); |
| | | mqttClient.subscribe(qos); |
| | | System.out.println("MQTT客户端创建并订阅成功,主题: " + topic + ", ClientId: " + clientId); |
| | | return mqttClient; |
| | | } catch (MqttException e) { |
| | | System.err.println("MQTT客户端创建失败: " + e.getMessage() + ", 主题: " + topic); |
| | | e.printStackTrace(); |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 创建并连接MQTT客户端的工具方法(默认QoS为2) |
| | | * @param host MQTT服务器地址 |
| | | * @param topic 订阅主题 |
| | | * @param clientId 客户端ID |
| | | * @return Client实例,连接失败返回null |
| | | */ |
| | | public static Client createAndConnect(String host, String topic, String clientId) { |
| | | return createAndConnect(host, topic, clientId, 2); |
| | | } |
| | | |
| | | /** |
| | | * 断开所有MQTT连接 |
| | | */ |
| | | public static void disconnectAll() { |
| | | try { |
| | | if (gpsClient != null) { |
| | | gpsClient.close(); |
| | | System.out.println("GPS主题MQTT连接已断开"); |
| | | gpsClient = null; |
| | | } |
| | | if (responseClient != null) { |
| | | responseClient.close(); |
| | | System.out.println("响应主题MQTT连接已断开"); |
| | | responseClient = null; |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("断开MQTT连接失败: " + e.getMessage()); |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取GPS客户端实例 |
| | | * @return GPS客户端实例 |
| | | */ |
| | | public static Client getGpsClient() { |
| | | return gpsClient; |
| | | } |
| | | |
| | | /** |
| | | * 获取响应客户端实例 |
| | | * @return 响应客户端实例 |
| | | */ |
| | | public static Client getResponseClient() { |
| | | return responseClient; |
| | | } |
| | | |
| | | /** |
| | | * 检查MQTT连接状态(静态方法) |
| | | * @return true表示已连接,false表示未连接 |
| | | */ |
| | | public static boolean areClientsConnected() { |
| | | boolean gpsConnected = gpsClient != null && gpsClient.isConnected(); |
| | | boolean responseConnected = responseClient != null && responseClient.isConnected(); |
| | | return gpsConnected || responseConnected; |
| | | } |
| | | |
| | | /** |
| | | * 示例用法(保留向后兼容) |
| | | * @deprecated 请使用 connectMQTT() 方法替代 |
| | | */ |
| | | @Deprecated |
| | | public static void lianjiemqqt() { |
| | | connectMQTT(); |
| | | } |
| | | } |