使用MASA全家桶从零开始搭建IoT平台(四)处理设备上行数据-获取并通知业务系统

前言

物联网平台首先需要可以获取并处理设备上报的MQTT中的数据,我们称这部分为上行数据。
本章将分为四小节。
1、通过MQTT获取设备上报数据并通知业务系统
2、使用时序库存储上行数据
3、使用规则引擎实现告警通知
4、业务系统查询历史数据.

分析

1:因为MQTT协议里面没用服务端和客户端的区别,那么我们可以创建一个IoT Hub的server来接收设备端的数据,它需要和其他设备一样以MQTT客户端的身份接入MQTT,并订阅特定Topic来实现获取设备数据。
2:可以使用第三章介绍的Webhook方式,在设备向特定Topic发布数据时,EMQX会收到Publish数据包,我们可以通过Webhook将数据传递出来。
以上两种方案都可以实现,这里我们使用第一种方式,因为它复杂度相对低一些。

方案

完全基于MQTT的方案
我们与设备约定,设备连接MQTT成功后,消息全部通过名称规则为"topic/{设备名称}"的Topic传递
那么我们可以订阅所有规则为"topic/{设备名称}"的Topic来获取该设备的上行数据。
EMQX中还支持通配符,我们可以通过订阅"topic/+",来获取所有设备的上行数据。关于通配符的更多内容,请参考官方文档

https://www.emqx.io/docs/zh/v5/deploy/cluster/introduction.html#主题树-主题匹配通配符

这个方案可行,但是存在单点故障,如果我们的IoT Hub出现了故障,或者数据流量很大导致Hub出现瓶颈,导致最新上报的数据无法及时有效地被处理,可能会影响业务
我们想到可以分配多个Hub,我们可以在设备发布数据时随机生成1-10个Topic分组,“topic/1/+”…“topic/10/+” ,这样我们可以启动多个Hub,然后每个Hub分别订阅一个主题,这样压力会平分到10个Hub server中,这样做虽然可以减少系统瓶颈的可能,但是如果某一个hub挂了,还是会有一部分设备上报的数据受影响,直到该hub恢复。
如果可以弹性的伸缩Hub订阅,支持负载均衡就好了。

共享订阅

EMQX 支持共享订阅,多个订阅者订阅同一主题,EMQX Broker会按照一定的分发策略将消息发给订阅者,在这个层面上实现订阅者的负载均衡。
共享订阅只需要在订阅时加上前缀即可(发布的时候无需添加)
在原有主题的基础上,添加 $share 前缀即可为一组订阅端启用共享订阅,而且也是支持通配符 “#” 和 “+”。

示例 前缀 真实主题名
$share/topic/1 $share/ topic/1

带群组的共享订阅

以 $share/<group-name> 为前缀的共享订阅是带群组的共享订阅:
group-name 可以为任意字符串,属于同一个群组内部的订阅者将以负载均衡接收消息,但 EMQX 会向不同群组广播消息。
例如,假设订阅者 s1,s2,s3 属于群组 g1,订阅者 s4,s5 属于群组 g2。
那么当 EMQX 向这个主题发布消息 msg1 的时候:
EMQX 会向两个群组 g1 和 g2 同时发送 msg1
s1,s2,s3 中只有一个会收到 msg1
s4,s5 中只有一个会收到 msg1

均衡分发策略

EMQX 支持很多不同的平衡策略(MQTT协议中并没有明确的规范平衡策略)
平衡策略可以在全局或每组中指定。可以在 etc/emqx.conf 文件中修改

  1. 1.全局策略可以在 broker.shared_subscription_strategy 配置中设置。

    2.配置 broker.shared_subscription_group.$group_name.strategy 为每组策略。

均衡策略 描述
random 在所有订阅者中随机选择
round_robin 按照订阅顺序选择
round_robin_per_group 在每个共享订阅组中按照订阅顺序进行选择
local 随机在本地订阅中进行选择,如无法找到,则在集群范围内随机选择
sticky 选定订阅者后,始终向其进行发送,直到该订阅者断开连接
hash_clientid 通过对发送者的客户端 ID 进行 Hash 处理来选择订阅者
hash_topic 通过对源主题进行 Hash 处理来选择订阅者

使用共享订阅的方式,可以根据数据量动态的添加和减少Hub数量,不存在单点故障,扩展性好。

数据格式

上行数据分两部分,Payload和Metadata
Metadata部分,我们在MQTTX中我们可以通过点击Meta来添加

使用MASA全家桶从零开始搭建IoT平台(四)处理设备上行数据-获取并通知业务系统

  • 1、Payload指消息所携带的数据本身,多数情况为传感器数据,可以是任何格式,例如二进制或者JSON字符串,格式是业务和嵌入式之间约定的,Hub部分不解析Payload只是将它传递到业务系统
  • 2、Metadata相当于我们http请求中的head数据,可以添加发布者的身份信息或者消息的唯一ID等等。据本人了解,貌似不是所有的MQTT平台都支持设备端自定义Metadata,或者说很多通讯模组已经配置好了Metadata,不支持自定义,所以这里我们暂时不在Metadata中配置任何信息。

实施流程

  1. 我们在MASA.IoT.Hub项目中创建一个服务HostedService
  2. 以mqtt client的形式连接EMQX的mqtt,并订阅所有设备的Topic
  3. 将消息原封不动的封装,并发送给业务系统(MASA.IoT.Core项目中的api接口)

编写代码

我们首选在MASA.IoT.Common项目中安装 MQTTnet nuget包,方便我们连接mqtt
并创建MQTTHelper类
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;

namespace MASA.IoT.Common.Helper
{
    public class MqttHelper
    {
        private MqttFactory _mqttFactory;
        private IMqttClient _mqttClient;
        private MqttClientOptions _mqttClientOptions;
        private MqttClientSubscribeOptions _mqttClientSubscribeOptions;

        public MqttHelper(string mqttUrl, string clientID, string userName, string passWord)
        {
            _mqttFactory = new MqttFactory();
            _mqttClient = _mqttFactory.CreateMqttClient();
            _mqttClientOptions = new MqttClientOptionsBuilder()
                                  .WithTcpServer(mqttUrl)
              .WithCredentials(userName, passWord).WithProtocolVersion(MqttProtocolVersion.V500).Build();

            _mqttClientOptions.ClientId = clientID;
        }

        /// <summary>
        /// 连接并订阅Topic
        /// </summary>
        /// <param name="callback"></param>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task ConnectClient(Func<MqttApplicationMessageReceivedEventArgs, Task> callback, string topic) 
        {
            _mqttClientSubscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder()
                .WithTopicFilter(f => { f.WithTopic(topic); })
                .Build();

            var response = await _mqttClient.ConnectAsync(_mqttClientOptions, CancellationToken.None);
            if (response.ResultCode == MqttClientConnectResultCode.Success)
            {
                Console.WriteLine($"The MQTT client with topic:{topic} is connected.");
                await Task.Delay(500);
                _mqttClient.ApplicationMessageReceivedAsync += callback;
                await _mqttClient.SubscribeAsync(_mqttClientSubscribeOptions, CancellationToken.None);
            }
        }

        /// <summary>
        /// 断开连接
        /// </summary>
        /// <returns></returns>
        public async Task Disconnect_Client() 
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.DisconnectAsync();

                Console.WriteLine("The MQTT client is Disconnected.");
            }
        }
    }
}

在ConnectClient方法中,连接mqtt的同时会订阅特定topic,然后获取到订阅的消息时调用Func<MqttApplicationMessageReceivedEventArgs, Task> 类型的callback,消息的内容(Payload)通过MqttApplicationMessageReceivedEventArgs中的ApplicationMessage.Payload获取。
我们在MASA.IoT.Hub项目中新建MQHostedService后台任务
using Dapr.Client;
using MASA.IoT.Common;
using MASA.IoT.Common.Helper;
using Microsoft.Extensions.Options;
using MQTTnet.Client;

namespace MASA.IoT.Hub;

public class MQHostedService : IHostedService
{
    private readonly HubAppSettings _appSettings;
    private readonly DaprClient _daprClient;
    public MQHostedService(IOptions<HubAppSettings> appSettings)
    {
        _daprClient = new DaprClientBuilder().Build();
        _appSettings = appSettings.Value;
    }

    /// <summary>
    /// 开始                
    /// </summary>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var mqttHelper = new MqttHelper(_appSettings.MqttSetting.MqttUrl, "IoTHub", _appSettings.MqttSetting.UserName, _appSettings.MqttSetting.Password);
 
        await mqttHelper.ConnectClient(CallbackAsync, _appSettings.MqttSetting.Topic);

    }
    private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);
        
        Console.WriteLine(deviceDataPointStr);
        var pubSubOptions = new PubSubOptions
        {
            DeviceName = e.ApplicationMessage.Topic[6..],
            Msg = deviceDataPointStr,
            PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
            TrackId = Guid.NewGuid()
        };                            
        try
        {
            await _daprClient.PublishEventAsync("pubsub", "DeviceMessage", pubSubOptions);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

    }
    /// <summary>
    /// 结束
    /// </summary>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}

appsettings.json内容
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "MqttSetting": {
    "MqttUrl": "192.120.5.204",
    "UserName": "IoTHub",
    "Password": "123456",
    "Topic": "$share/IotHub/topic/+"
  },
  "DaprOptions": {
    "AppId": "masa-iot-service-hub",
    "AppPort": 18001,
    "AppIdSuffix": "",
    "DaprGrpcPort": 20241,
    "DaprHttpPort": 20242
  }
}

后台任务以ClientID=“IoTHub"的mqtt 客户端形式接入,用户名和密码是在Emqx配置好的一个普通设备的权限。Topic为”$share/IotHub/topic/+",通过通配符+订阅所有设备上行数据的Topic。
数据以PubSubOptions形式包装,并通过_daprClient.PublishEventAsync通过dapr的方式发送到业务的接口上,
最后不要忘记在Program.cs中注入后台服务
builder.Services.AddHostedService<MQHostedService>();

这部分不了解的可以参考第一章的Dapr搭建教程

我们继续在MASA.IoT.Core项目中添加控制器DeviceMqttController.cs
using Dapr;
using MASA.IoT.Common;
using Microsoft.AspNetCore.Mvc;

namespace MASA.IoT.Core.Controllers
{

    [Route("api/[controller]")]
    [ApiController]
    public class DeviceMqttController : ControllerBase
    {
        [Topic("pubsub", "DeviceMessage")]
        [HttpPost("DeviceMessage")]
        public void DeviceMessageAsync([FromBody] PubSubOptions pubSubOptions)
        {
            Console.WriteLine($"Subscriber received, DeviceName:{pubSubOptions.DeviceName},Msg:{pubSubOptions.Msg}");
        }
    }
}
这样Hub获取到订阅数据后就可以调用该接口将数据通知到业务系统。
这里演示只是简单的打印出消息数据。
验证效果
我们同时启动MASA.IoT.Hub和MASA.IoT.Core项目,并启动MQTTX模拟设备连接MQTT,我们可以在EMQX后台看到连接的设备和订阅的情况

使用MASA全家桶从零开始搭建IoT平台(四)处理设备上行数据-获取并通知业务系统

使用MASA全家桶从零开始搭建IoT平台(四)处理设备上行数据-获取并通知业务系统

我们尝试发布一条数据,可以看到MASA.IoT.Core项目的接口成功的获取了数据

使用MASA全家桶从零开始搭建IoT平台(四)处理设备上行数据-获取并通知业务系统

这里发现打印的payload中没有时间戳,这是个很严重的问题,因为大部分设备是无法获取时间的,也就没有办法将时间发布到MQTT,所以需要在mqtt上接收到设备发布的消息之后,自动添加一个时间戳,其实EMQX已经自动添加了时间戳,但是MQTTNet中我没有找到方法可以直接拿到。
这部分内容我们下一节实现。
总结
本文以基于共享订阅的方式来实现获取设备上行数据,
真实的场景我们还需要处理重复数据,关于消息的去重需要先了解 MQTT QoS 0, 1, 2
篇幅问题本文不再进行扩展,可以参考下面的文章

https://www.emqx.com/zh/blog/introduction-to-mqtt-qos

完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos