【MQTT 系列】(五)实现反向 RPC
今天,我将向大家介绍我基于 Golang 实现的 MQTT 反向 RPC 框架:Go reverse RPC。
RRPC 的定义
RRPC 即 Revese RPC,我们一般称设备端向服务端请求成为 RPC(Remote Protocal Call),所以反向即服务端向设备端请求,这在设备端是常见的做法,因为设备端没有独立的公网 IP,服务器端无法直接向设备端请求,这种情况下,就需要设备端发起一个长链接到服务器端,然后服务器就能在这个长链接的基础上,对设备端进行 RPC 请求。
为什么需要 RRPC
首先是因为 MQTT 3.1/3.1.1 并没有对请求回复模式的原生的支持,直到 MQTT 5 才提供,其次就是我们通常用的发布订阅模式来实现 RPC 要么是针对每个方法进行单独的处理。其次在 Golang 中,相对于 HTTP 协议的中的 Gin、Beego、Echo 等优秀框架,MQTT 并没有现成并且好用的框架,我们经常实现的方式就是针对每个需要回复的地方单独处理,容易造成大量冗余的代码,不优雅,另一种方式是用数据库来记录请求,从而将请求跟响应联系起来,显然,如果是针对调用设备端的服务而言,这种实现方式跟数据库有耦合存在,不仅实现起来有些复杂,也容易在数据库中留下大量无法配对的请求,而且性能也不会太好。于是 RRPC 就出现了,它针对我们服务器对现成设备的服务调用提供了简洁、高效的实现方式,特别是在与第三方云平台的 MQTT 物联网协议对接时。
功能点
RRPC 的基本原理已经明确,现在我们来看一下它的功能点:
- 支持 MQTT 长连接协议(目前只实现了 MQTT)
- 支持 Protobuf 以及 JSON 等多种编码格式
- 支持数据压缩:GZip/Zlib/Brotli
- 提供监控:指标、日志以及追踪
- 支持错误恢复以及处理
如何使用
在这个项目的使用场景中,Client 代表客户端(在服务器端中使用),Server 代表服务端(在设备端中使用)。
首先,我们需要新建一个 MQTT 客户端实例,然后在服务端创建以及注册调用的方法:
1 | import ( |
不知道你有没有看到 Gin 的身影,我参照了它的设计。
然后,我们需要在客户端中,也是一样,创建一个 MQTT 客户端实例,然后向服务端进行请求:
1 | import ( |
从上面的例子中,大家应该有初步的印象了,其中的关键在于:
- 连接到同一个 MQTT Broker
tcp://localhost
; - 服务端需要先注册好
example-method
所对应的方法,并实现业务逻辑; - 客户端发送请求调用设备号为
dev-123456
的example-method
的方法;
设计与实现
从设计本身而言,并没有什么复杂的地方,因为本质上它还是所谓的 RPC,所以,先让我们来看看流程图:
graph LR Client -->|Call| ClientStub ClientStub -->|Marshal| Network Network -->|Send| ServerStub ServerStub -->|Unmarshal| Server Server -->|Process| ServerStub ServerStub -->|Marshal| Network Network -->|Send| ClientStub ClientStub -->|Unmarshal| Client
从中可以看到,主要的模块也就客户端以及服务端,客户端承担的职责是发送请求以及接收回复,服务端承担的职责是接收以及处理请求,要达到发送,接收处理以及返回。因此两边必然要有基础的协议来进行序列化以及反序列化,以及发送以及接收数据流。
那么,上面的例子的原理是什么呢?让我们用时序图来解释:
sequenceDiagram Client->>MQTTBroker: Subscribe test-prefix/dev-123456/response/+ Server->>MQTTBroker: Subscribe test-prefix/dev-123456/request/+ Client->>MQTTBroker: Send to test-prefix/dev-123456/request/123, call "example-method" MQTTBroker->>Server: Forward data to test-prefix/dev-123456/request/123 Server->>Server: Dispatch call to method "example-method", handle it Server->>MQTTBroker: Send reply to test-prefix/dev-123456/response/123 MQTTBroker->>Client: Forward data to test-prefix/dev-123456/response/123
这里有几个非常关键的地方:
request
以及response
这两个关键词是约定好的,当前设计不可修改,后续可以考虑改为可修改;request
以及response
后面建议加上唯一的请求 ID,方便 RPC 解析处理;Publish
以及Subscribe
的 QoS 默认为 0,且不可修改(思考题:为什么?);
另外一个值得注意的地方在于,这个 RRPC 框架同时支持 JSON 以及 Protobuf 两种序列化与反序列化方式,因此如果是为了传输数据的可读性,那么选 JSON 即可,而如果是在意效率,那么 Protobuf 就是你的选择。在设备的性能以及电能足够的前提下,还可以选择给 Protobuf 启用数据压缩,目前支持 GZip/Zlib/Brotli,这样就能进一步的节约网络流量,JSON 暂时不支持数据压缩,毕竟需要用到 JSON 的场景下,都是以纯文本为主。
计划
- 这个库的 MQTT 协议是以 paho.mqtt.golang 为基础的,它目前只支持 MQTT 3.1/3.1.1 版本,因此之后会计划支持 MQTT 5 支持。实际上对于 MQTT 5 协议,其实是更适合改造的,毕竟它在协议层面就支持了 RPC 模式;
- 计划支持常用的长连接协议:WebSocket 以及 AMQP 协议支持;
- 长期来看,如果项目越来越多人用的话,会考虑参考 gRPC 的方式,即定一个协议文件,生成相关的 SDK;
- 目前程序可观测性方面仅支持 Prometheus,计划在之后支持 OpenTelemetry;
- 其它语言的版本 Node.js / Java / C++ 等;
总结
目前整个框架还是有许多不足之处,也没有得到广泛的验证,只在之前公司的项目中得到了应用,这也是我将它开源出来的目的,希望物联网开发者可以考虑下这个框架并且提出你的宝贵意见。
最后,如果可以的话,请帮忙点个 Star 吧!