本帖最后由 跋扈洋 于 2021-6-13 15:59 编辑

Paho是什么?Paho是Eclipse IoT开源项目的一个子项目。Paho项目提供了开源可靠的开放标准消息协议的实现,目标是为机器到机器和物联网提供新的、存在的和新兴的应用程序。
Paho是一个开源的MQTT客户端SDK,准确来说是一组,它包含各种语言的实现版本,比如C、Java、Python、javascript、golang等,其中paho.mqtt.java是Java语言的实现版本,这个就是本文的主角。
Paho怎么用?Paho的API设计的非常清晰简洁。MQTT的发布者和订阅者都是客户端,所以首先要创建一个客户端,然后连接到代理服务器,接下来就可以订阅或者发布消息了。
导入paho包,以maven工程为例子,在pom.xml文件加入下面的依赖
<dependency>
  •     <groupId>org.eclipse.paho</groupId>
  •     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  •     <version>1.2.5</version>
  • </dependency>
  • 复制代码
    创建客户端
    // 代理服务器地址(服务端)
  • String broker = "tcp://127.0.0.1:1883";
  • String clientId = "mqtt-client-1";
  • MemoryPersistence persistence = new MemoryPersistence();
  • MqttClient client = new MqttClient(broker, clientId, persistence);
  • client.setCallback(new MqttCallback() {
  •     public void connectionLost(Throwable throwable) {
  •     }
  •     public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
  •         System.out.println("receive message:" + s + " " + new String(mqttMessage.getPayload()));
  •     }
  •     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  •     }
  • });
  • 复制代码
    连接代理服务器
    MqttConnectOptions options = new MqttConnectOptions();
  • options.setUserName("admin");
  • options.setPassword("123456".toCharArray());
  • client.connect(options);
  • 复制代码
    发布消息
    String content = "hello mqtt";
  • MqttMessage mqttMessage = new MqttMessage();
  • mqttMessage.setPayload(content.getBytes());
  • client.publish("jack", mqttMessage);
  • 复制代码
    订阅消息
    订阅消息
    复制代码
    String topic = "testtopic";
  • client.subscribe(topic);
  • 复制代码
    Paho设计原理通过socket实现的通讯,会涉及到从InputStream读取数据和向OutputStream写入数据,实现这样的双向通讯需要使用到多线程技术,还需要考虑CPU资源的合理化利用、生产者消费者等问题。
    b02f742eb2f54f6db0b878cca174fb5f~tplv-k3u1fbpfcp-watermark.jpg
    Paho用了四个线程来实现这个通讯过程,读和写分别使用两个线程。读和写各使用一个消息队列来存储收发的消息,用于消息缓存和中转,同时使用Java的Object的wait()/notify()来阻塞/唤醒线程,解决生产者消费者问题和CPU资源浪费问题。
    fc942116f0094fceb1b246685ad38882~tplv-k3u1fbpfcp-watermark.jpg
    协议实现主要集中在org.eclipse.paho.client.mqttv3.internal.wire包,协议一共定义了15种类型的消息,使用面向对象编程,经过抽象封装,MqttWireMessage是消息的基类。