本文分享自华为云社区《一文带你掌握物联网 mqtt 网关搭建背后的技术原理》,作者:张俭。

前言

物联网是现在比较热门的软件领域,众多物联网厂商都有自己的物联网平台,而物联网平台其中一个核心的模块就是 Mqtt 网关。这篇文章的目的是手把手教大家写书写一个 mqtt 网关,后端存储支持 Kafka/Pulsar,支持 mqtt 连接、断链、发送消息、订阅消息。技术选型:

  • Netty java 最流行的网络框架
  • netty-codec-mqtt netty 的子项目,mqtt 编解码插件
  • Pulsar/Kafka 流行的消息中间件作为后端存储

核心 pom 依赖如下
<dependency>
  • <groupId>io.netty</groupId>
  • <artifactId>netty-codec-mqtt</artifactId>
  • </dependency>
  • <dependency>
  • <groupId>io.netty</groupId>
  • <artifactId>netty-common</artifactId>
  • </dependency>
  • <dependency>
  • <groupId>io.netty</groupId>
  • <artifactId>netty-transport</artifactId>
  • </dependency>
  • <dependency>
  • <groupId>org.apache.pulsar</groupId>
  • <artifactId>pulsar-client-original</artifactId>
  • <version>${pulsar.version}</version>
  • </dependency>
  • <dependency>
  • <groupId>org.apache.kafka</groupId>
  • <artifactId>kafka-clients</artifactId>
  • <version>${kafka.version}</version>
  • </dependency>
  • <dependency>
  • <groupId>org.eclipse.paho</groupId>
  • <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  • <version>${mqtt-client.version}</version>
  • <scope>test</scope>
  • </dependency>
  • 复制代码
    软件参数设计

    软件参数可谓是非常常见,复杂的开源项目,参数甚至可以达到上百个、配置文件长达数千行。我们需要的配置有

    MqttServer 监听的端口
    监听端口的配置即使是写 demo 也非常必要,常常用在单元测试中,由于单元测试跑完之后,即使网络服务器关闭,操作系统也不会立即释放端口,所以单元测试的时候指定随机端口非常关键,在 java 中,我们可以通过这样的工具类来获取一个空闲的端口。未配置的话,我们就使用 mqtt 的默认端口 1883。
    package io.github.protocol.mqtt.broker.util;
  • import java.io.IOException;
  • import java.io.UncheckedIOException;
  • import java.net.ServerSocket;
  • public class SocketUtil {
  • public static int getFreePort() {
  • try (ServerSocket serverSocket = new ServerSocket(0)) {
  • return serverSocket.getLocalPort();
  •         } catch (IOException e) {
  • throw new UncheckedIOException(e);
  •         }
  •     }
  • }
  • 复制代码
    后端存储配置
    我们的 mqtt 网关是没有可靠的存储能力的,依赖后端的消息中间件来做持久化处理。后端规划支持 Pulsar、Kafka 两种类型。定义枚举类如下
    public enum ProcessorType {
  •     KAFKA,
  •     PULSAR,
  • }
  • 复制代码
    对应的 KafkaProcessorConfig、PulsarProcessorConfig 比较简单,包含基础的连接地址即可,如果后续要做性能调优、安全,这块还是会有更多的配置项
    @Setter
  • @Getter
  • public class KafkaProcessorConfig {
  • private String bootstrapServers = "localhost:9092";
  • public KafkaProcessorConfig() {
  •     }
  • }
  • @Setter
  • @Getter
  • public class PulsarProcessorConfig {
  • private String httpUrl = "http://localhost:8080";
  • private String serviceUrl = "pulsar://localhost:6650";
  • public PulsarProcessorConfig() {
  •     }
  • }
  • 复制代码
    启动 netty MqttServer
    我们通过 netty 启动一个 mqttServer,添加 mqtt 解码器
    package io.github.protocol.mqtt.broker;
  • import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
  • import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
  • import io.github.protocol.mqtt.broker.processor.MqttProcessor;
  • import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
  • import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
  • import io.github.protocol.mqtt.broker.util.SocketUtil;
  • import io.netty.bootstrap.ServerBootstrap;
  • import io.netty.channel.ChannelFuture;
  • import io.netty.channel.ChannelInitializer;
  • import io.netty.channel.ChannelOption;
  • import io.netty.channel.ChannelPipeline;
  • import io.netty.channel.EventLoopGroup;
  • import io.netty.channel.nio.NioEventLoopGroup;
  • import io.netty.channel.socket.SocketChannel;
  • import io.netty.channel.socket.nio.NioServerSocketChannel;
  • import io.netty.handler.codec.mqtt.MqttDecoder;
  • import io.netty.handler.codec.mqtt.MqttEncoder;
  • import io.netty.handler.logging.LogLevel;
  • import io.netty.handler.logging.LoggingHandler;
  • import lombok.extern.slf4j.Slf4j;
  • @Slf4j
  • public class MqttServer {
  • private final MqttServerConfig mqttServerConfig;
  • public MqttServer() {
  • this(new MqttServerConfig());
  •     }
  • public MqttServer(MqttServerConfig mqttServerConfig) {
  • this.mqttServerConfig = mqttServerConfig;
  • if (mqttServerConfig.getPort() == 0) {
  •             mqttServerConfig.setPort(SocketUtil.getFreePort());
  •         }
  •     }
  • public void start() throws Exception {
  •         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  •         EventLoopGroup workerGroup = new NioEventLoopGroup();
  • try {
  •             ServerBootstrap b = new ServerBootstrap();
  •             b.group(bossGroup, workerGroup)
  •                     .channel(NioServerSocketChannel.class)
  •                     .option(ChannelOption.SO_BACKLOG, 100)
  •                     .handler(new LoggingHandler(LogLevel.INFO))
  •                     .childHandler(new ChannelInitializer<SocketChannel>() {
  •                         @Override
  • public void initChannel(SocketChannel ch) throws Exception {
  •                             ChannelPipeline p = ch.pipeline();
  • // decoder
  •                             p.addLast(new MqttDecoder());
  •                             p.addLast(MqttEncoder.INSTANCE);
  •                         }
  •                     });
  • // Start the server.
  •             ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();
  • // Wait until the server socket is closed.
  •             f.channel().closeFuture().sync();
  •         } finally {
  • // Shut down all event loops to terminate all threads.
  •             bossGroup.shutdownGracefully();
  •             workerGroup.shutdownGracefully();
  •         }
  •     }
  • private MqttProcessor processor(MqttServerConfig config) {
  • return switch (config.getProcessorType()) {
  • case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
  • case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
  •         };
  •     }
  • public int getPort() {
  • return mqttServerConfig.getPort();
  •     }
  • }
  • 复制代码
    MqttserverStarter.java

    我们写一个简单的 main 函数用来启动 mqttServer,方便调测
    package io.github.protocol.mqtt.broker;
  • public class MqttServerStarter {
  • public static void main(String[] args) throws Exception {
  • new MqttServer().start();
  •     }
  • }
  • 复制代码
    客户端使用 eclipse mqtt client 进行测试
    package io.github.protocol.mqtt;
  • import lombok.extern.log4j.Log4j2;
  • import org.eclipse.paho.client.mqttv3.MqttClient;
  • import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  • import org.eclipse.paho.client.mqttv3.MqttException;
  • import org.eclipse.paho.client.mqttv3.MqttMessage;
  • import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  • @Log4j2
  • public class MqttClientPublishExample {
  • public static void main(String[] args) throws Exception {
  •         String topic = "MQTT Examples";
  •         String content = "Message from MqttPublishExample";
  •         int qos = 2;
  •         String broker = "tcp://127.0.0.1:1883";
  •         String clientId = "JavaSample";
  •         MemoryPersistence persistence = new MemoryPersistence();
  • try {
  •             MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  •             MqttConnectOptions connOpts = new MqttConnectOptions();
  •             connOpts.setCleanSession(true);
  •             log.info("Connecting to broker: {}", broker);
  •             sampleClient.connect(connOpts);
  •             log.info("Connected");
  •             log.info("Publishing message: {}", content);
  •             MqttMessage message = new MqttMessage(content.getBytes());
  •             message.setQos(qos);
  •             sampleClient.publish(topic, message);
  •             log.info("Message published");
  •             sampleClient.disconnect();
  •             log.info("Disconnected");
  •             System.exit(0);
  •         } catch (MqttException me) {
  •             log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
  •         }
  •     }
  • }
  • 复制代码
    然后我们先运行 MqttServer,再运行 MqttClient,发现 MqttClient 卡住了
    Connecting to broker: tcp://127.0.0.1:1883
    复制代码
    这是为什么呢,我们通过抓包发现仅仅只有客户端发送了 Mqtt connect 信息,服务端并没有响应
        v2-e4116b3cd0300b948f56180e010bc62a_720w.jpg
    但是根据 mqtt 标准协议,发送 Connect 消息,必须要有 ConnAck 响应
        v2-a147e16bc55acb1451087d719e302f55_720w.jpg
    所以我们需要在接收到 Connect 后,返回 connAck 消息。我们创建一个 MqttHandler,让他继承 ChannelInboundHandlerAdapter, 用来接力 MqttDecoder 解码完成后的消息,这里要重点继承其中的 channelRead 方法,以及 channelInactive 方法,用来释放断链时需要释放的资源
    package com.github.shoothzj.mqtt;
  • import io.netty.channel.ChannelHandlerContext;
  • import io.netty.channel.ChannelInboundHandlerAdapter;
  • import lombok.extern.slf4j.Slf4j;
  • @Slf4j
  • public class MqttHandler extends ChannelInboundHandlerAdapter {
  •     @Override
  • public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  • super.channelRead(ctx, msg);
  •     }
  • }
  • 复制代码
    然后把这个 handler 加入到 netty 的职责链中,放到解码器的后面
        v2-d42ce1c26325abb7e632f4b46e5e2066_720w.jpg
    在 mqtt handler 中插入我们的代码
    @Override
  • public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  • super.channelRead(ctx, msg);
  • if (msg instanceof MqttConnectMessage) {
  •             handleConnect(ctx, (MqttConnectMessage) msg);
  •         } else {
  •             log.error("Unsupported type msg [{}]", msg);
  •         }
  •     }
  • private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
  •         log.info("connect msg is [{}]", connectMessage);
  •     }
  • 复制代码
    打印出 connectMessage 如下
    [MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]
  • 复制代码
    通常,mqtt connect message 中会包含 qos、用户名、密码等信息,由于我们启动客户端的时候也没有携带用户名和密码,这里获取到的都为 null,我们先不校验这些消息,直接给客户端返回 connack 消息,代表连接成功  
    final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
  •         ctx.channel().writeAndFlush(ackMessage);
  • 复制代码
    我们再运行起 Server 和 Client,随后可以看到已经走过了 Connect 阶段,进入了 publish message 过程,接下来我们再实现更多的其他场景
        v2-a6c4c297bc072a4d68591cdc587babcc_720w.jpg
    附上此阶段的 MqttHandler 代码
    package com.github.shoothzj.mqtt;
  • import io.netty.channel.ChannelHandlerContext;
  • import io.netty.channel.ChannelInboundHandlerAdapter;
  • import io.netty.handler.codec.mqtt.MqttConnAckMessage;
  • import io.netty.handler.codec.mqtt.MqttConnectMessage;
  • import io.netty.handler.codec.mqtt.MqttConnectPayload;
  • import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
  • import io.netty.handler.codec.mqtt.MqttFixedHeader;
  • import io.netty.handler.codec.mqtt.MqttMessageBuilders;
  • import lombok.extern.slf4j.Slf4j;
  • import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
  • @Slf4j
  • public class MqttHandler extends ChannelInboundHandlerAdapter {
  •     @Override
  • public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  • super.channelRead(ctx, msg);
  • if (msg instanceof MqttConnectMessage) {
  •             handleConnect(ctx, (MqttConnectMessage) msg);
  •         } else {
  •             log.error("Unsupported type msg [{}]", msg);
  •         }
  •     }
  • private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
  •         log.info("connect msg is [{}]", connectMessage);
  •         final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
  •         final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
  •         final MqttConnectPayload connectPayload = connectMessage.payload();
  •         final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
  •         ctx.channel().writeAndFlush(ackMessage);
  •     }
  • }
  • 复制代码
    我们当前把所有的逻辑都放在 MqttHandler 里面,不方便后续的扩展。抽象出一个 MqttProcessor 接口来处理具体的请求,MqttHandler 负责解析 MqttMessage 的类型并分发。MqttProcess 接口设计如下
    package io.github.protocol.mqtt.broker.processor;
  • import io.netty.channel.ChannelHandlerContext;
  • import io.netty.handler.codec.mqtt.MqttConnAckMessage;
  • import io.netty.handler.codec.mqtt.MqttConnectMessage;
  • import io.netty.handler.codec.mqtt.MqttMessage;
  • import io.netty.handler.codec.mqtt.MqttPubAckMessage;
  • import io.netty.handler.codec.mqtt.MqttPublishMessage;
  • import io.netty.handler.codec.mqtt.MqttSubAckMessage;
  • import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
  • import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
  • import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
  • public interface MqttProcessor {
  •     void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;
  •     void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;
  •     void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;
  •     void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;
  •     void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  •     void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  •     void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  •     void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;
  •     void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;
  •     void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;
  •     void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;
  •     void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  •     void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  •     void processDisconnect(ChannelHandlerContext ctx) throws Exception;
  •     void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  • }
  • 复制代码
    我们允许这些方法抛出异常,当遇到极难处理的故障时,把 mqtt 连接断掉(如后端存储故障),等待客户端的重连。
    MqttHandler 中来调用 MqttProcessor,相关 MqttHandler 代码如下
          Preconditions.checkArgument(message instanceof MqttMessage);
  •         MqttMessage msg = (MqttMessage) message;
  • try {
  • if (msg.decoderResult().isFailure()) {
  •                 Throwable cause = msg.decoderResult().cause();
  • if (cause instanceof MqttUnacceptableProtocolVersionException) {
  • // Unsupported protocol version
  •                     MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  • new MqttFixedHeader(MqttMessageType.CONNACK,
  • false, MqttQoS.AT_MOST_ONCE, false, 0),
  • new MqttConnAckVariableHeader(
  •                                     MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
  • false), null);
  •                     ctx.writeAndFlush(connAckMessage);
  •                     log.error("connection refused due to invalid protocol, client address [{}]",
  •                             ctx.channel().remoteAddress());
  •                     ctx.close();
  • return;
  •                 } else if (cause instanceof MqttIdentifierRejectedException) {
  • // ineligible clientId
  •                     MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  • new MqttFixedHeader(MqttMessageType.CONNACK,
  • false, MqttQoS.AT_MOST_ONCE, false, 0),
  • new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
  • false), null);
  •                     ctx.writeAndFlush(connAckMessage);
  •                     log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());
  •                     ctx.close();
  • return;
  •                 }
  • throw new IllegalStateException(msg.decoderResult().cause().getMessage());
  •             }
  •             MqttMessageType messageType = msg.fixedHeader().messageType();
  • if (log.isDebugEnabled()) {
  •                 log.debug("Processing MQTT Inbound handler message, type={}", messageType);
  •             }
  • switch (messageType) {
  • case CONNECT:
  •                     Preconditions.checkArgument(msg instanceof MqttConnectMessage);
  •                     processor.processConnect(ctx, (MqttConnectMessage) msg);
  • break;
  • case CONNACK:
  •                     Preconditions.checkArgument(msg instanceof MqttConnAckMessage);
  •                     processor.processConnAck(ctx, (MqttConnAckMessage) msg);
  • break;
  • case PUBLISH:
  •                     Preconditions.checkArgument(msg instanceof MqttPublishMessage);
  •                     processor.processPublish(ctx, (MqttPublishMessage) msg);
  • break;
  • case PUBACK:
  •                     Preconditions.checkArgument(msg instanceof MqttPubAckMessage);
  •                     processor.processPubAck(ctx, (MqttPubAckMessage) msg);
  • break;
  • case PUBREC:
  •                     processor.processPubRec(ctx, msg);
  • break;
  • case PUBREL:
  •                     processor.processPubRel(ctx, msg);
  • break;
  • case PUBCOMP:
  •                     processor.processPubComp(ctx, msg);
  • break;
  • case SUBSCRIBE:
  •                     Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);
  •                     processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);
  • break;
  • case SUBACK:
  •                     Preconditions.checkArgument(msg instanceof MqttSubAckMessage);
  •                     processor.processSubAck(ctx, (MqttSubAckMessage) msg);
  • break;
  • case UNSUBSCRIBE:
  •                     Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);
  •                     processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
  • break;
  • case UNSUBACK:
  •                     Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);
  •                     processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);
  • break;
  • case PINGREQ:
  •                     processor.processPingReq(ctx, msg);
  • break;
  • case PINGRESP:
  •                     processor.processPingResp(ctx, msg);
  • break;
  • case DISCONNECT:
  •                     processor.processDisconnect(ctx);
  • break;
  • case AUTH:
  •                     processor.processAuth(ctx, msg);
  • break;
  • default:
  • throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
  •             }
  •         } catch (Throwable ex) {
  •             ReferenceCountUtil.safeRelease(msg);
  •             log.error("Exception was caught while processing MQTT message, ", ex);
  •             ctx.close();
  •         }
  • 复制代码
    这里的代码,主要是针对 MqttMessage 的不同类型,调用 MqttProcessor 的不同方法,值得一提的有两点

    • 提前判断了一些解码异常,fast fail
    • 全局捕获异常,并进行断链处理
      
    维护 MqttSession

    维护 Mqtt 会话的 session,主要用来持续跟踪客户端会话信息,跟踪在系统中占用的资源等,考虑到无论是何种后端实现,都需要维护 Mqtt 的 Session,我们构筑一个 AbstractMqttProcessor 来维护 MqttSession
    package io.github.protocol.mqtt.broker.processor;
  • import io.github.protocol.mqtt.broker.MqttSessionKey;
  • import io.github.protocol.mqtt.broker.auth.MqttAuth;
  • import io.github.protocol.mqtt.broker.util.ChannelUtils;
  • import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
  • import io.netty.channel.ChannelHandlerContext;
  • import io.netty.handler.codec.mqtt.MqttConnAckMessage;
  • import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
  • import io.netty.handler.codec.mqtt.MqttConnectMessage;
  • import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
  • import io.netty.handler.codec.mqtt.MqttFixedHeader;
  • import io.netty.handler.codec.mqtt.MqttMessage;
  • import io.netty.handler.codec.mqtt.MqttMessageFactory;
  • import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
  • import io.netty.handler.codec.mqtt.MqttMessageType;
  • import io.netty.handler.codec.mqtt.MqttPubAckMessage;
  • import io.netty.handler.codec.mqtt.MqttPublishMessage;
  • import io.netty.handler.codec.mqtt.MqttQoS;
  • import io.netty.handler.codec.mqtt.MqttSubAckMessage;
  • import io.netty.handler.codec.mqtt.MqttSubAckPayload;
  • import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
  • import io.netty.handler.codec.mqtt.MqttSubscribePayload;
  • import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
  • import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
  • import lombok.extern.slf4j.Slf4j;
  • import org.apache.commons.lang3.StringUtils;
  • import java.util.stream.IntStream;
  • @Slf4j
  • public abstract class AbstractProcessor implements MqttProcessor {
  • protected final MqttAuth mqttAuth;
  • public AbstractProcessor(MqttAuth mqttAuth) {
  • this.mqttAuth = mqttAuth;
  •     }
  •     @Override
  • public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
  •         String clientId = msg.payload().clientIdentifier();
  •         String username = msg.payload().userName();
  •         byte[] pwd = msg.payload().passwordInBytes();
  • if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {
  •             MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  • new MqttFixedHeader(MqttMessageType.CONNACK,
  • false, MqttQoS.AT_MOST_ONCE, false, 0),
  • new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
  • false), null);
  •             ctx.writeAndFlush(connAckMessage);
  •             log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
  •             ctx.close();
  • return;
  •         }
  • if (!mqttAuth.connAuth(clientId, username, pwd)) {
  •             MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  • new MqttFixedHeader(MqttMessageType.CONNACK,
  • false, MqttQoS.AT_MOST_ONCE, false, 0),
  • new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
  • false), null);
  •             ctx.writeAndFlush(connAckMessage);
  •             log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
  •             ctx.close();
  • return;
  •         }
  •         MqttSessionKey mqttSessionKey = new MqttSessionKey();
  •         mqttSessionKey.setUsername(username);
  •         mqttSessionKey.setClientId(clientId);
  •         ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);
  •         log.info("username {} clientId {} remote address {} connected",
  •                 username, clientId, ctx.channel().remoteAddress());
  •         onConnect(mqttSessionKey);
  •         MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  • new MqttFixedHeader(MqttMessageType.CONNACK,
  • false, MqttQoS.AT_MOST_ONCE, false, 0),
  • new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
  • null);
  •         ctx.writeAndFlush(mqttConnectMessage);
  •     }
  • protected void onConnect(MqttSessionKey mqttSessionKey) {
  •     }
  •     @Override
  • public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("publish, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  • return;
  •         }
  • if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {
  •             log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername());
  • return;
  •         }
  • if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
  •             log.error("does not support QoS2 protocol. clientId {}, username {} ",
  •                     mqttSession.getClientId(), mqttSession.getUsername());
  • return;
  •         }
  •         onPublish(ctx, mqttSession, msg);
  •     }
  • protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  •                              MqttPublishMessage msg) throws Exception {
  •     }
  •     @Override
  • public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("sub, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •         onSubscribe(ctx, mqttSession, msg.payload());
  •         MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,
  • false, MqttQoS.AT_MOST_ONCE, false, 0);
  •         IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value());
  •         MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());
  •         ctx.writeAndFlush(MqttMessageFactory.newMessage(
  •                 fixedHeader,
  •                 MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
  •                 payload));
  •     }
  • protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  •                                MqttSubscribePayload subscribePayload) throws Exception {
  •     }
  •     @Override
  • public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  •         ctx.writeAndFlush(MqttMessageUtil.pingResp());
  •     }
  •     @Override
  • public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  •     @Override
  • public void processDisconnect(ChannelHandlerContext ctx) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());
  •         }
  •         onDisconnect(mqttSession);
  •     }
  • protected void onDisconnect(MqttSessionKey mqttSessionKey) {
  •     }
  •     @Override
  • public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  •         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  • if (mqttSession == null) {
  •             log.error("auth, client address {} not authed", ctx.channel().remoteAddress());
  •             ctx.close();
  •         }
  •     }
  • }
  • 复制代码

    可以看到,这里的 AbstractProcessor 主要是维护了 MqttSessionKey,校验 MqttSessionKey,并拦截 publish 中不支持的 Qos2、Failure。同时,也影响了 mqtt 心跳请求。同样的,我们允许在 onPublishonSubscribe 中抛出异常。
    基于消息队列实现的 mqtt 网关的基础思想也比较简单,简而言之就是,有 publish 消息的时候向消息队列中生产消息。有订阅的时候就从消息队列中拉取消息。由此延伸出来,我们可能需要维护每个 mqtt topic 和 producer、consumer 的对应关系,因为像 kafka、pulsar 这些消息中间件的消费者都是区分 topic 的,片段通用代码如下:
      
    protected final ReentrantReadWriteLock.ReadLock rLock;
  • protected final ReentrantReadWriteLock.WriteLock wLock;
  • protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
  • protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
  • protected final Map<MqttTopicKey, P> producerMap;
  • protected final Map<MqttTopicKey, C> consumerMap;
  • public AbstractMqProcessor(MqttAuth mqttAuth) {
  • super(mqttAuth);
  •         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  •         rLock = lock.readLock();
  •         wLock = lock.writeLock();
  • this.sessionProducerMap = new HashMap<>();
  • this.sessionConsumerMap = new HashMap<>();
  • this.producerMap = new HashMap<>();
  • this.consumerMap = new HashMap<>();
  •     }
  •     @Override
  • protected void onConnect(MqttSessionKey mqttSessionKey) {
  •         wLock.lock();
  • try {
  •             sessionProducerMap.put(mqttSessionKey, new ArrayList<>());
  •             sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());
  •         } finally {
  •             wLock.unlock();
  •         }
  •     }
  •     @Override
  • protected void onDisconnect(MqttSessionKey mqttSessionKey) {
  •         wLock.lock();
  • try {
  • // find producers
  •             List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);
  • if (produceTopicKeys != null) {
  • for (MqttTopicKey mqttTopicKey : produceTopicKeys) {
  •                     P producer = producerMap.get(mqttTopicKey);
  • if (producer != null) {
  •                         ClosableUtils.close(producer);
  •                         producerMap.remove(mqttTopicKey);
  •                     }
  •                 }
  •             }
  •             sessionProducerMap.remove(mqttSessionKey);
  •             List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);
  • if (consumeTopicKeys != null) {
  • for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {
  •                     C consumer = consumerMap.get(mqttTopicKey);
  • if (consumer != null) {
  •                         ClosableUtils.close(consumer);
  •                         consumerMap.remove(mqttTopicKey);
  •                     }
  •                 }
  •             }
  •             sessionConsumerMap.remove(mqttSessionKey);
  •         } finally {
  •             wLock.unlock();
  •         }
  •     }
  • }
  • 复制代码

      kafka processor 实现  
    由于kafka producer不区分topic,我们可以在kafka processor中复用producer,在将来单个kafka producer的性能到达上限时,我们可以将kafka producer扩展为kafka producer列表进行轮询处理,消费者由于mqtt协议可能针对每个订阅topic有不同的行为,不合适复用同一个消费者实例。我们在构造函数中启动KafkaProducer
  • private final KafkaProcessorConfig kafkaProcessorConfig;
  • private final KafkaProducer<String, ByteBuffer> producer;
  • public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
  • super(mqttAuth);
  • this.kafkaProcessorConfig = kafkaProcessorConfig;
  • this.producer = createProducer();
  •     }
  • protected KafkaProducer<String, ByteBuffer> createProducer() {
  •         Properties properties = new Properties();
  •         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
  •         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  •         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
  • return new KafkaProducer<>(properties);
  •     }
  • 复制代码

    处理 MqttPublish 消息,MqttPublish 消息包含如下几个关键参数
      
  • MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
  • String topic = publishMessage.variableHeader().topicName();
  • ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();
  • 复制代码

    其中

    • qos 代表这条消息的质量级别,0 没有任何保障,1 代表至少一次,2 代表恰好一次。当前仅支持 qos0、qos1
    • topicName 就是 topic 的名称
    • ByteBuffer 就是消息的内容
    根据 topic、qos 发送消息,代码如下
      
      String topic = msg.variableHeader().topicName();
  •         ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
  • switch (msg.fixedHeader().qosLevel()) {
  • case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
  • if (exception != null) {
  •                     log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
  • return;
  •                 }
  •                 log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
  •                         mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
  •             });
  • case AT_LEAST_ONCE -> {
  • try {
  •                     RecordMetadata recordMetadata = producer.send(record).get();
  •                     log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
  •                             mqttSessionKey, recordMetadata.topic(),
  •                             recordMetadata.partition(), recordMetadata.offset());
  •                     ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
  •                 } catch (Exception e) {
  •                     log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
  •                 }
  •             }
  • case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
  •                     String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
  •         }
  • 复制代码

    处理订阅消息,我们暂时仅根据订阅的 topic,创建 topic 进行消费即可,由于 kafka 原生客户端建议的消费代码模式如下
      
    while (true) {
  •   ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
  • for (ConsumerRecord<String, byte[]> record : records) {
  • // do logic
  •   }
  • }
  • 复制代码

    我们需要切换到其他线程对 consumer 进行消息,书写一个 KafkaConsumerListenerWrapper 的 wrapper,转换为 listener 异步消费模型
      
    package io.github.protocol.mqtt.broker.processor;
  • import lombok.extern.slf4j.Slf4j;
  • import org.apache.kafka.clients.admin.AdminClient;
  • import org.apache.kafka.clients.admin.AdminClientConfig;
  • import org.apache.kafka.clients.admin.KafkaAdminClient;
  • import org.apache.kafka.clients.admin.NewTopic;
  • import org.apache.kafka.clients.admin.TopicDescription;
  • import org.apache.kafka.clients.consumer.ConsumerConfig;
  • import org.apache.kafka.clients.consumer.ConsumerRecord;
  • import org.apache.kafka.clients.consumer.ConsumerRecords;
  • import org.apache.kafka.clients.consumer.KafkaConsumer;
  • import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
  • import org.apache.kafka.common.errors.WakeupException;
  • import org.apache.kafka.common.serialization.ByteArrayDeserializer;
  • import org.apache.kafka.common.serialization.StringDeserializer;
  • import java.time.Duration;
  • import java.util.Collections;
  • import java.util.Properties;
  • import java.util.concurrent.ExecutionException;
  • @Slf4j
  • public class KafkaConsumerListenerWrapper implements AutoCloseable {
  • private final AdminClient adminClient;
  • private final KafkaConsumer<String, byte[]> consumer;
  • public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) {
  •         Properties adminProperties = new Properties();
  •         adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
  • this.adminClient = KafkaAdminClient.create(adminProperties);
  •         Properties properties = new Properties();
  •         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
  •         properties.put(ConsumerConfig.GROUP_ID_CONFIG, username);
  •         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  •         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
  • this.consumer = new KafkaConsumer<>(properties);
  •     }
  • public void start(String topic, KafkaMessageListener listener) throws Exception {
  • try {
  •             TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic))
  •                     .values().get(topic).get();
  •             log.info("topic info is {}", topicDescription);
  •         } catch (ExecutionException ee) {
  • if (ee.getCause() instanceof UnknownTopicOrPartitionException) {
  •                 log.info("topic {} not exist, create it", topic);
  •                 adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)));
  •             } else {
  •                 log.error("find topic info {} error", topic, ee);
  •             }
  •         } catch (Exception e) {
  • throw new IllegalStateException("find topic info error", e);
  •         }
  •         consumer.subscribe(Collections.singletonList(topic));
  •         log.info("consumer topic {} start", topic);
  • new Thread(() -> {
  • try {
  • while (true) {
  •                     ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
  • for (ConsumerRecord<String, byte[]> record : records) {
  •                         listener.messageReceived(record);
  •                     }
  •                 }
  •             } catch (WakeupException we) {
  •                 consumer.close();
  •             } catch (Exception e) {
  •                 log.error("consumer topic {} consume error", topic, e);
  •                 consumer.close();
  •             }
  •         }).start();
  •         Thread.sleep(5_000);
  •     }
  •     @Override
  • public void close() throws Exception {
  •         log.info("wake up {} consumer", consumer);
  •         consumer.wakeup();
  •     }
  • }
  •     @Override
  • protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  •                                MqttSubscribePayload subscribePayload) throws Exception {
  • for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
  •             KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());
  •             subscribe(ctx, consumer, topicSubscription.topicName());
  •         }
  •     }
  • private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {
  •         MqttTopicKey mqttTopicKey = new MqttTopicKey();
  •         mqttTopicKey.setTopic(topic);
  •         mqttTopicKey.setMqttSessionKey(mqttSessionKey);
  •         wLock.lock();
  • try {
  •             KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);
  • if (consumer == null) {
  •                 consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());
  •                 sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
  • if (mqttTopicKeys == null) {
  •                         mqttTopicKeys = new ArrayList<>();
  •                     }
  •                     mqttTopicKeys.add(mqttTopicKey);
  • return mqttTopicKeys;
  •                 });
  •                 consumerMap.put(mqttTopicKey, consumer);
  •             }
  • return consumer;
  •         } finally {
  •             wLock.unlock();
  •         }
  •     }
  • protected void subscribe(ChannelHandlerContext ctx,
  •                              KafkaConsumerListenerWrapper consumer, String topic) throws Exception {
  •         BoundInt boundInt = new BoundInt(65535);
  •         consumer.start(topic, record -> {
  •             log.info("receive message from kafka, topic {}, partition {}, offset {}",
  •                     record.topic(), record.partition(), record.offset());
  •             MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
  •                     MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());
  •             ctx.writeAndFlush(mqttPublishMessage);
  •         });
  •     }
  • 复制代码

    在上述的代码中,有一个需要通篇注意的点:日志打印的时候,注意要将关键的信息携带,比如:topic、mqtt username、mqtt clientId 等,在写 demo 的时候没有感觉,但是海量请求下需要定位问题的时候,就知道这些信息的关键之处了。
    使用 BountInt 这个简单的工具类来生成从 0~65535 的 packageId,满足协议的要求
      pulsar processor 实现
    pulsar 相比 kafka 来说,更适合作为 mqtt 协议的代理。原因有如下几点:

    • pulsar 支持百万 topic、topic 实现更轻量
    • pulsar 原生支持 listener 的消费模式,不需要每个消费者启动一个线程
    • pulsar 支持 share 的消费模式,消费模式更灵活
    • pulsar 消费者的 subscribe 可确保成功创建订阅,相比 kafka 的消费者没有这样的语义保障
      
    protected final ReentrantReadWriteLock.ReadLock rLock;
  • protected final ReentrantReadWriteLock.WriteLock wLock;
  • protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
  • protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
  • protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;
  • protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;
  • private final PulsarProcessorConfig pulsarProcessorConfig;
  • private final PulsarAdmin pulsarAdmin;
  • private final PulsarClient pulsarClient;
  • public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {
  • super(mqttAuth);
  •         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  •         rLock = lock.readLock();
  •         wLock = lock.writeLock();
  • this.sessionProducerMap = new HashMap<>();
  • this.sessionConsumerMap = new HashMap<>();
  • this.producerMap = new HashMap<>();
  • this.consumerMap = new HashMap<>();
  • this.pulsarProcessorConfig = pulsarProcessorConfig;
  • try {
  • this.pulsarAdmin = PulsarAdmin.builder()
  •                     .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl())
  •                     .build();
  • this.pulsarClient = PulsarClient.builder()
  •                     .serviceUrl(pulsarProcessorConfig.getServiceUrl())
  •                     .build();
  •         } catch (Exception e) {
  • throw new IllegalStateException("Failed to create pulsar client", e);
  •         }
  •     }
  • 复制代码


    处理 publish 消息
      
        @Override
  • protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  •                              MqttPublishMessage msg) throws Exception {
  •         String topic = msg.variableHeader().topicName();
  •         Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);
  •         int len = msg.payload().readableBytes();
  •         byte[] messageBytes = new byte[len];
  •         msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
  • switch (msg.fixedHeader().qosLevel()) {
  • case AT_MOST_ONCE -> producer.sendAsync(messageBytes).
  •                     thenAccept(messageId -> log.info("clientId [{}],"
  •                                     + " username [{}]. send message to pulsar success messageId: {}",
  •                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))
  •                     .exceptionally((e) -> {
  •                         log.error("clientId [{}], username [{}]. send message to pulsar fail: ",
  •                                 mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);
  • return null;
  •                     });
  • case AT_LEAST_ONCE -> {
  • try {
  •                     MessageId messageId = producer.send(messageBytes);
  •                     MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,
  • false, MqttQoS.AT_MOST_ONCE, false, 0);
  •                     MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,
  •                             MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
  •                     log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",
  •                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);
  •                     ctx.writeAndFlush(pubAckMessage);
  •                 } catch (PulsarClientException e) {
  •                     log.error("clientId [{}], username [{}]. send pulsar error: {}",
  •                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());
  •                 }
  •             }
  • case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
  •                     String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
  •         }
  •     }
  • private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {
  •         MqttTopicKey mqttTopicKey = new MqttTopicKey();
  •         mqttTopicKey.setTopic(topic);
  •         mqttTopicKey.setMqttSessionKey(mqttSessionKey);
  •         rLock.lock();
  • try {
  •             Producer<byte[]> producer = producerMap.get(mqttTopicKey);
  • if (producer != null) {
  • return producer;
  •             }
  •         } finally {
  •             rLock.unlock();
  •         }
  •         wLock.lock();
  • try {
  •             Producer<byte[]> producer = producerMap.get(mqttTopicKey);
  • if (producer == null) {
  •                 producer = createProducer(topic);
  •                 sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
  • if (mqttTopicKeys == null) {
  •                         mqttTopicKeys = new ArrayList<>();
  •                     }
  •                     mqttTopicKeys.add(mqttTopicKey);
  • return mqttTopicKeys;
  •                 });
  •                 producerMap.put(mqttTopicKey, producer);
  •             }
  • return producer;
  •         } finally {
  •             wLock.unlock();
  •         }
  •     }
  • protected Producer<byte[]> createProducer(String topic) throws Exception {
  • return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
  •     }
  • 复制代码
    处理 subscribe 消息
        @Override
  • protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  •                                MqttSubscribePayload subscribePayload) throws Exception {
  • for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
  •             subscribe(ctx, mqttSessionKey, topicSubscription.topicName());
  •         }
  •     }
  • protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  •                              String topic) throws Exception {
  •         MqttTopicKey mqttTopicKey = new MqttTopicKey();
  •         mqttTopicKey.setTopic(topic);
  •         mqttTopicKey.setMqttSessionKey(mqttSessionKey);
  •         wLock.lock();
  • try {
  •             Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);
  • if (consumer == null) {
  •                 consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);
  •                 sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
  • if (mqttTopicKeys == null) {
  •                         mqttTopicKeys = new ArrayList<>();
  •                     }
  •                     mqttTopicKeys.add(mqttTopicKey);
  • return mqttTopicKeys;
  •                 });
  •                 consumerMap.put(mqttTopicKey, consumer);
  •             }
  •         } finally {
  •             wLock.unlock();
  •         }
  •     }
  • protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username,
  •                                               String topic) throws Exception {
  •         BoundInt boundInt = new BoundInt(65535);
  • try {
  •             PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);
  •             log.info("topic {} partitioned stats {}", topic, partitionedStats);
  •         } catch (PulsarAdminException.NotFoundException nfe) {
  •             log.info("topic {} not found", topic);
  •             pulsarAdmin.topics().createPartitionedTopic(topic, 1);
  •         }
  • return pulsarClient.newConsumer(Schema.BYTES).topic(topic)
  •                 .messageListener((consumer, msg) -> {
  •                     log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());
  •                     MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
  •                             MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());
  •                     ctx.writeAndFlush(mqttPublishMessage);
  •                 })
  •                 .subscriptionName(username).subscribe();
  •     }
  • 复制代码
    集成测试  kafka

    我们可以通过 embedded-kafka-java 这个项目来启动用做单元测试的 kafka broker。通过如下的 group 引入依赖
    <dependency>
  • <groupId>io.github.embedded-middleware</groupId>
  • <artifactId>embedded-kafka-core</artifactId>
  • <version>0.0.2</version>
  • <scope>test</scope>
  • </dependency>
  • 复制代码
    我们就可以通过如下的代码启动基于 kafka 的 mqtt broker
    @Slf4j
  • public class MqttKafkaTestUtil {
  • public static MqttServer setupMqttKafka() throws Exception {
  •         EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();
  • new Thread(() -> {
  • try {
  •                 embeddedKafkaServer.start();
  •             } catch (Exception e) {
  •                 log.error("kafka broker started exception ", e);
  •             }
  •         }).start();
  •         Thread.sleep(5_000);
  •         MqttServerConfig mqttServerConfig = new MqttServerConfig();
  •         mqttServerConfig.setPort(0);
  •         mqttServerConfig.setProcessorType(ProcessorType.KAFKA);
  •         KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
  •         kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));
  •         mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);
  •         MqttServer mqttServer = new MqttServer(mqttServerConfig);
  • new Thread(() -> {
  • try {
  •                 mqttServer.start();
  •             } catch (Exception e) {
  •                 log.error("mqsar broker started exception ", e);
  •             }
  •         }).start();
  •         Thread.sleep(5000L);
  • return mqttServer;
  •     }
  • }
  • 复制代码
    kafka 端到端测试用例,比较简单,通过 mqtt client publish 一条消息,然后消费出来
    @Log4j2
  • public class MqttKafkaPubSubTest {
  •     @Test
  • public void pubSubTest() throws Exception {
  •         MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();
  •         String topic = UUID.randomUUID().toString();
  •         String content = "test-msg";
  •         String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
  •         String clientId = UUID.randomUUID().toString();
  •         MemoryPersistence persistence = new MemoryPersistence();
  •         MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  •         MqttConnectOptions connOpts = new MqttConnectOptions();
  •         connOpts.setUserName(UUID.randomUUID().toString());
  •         connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
  •         connOpts.setCleanSession(true);
  •         log.info("Mqtt connecting to broker");
  •         sampleClient.connect(connOpts);
  •         CompletableFuture<String> future = new CompletableFuture<>();
  •         log.info("Mqtt subscribing");
  •         sampleClient.subscribe(topic, (s, mqttMessage) -> {
  •             log.info("messageArrived");
  •             future.complete(mqttMessage.toString());
  •         });
  •         log.info("Mqtt subscribed");
  •         MqttMessage message = new MqttMessage(content.getBytes());
  •         message.setQos(1);
  •         log.info("Mqtt message publishing");
  •         sampleClient.publish(topic, message);
  •         log.info("Mqtt message published");
  •         TimeUnit.SECONDS.sleep(3);
  •         sampleClient.disconnect();
  •         String msg = future.get(5, TimeUnit.SECONDS);
  •         Assertions.assertEquals(content, msg);
  •     }
  • }
  • 复制代码
    pulsar
    我们可以通过 embedded-pulsar-java 这个项目来启动用做单元测试的 pulsar broker。通过如下的 group 引入依赖
    <dependency>
  • <groupId>io.github.embedded-middleware</groupId>
  • <artifactId>embedded-pulsar-core</artifactId>
  • <version>0.0.2</version>
  • <scope>test</scope>
  • </dependency>
  • 复制代码
    我们就可以通过如下的代码启动基于 pulsar 的 mqtt broker
    @Slf4j
  • public class MqttPulsarTestUtil {
  • public static MqttServer setupMqttPulsar() throws Exception {
  •         EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();
  •         embeddedPulsarServer.start();
  •         MqttServerConfig mqttServerConfig = new MqttServerConfig();
  •         mqttServerConfig.setPort(0);
  •         mqttServerConfig.setProcessorType(ProcessorType.PULSAR);
  •         PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
  •         pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));
  •         pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));
  •         mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);
  •         MqttServer mqttServer = new MqttServer(mqttServerConfig);
  • new Thread(() -> {
  • try {
  •                 mqttServer.start();
  •             } catch (Exception e) {
  •                 log.error("mqsar broker started exception ", e);
  •             }
  •         }).start();
  •         Thread.sleep(5000L);
  • return mqttServer;
  •     }
  • }
  • 复制代码
    pulsar 端到端测试用例,比较简单,通过 mqtt client publish 一条消息,然后消费出来
    @Log4j2
  • public class MqttPulsarPubSubTest {
  •     @Test
  • public void pubSubTest() throws Exception {
  •         MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();
  •         String topic = UUID.randomUUID().toString();
  •         String content = "test-msg";
  •         String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
  •         String clientId = UUID.randomUUID().toString();
  •         MemoryPersistence persistence = new MemoryPersistence();
  •         MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  •         MqttConnectOptions connOpts = new MqttConnectOptions();
  •         connOpts.setUserName(UUID.randomUUID().toString());
  •         connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
  •         connOpts.setCleanSession(true);
  •         log.info("Mqtt connecting to broker");
  •         sampleClient.connect(connOpts);
  •         CompletableFuture<String> future = new CompletableFuture<>();
  •         log.info("Mqtt subscribing");
  •         sampleClient.subscribe(topic, (s, mqttMessage) -> {
  •             log.info("messageArrived");
  •             future.complete(mqttMessage.toString());
  •         });
  •         log.info("Mqtt subscribed");
  •         MqttMessage message = new MqttMessage(content.getBytes());
  •         message.setQos(1);
  •         log.info("Mqtt message publishing");
  •         sampleClient.publish(topic, message);
  •         log.info("Mqtt message published");
  •         TimeUnit.SECONDS.sleep(3);
  •         sampleClient.disconnect();
  •         String msg = future.get(5, TimeUnit.SECONDS);
  •         Assertions.assertEquals(content, msg);
  •     }
  • }
  • 复制代码
    性能优化
    这里我们简单描述几个性能优化点,像一些调整线程数、buffer 大小这类的参数调整就不在这里赘述了,这些需要具体的性能压测来决定参数的设置。

    在 linux 上使用 Epoll 网络模型
      
    public class EventLoopUtil {
  • /**
  •      * @return an EventLoopGroup suitable for the current platform
  • */
  • public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
  • if (Epoll.isAvailable()) {
  • return new EpollEventLoopGroup(nThreads, threadFactory);
  •         } else {
  • return new NioEventLoopGroup(nThreads, threadFactory);
  •         }
  •     }
  • public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
  • if (eventLoopGroup instanceof EpollEventLoopGroup) {
  • return EpollServerSocketChannel.class;
  •         } else {
  • return NioServerSocketChannel.class;
  •         }
  •     }
  • }
  • 复制代码
    通过 Epollo.isAvailable,以及在指定 channel 类型的时候通过判断 group 的类型选择对应的 channel 类型
            EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
  • new DefaultThreadFactory("mqtt-acceptor"));
  •         EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1,
  • new DefaultThreadFactory("mqtt-worker"));
  •                 b.group(acceptorGroup, workerGroup)
  • // key point
  •                     .channel(EventLoopUtil.getServerSocketChannelClass(workerGroup))
  •                     .option(ChannelOption.SO_BACKLOG, 100)
  •                     .handler(new LoggingHandler(LogLevel.INFO))
  •                     .childHandler(new ChannelInitializer<SocketChannel>() {
  •                         @Override
  • public void initChannel(SocketChannel ch) throws Exception {
  •                             ChannelPipeline p = ch.pipeline();
  • // decoder
  •                             p.addLast(new MqttDecoder());
  •                             p.addLast(MqttEncoder.INSTANCE);
  •                             p.addLast(new MqttHandler(processor(mqttServerConfig)));
  •                         }
  •                     });
  • 复制代码
    关闭 tcp keepalive

    由于 mqtt 协议本身就有心跳机制,所以可以关闭 tcp 的 keepalive,依赖 mqtt 协议层的心跳即可,节约海量连接下的性能。配置 ChannelOption.SO_KEEPALIVE 为 false 即可
                        .option(ChannelOption.SO_KEEPALIVE, false)
    复制代码
    超时时间调短

    默认情况下,无论是单元测试中 mqtt,还是 pulsar producer 和 kafka producer 的生产超时时间,都相对较长(一般为 30s),如果在内网环境部署,可以将超时时间调整到 5s。来避免无意义的超时等待

    使用多个 KafkaProducer 来优化性能


    单个 KafkaProducer 会达到 tcp 链路带宽的瓶颈,当有海量请求,而延时在 kafka 生产比较突出的情况下,可以考虑启动多个 KafkaProducer。并根据 mqtt 协议的特点(链路多,单个链路上 qps 不高),用 mqttSessionKey 的哈希值来决定使用那个 KafkaProducer 发送消息

    在 KafkaProcessorConfig 中添加如下配置,生产者个数,默认为 1
    private int producerNum = 1;
    复制代码
    在初始化的时候,初始化 Producer 数组,而不是单个 Producer
    this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()];
  • for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) {
  •             producerArray[i] = createProducer();
  •         }
  • 复制代码
    封装一个方法来获取 producer
    private Producer<String, ByteBuffer> getProducer(MqttSessionKey mqttSessionKey) {
  • return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())];
  •     }
  • 复制代码

    结语


    本文的代码均已上传到 github。我们这里仅仅只实现了基础的 mqtt 连接、发布、订阅功能,甚至不支持暂停、取消订阅。想要实现一个成熟商用的 mqtt 网关,我们还需要用户隔离、对协议的更多支持、可靠性、可运维、流控、安全等能力。如有商用生产级别的 mqtt 需求,又无法快速构筑成熟的 mqtt 网关的,可以选择华为云 IoTDA 服务,提供稳定可靠的 mqtt 服务,支持海量设备连接上云、设备和云端消息双向通信能力。