package Mqttmessage; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; /** * MQTT客户端工具类 * 用于连接MQTT服务器并订阅主题 */ public class Client { private String host; private String topic; private String clientId; private MqttClient client; private MqttConnectOptions options; /** * 构造函数 * @param host MQTT服务器地址,格式:tcp://ip:port * @param topic 订阅的主题 * @param clientId 客户端ID,不能重复 */ public Client(String host, String topic, String clientId) { this.host = host; this.topic = topic; this.clientId = clientId; this.options = new MqttConnectOptions(); this.options.setCleanSession(true); } /** * 连接到MQTT服务器 * @throws MqttException 连接失败时抛出异常 */ public void connect() throws MqttException { if (client != null && client.isConnected()) { return; } client = new MqttClient(host, clientId); client.connect(options); client.setCallback(new PushCallback()); } /** * 订阅主题 * @param qos 服务质量等级,0-2 * @throws MqttException 订阅失败时抛出异常 */ public void subscribe(int qos) throws MqttException { if (client == null || !client.isConnected()) { connect(); } client.subscribe(topic, qos); } /** * 订阅主题(默认QoS为2) * @throws MqttException 订阅失败时抛出异常 */ public void subscribe() throws MqttException { subscribe(2); } /** * 断开连接 * @throws MqttException 断开连接失败时抛出异常 */ public void disconnect() throws MqttException { if (client != null && client.isConnected()) { client.disconnect(); } } /** * 检查是否已连接 * @return true表示已连接,false表示未连接 */ public boolean isConnected() { return client != null && client.isConnected(); } /** * 获取MQTT客户端实例 * @return MqttClient实例 */ public MqttClient getClient() { return client; } /** * 示例用法 */ public static void test() { try { String host = "tcp://39.99.43.227:1883"; String deiveID="6258"; String clientId = "hxzkMQTT"; String clientId2 = "hxzkMQTT2"; String topic = "mower/"+deiveID+"/gps"; String topic2 = "mower/"+deiveID+"/response"; Client mqttClient = new Client(host, topic, clientId); Client mqttClient1 = new Client(host, topic2, clientId2); mqttClient.connect(); mqttClient.subscribe(); mqttClient1.connect(); mqttClient1.subscribe(); // 保持程序运行 //Thread.sleep(Long.MAX_VALUE); } catch (MqttException e) { throw new RuntimeException(e); } } }