本帖最后由 跋扈洋 于 2021-6-13 15:59 编辑

Paho是什么?Paho是Eclipse IoT开源项目的一个子项目。Paho项目提供了开源可靠的开放标准消息协议的实现,目标是为机器到机器和物联网提供新的、存在的和新兴的应用程序。
Paho是一个开源的MQTT客户端SDK,准确来说是一组,它包含各种语言的实现版本,比如C、Java、Python、javascript、golang等,其中paho.mqtt.java是Java语言的实现版本,这个就是本文的主角。
Paho怎么用?Paho的API设计的非常清晰简洁。MQTT的发布者和订阅者都是客户端,所以首先要创建一个客户端,然后连接到代理服务器,接下来就可以订阅或者发布消息了。
导入paho包,以maven工程为例子,在pom.xml文件加入下面的依赖
  1. <dependency>
  2.     <groupId>org.eclipse.paho</groupId>
  3.     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  4.     <version>1.2.5</version>
  5. </dependency>
创建客户端
  1. // 代理服务器地址(服务端)
  2. String broker = "tcp://127.0.0.1:1883";
  3. String clientId = "mqtt-client-1";
  4. MemoryPersistence persistence = new MemoryPersistence();
  5. MqttClient client = new MqttClient(broker, clientId, persistence);
  6. client.setCallback(new MqttCallback() {
  7.     public void connectionLost(Throwable throwable) {
  8.     }
  9.     public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
  10.         System.out.println("receive message:" + s + " " + new String(mqttMessage.getPayload()));
  11.     }
  12.     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  13.     }
  14. });
连接代理服务器
  1. MqttConnectOptions options = new MqttConnectOptions();
  2. options.setUserName("admin");
  3. options.setPassword("123456".toCharArray());
  4. client.connect(options);
发布消息
  1. String content = "hello mqtt";
  2. MqttMessage mqttMessage = new MqttMessage();
  3. mqttMessage.setPayload(content.getBytes());
  4. client.publish("jack", mqttMessage);
订阅消息
  1. 订阅消息
  1. String topic = "testtopic";
  2. client.subscribe(topic);
Paho设计原理通过socket实现的通讯,会涉及到从InputStream读取数据和向OutputStream写入数据,实现这样的双向通讯需要使用到多线程技术,还需要考虑CPU资源的合理化利用、生产者消费者等问题。
b02f742eb2f54f6db0b878cca174fb5f~tplv-k3u1fbpfcp-watermark.jpg
Paho用了四个线程来实现这个通讯过程,读和写分别使用两个线程。读和写各使用一个消息队列来存储收发的消息,用于消息缓存和中转,同时使用Java的Object的wait()/notify()来阻塞/唤醒线程,解决生产者消费者问题和CPU资源浪费问题。
fc942116f0094fceb1b246685ad38882~tplv-k3u1fbpfcp-watermark.jpg
协议实现主要集中在org.eclipse.paho.client.mqttv3.internal.wire包,协议一共定义了15种类型的消息,使用面向对象编程,经过抽象封装,MqttWireMessage是消息的基类。