张世豪
3 天以前 0930bed760105b81e2e5055801bec6d6e8d57358
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package Mqttmessage;
 
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
 
/**
 * MQTT客户端工具类
 * 用于连接MQTT服务器并订阅主题
 */
public class Client {
    
    private String host;
    private String topic;
    private String clientId;
    private MqttClient client;
    private MqttConnectOptions options;
    
    /**
     * 构造函数
     * @param host MQTT服务器地址,格式:tcp://ip:port
     * @param topic 订阅的主题
     * @param clientId 客户端ID,不能重复
     */
    public Client(String host, String topic, String clientId) {
 
        this.host = host;
        this.topic = topic;
        this.clientId = clientId;
        this.options = new MqttConnectOptions();
        this.options.setCleanSession(true);
    }
    
    /**
     * 连接到MQTT服务器
     * @throws MqttException 连接失败时抛出异常
     */
    public void connect() throws MqttException {
        if (client != null && client.isConnected()) {
            return;
        }
        client = new MqttClient(host, clientId);
        client.connect(options);
        client.setCallback(new PushCallback());
    }
    
    /**
     * 订阅主题
     * @param qos 服务质量等级,0-2
     * @throws MqttException 订阅失败时抛出异常
     */
    public void subscribe(int qos) throws MqttException {
        if (client == null || !client.isConnected()) {
            connect();
        }
        client.subscribe(topic, qos);
    }
    
    /**
     * 订阅主题(默认QoS为2)
     * @throws MqttException 订阅失败时抛出异常
     */
    public void subscribe() throws MqttException {
        subscribe(2);
    }
    
    /**
     * 断开连接
     * @throws MqttException 断开连接失败时抛出异常
     */
    public void disconnect() throws MqttException {
        if (client != null && client.isConnected()) {
            client.disconnect();
        }
    }
    
    /**
     * 检查是否已连接
     * @return true表示已连接,false表示未连接
     */
    public boolean isConnected() {
        return client != null && client.isConnected();
    }
    
    /**
     * 获取MQTT客户端实例
     * @return MqttClient实例
     */
    public MqttClient getClient() {
        return client;
    }
    
    /**
     * 示例用法
     */
 
    public static void test()  {
        try {
            String host = "tcp://39.99.43.227:1883";
            String deiveID="6258";
            String clientId = "hxzkMQTT";
            String clientId2 = "hxzkMQTT2";
            String topic = "mower/"+deiveID+"/gps";
            String topic2 = "mower/"+deiveID+"/response";
            Client mqttClient = new Client(host, topic, clientId);
            Client mqttClient1 = new Client(host, topic2, clientId2);
            mqttClient.connect();
            mqttClient.subscribe();
 
            mqttClient1.connect();
            mqttClient1.subscribe();
 
            // 保持程序运行
           //Thread.sleep(Long.MAX_VALUE);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}