Paho是什么?Paho是Eclipse IoT开源项目的一个子项目。Paho项目提供了开源可靠的开放标准消息协议的实现,目标是为机器到机器和物联网提供新的、存在的和新兴的应用程序。
Paho是一个开源的MQTT客户端SDK,准确来说是一组,它包含各种语言的实现版本,比如C、Java、Python、javascript、golang等,其中paho.mqtt.java是Java语言的实现版本,这个就是本文的主角。
Paho怎么用?Paho的API设计的非常清晰简洁。MQTT的发布者和订阅者都是客户端,所以首先要创建一个客户端,然后连接到代理服务器,接下来就可以订阅或者发布消息了。
导入paho包,以maven工程为例子,在pom.xml文件加入下面的依赖
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.2.5</version>
- </dependency>
- // 代理服务器地址(服务端)
- String broker = "tcp://127.0.0.1:1883";
- String clientId = "mqtt-client-1";
- MemoryPersistence persistence = new MemoryPersistence();
- MqttClient client = new MqttClient(broker, clientId, persistence);
- client.setCallback(new MqttCallback() {
- public void connectionLost(Throwable throwable) {
- }
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
- System.out.println("receive message:" + s + " " + new String(mqttMessage.getPayload()));
- }
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- }
- });
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName("admin");
- options.setPassword("123456".toCharArray());
- client.connect(options);
- String content = "hello mqtt";
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setPayload(content.getBytes());
- client.publish("jack", mqttMessage);
- 订阅消息
- String topic = "testtopic";
- client.subscribe(topic);
Paho用了四个线程来实现这个通讯过程,读和写分别使用两个线程。读和写各使用一个消息队列来存储收发的消息,用于消息缓存和中转,同时使用Java的Object的wait()/notify()来阻塞/唤醒线程,解决生产者消费者问题和CPU资源浪费问题。
协议实现主要集中在org.eclipse.paho.client.mqttv3.internal.wire包,协议一共定义了15种类型的消息,使用面向对象编程,经过抽象封装,MqttWireMessage是消息的基类。