基于ASP.NET Core SignalR的流式传输

SignalR概述

SignalR是ASP.NET Core下非常流行的实现Web实时功能的库。微软再文档中列出了适合的应用场景:

适合 SignalR 的候选项:

  • 需要从服务器进行高频率更新的应用。示例包括游戏、社交网络、投票、拍卖、地图和 GPS 应用。

  • 仪表板和监视应用。示例包括公司仪表板、即时销售更新或旅行警报。

  • 协作应用。协作应用的示例包括白板应用和团队会议软件。

  • 需要通知的应用。社交网络、电子邮件、聊天、游戏、旅行警报和很多其他应用都需使用通知。

 其实只要适合使用Ajax的场景都能使用,他比WebSockets更高级,实现了断线重连,广播,分组等功能。.

流式传输

在介绍SingalR流式处理之前,我想先介绍一下流式处理的基本概念,

一提到流式传输,很多人往往感到比较棘手,那是因为可能用的比较少,通常我们习惯了准备数据然后一口气处理数据的编程范式,而不习惯一个个处理数据的范式。

流式处理就像一个水管,一头进水,一头出水。

例如C#中流式处理一个文本文件,我们是一次读取一行并处理他,而不是一口气读取文件中的所有行并处理他。

using (StreamReader sr = new StreamReader("TestFile.txt"))
{
string line;
// Read and display lines from the file until the end of
// the file is reached.
while ((line = sr.ReadLine()) != null)
{
   Console.WriteLine(line);
}
}

流式处理的好处就是数据的一部分准备好了,就可以对他立即进行处理,在内存中每次仅保留需要处理的那部分数据,这会大大优化内存的使用。

流式梳理非常适合处理大型数据集,例如文件读取,网络数据下载,IoT的数据传输,遍历并逐步处理数据库数据。

IEnumerable<T>

有必要再对这个接口重新做一个简要说明,枚举(称呼枚举或者迭代器,个人认为迭代器更合适)就是一个个列举。他是一种序列的概念,例如 1 2 3 4 5等,每列举一个我就可以处理一个。C#中的foreach就是用于处理迭代器的语法糖:

//枚举每一个Item
foreach(var item in GetNumbers())
{
//处理每个item
}

IEnumerable<int> GetNumbers()
{
int i=0;
while(i<10)
{
yield return i++;
}
}

//作为比对我列举一个常见的编程一口气准备好的编程范式。
List<int> GetNumbers()
{
   List<int> numbers = new List<int>(); //数组会分配内存,如果数据量很大,分配的内存会非常高。
   int i=0;
   while(i<10)
  {
       numbers.add(i);
  }
   return numbers;
}

在C# 8.0以前,这个接口都是同步的,也就是说产生序列的方法会阻塞调用序列的方法。在异步async await 大行其道的今天,显然没有异步版本的迭代器实现是一个巨大的缺陷。(在这一以前可以用使用ValueTask<T>)

IAsyncEnumerable<T>

C# 8.0引入了这个接口,官网文档将它称为异步流。这个接口其实就是为了统一async await IEnumerable<T> . 微软在背后做了大量的工作,具体实现细节大家可以参考Async streams - C# 8.0 specification proposals | Microsoft Docs

我们来使用这个接口返回一个异步的迭代器:

public async IAsyncEnumerable<Data> GetData([EnumeratorCancellation] CancellationToken cancellationToken)
{
   var data = await _respository.GetDataAsync(cancellationToken);//为了演示,假设从数据库异步读取数据。
foreach (var item in data)
{
yield return item;
}
}

SignalR的流式传输

SignalR的流式传输使用了IAsyncEnumerable<T>接口,我直接引用微软的说法:

ASP.NET Core SignalR支持从客户端到服务器以及从服务器到客户端的流式传输。这适用于数据片段随着时间的推移而发生的情况。流式传输时,每个片段一旦变为可用,就会发送到客户端或服务器,而不是等待所有数据都可用。

当集线器方法返回 IAsyncEnumerable ChannelReader Task<IAsyncEnumerable<T>> Task<ChannelReader<T>> 时它会自动成为流式处理中心方法

public class AsyncEnumerableHub : Hub
{
   public async IAsyncEnumerable<int> Counter(
       int count,
       int delay,
      [EnumeratorCancellation]
       CancellationToken cancellationToken)
  {
       for (var i = 0; i < count; i++)
      {
           // Check the cancellation token regularly so that the server will stop
           // producing items if the client disconnects.
           cancellationToken.ThrowIfCancellationRequested();

           yield return i;

           // Use the cancellationToken in other APIs that accept cancellation
           // tokens so the cancellation can flow down to them.
           await Task.Delay(delay, cancellationToken);
      }
  }
}

代码很清楚的说明了这一做法。ChannelReader是另一种实现方法,这篇不做讲解。

做一个例子来实现一下

当下非常流行数据监控大屏应用。监控交通状况,股票行情,企业生产数据看板,IOT传感器实时数据显示,实时销售数据分析等等。

我想做一个简单的数据更新大屏的例子(一切为了简单,只用内存中的数据来存放数据),原谅我UI能比比较差,这篇不展示UI上的东西,而只展示数据如何以流的方式发送。

第一步 准备一个类用于处理数据更新

这个类使用一个定时器来定期更新数据,并记录一个数据是否更新的状态以便于只在数据更新的时候才发送数据。同时使用一个并发字典记录待更新的数据。

public class DataTicker
  {
       private Timer _timer;
       private volatile bool isUpdated  = false;//标记数据是否更新
       private readonly ConcurrentDictionary<string, Data> _datas = new ConcurrentDictionary<string, Data>();
       public DataTicker()
      {
           InitData();
           StartUpdateData();
      }
       public bool IsUpdated
      {
           get { return isUpdated; }
      }
       public async Task<ICollection<Data>> GetData()
      {
           if (IsUpdated) //如果数据已更新才发送数据
          {
               await Task.Delay(500); //模拟返回数据的延迟。这里好的做法是异步的方式返回数据。
               return _datas.Values;
          }
           return null;
      }
       public void StartUpdateData()
      {
           _timer = new Timer(UpdateDate, null, TimeSpan.FromMilliseconds(3000), TimeSpan.FromMilliseconds(3000));
      }
       private void InitData()
      {
           for(int i=0;i<10;i++)
          {
               Data data = new Data();
               data.Id = i;
               data.Name = i.ToString();
               data.UpdatedDate = DateTime.Now;
               _datas.TryAdd(i.ToString(), data);
          }
      }
       public void UpdateDate(object state)
      {
var newData =
           foreach (var item in _datas.Values)
          {
               item.UpdatedDate = System.DateTime.Now;

          }
           this.isUpdated = true;
      }
       public void MarkAsRead()
      {
           this.isUpdated = false;
      }
  }

第二步 编写SignalR中心异步流方法

public class DataUpdateHub : Hub
  {     
       private DataTicker _ticker;

       public DataUpdateHub(DataTicker stockTicker)
      {
           this._ticker = stockTicker;
      }

       //这个方法没有使用cancellationToken,好的做法是从外部传递cancellationToken
       public async IAsyncEnumerable<Data> GetData([EnumeratorCancellation] CancellationToken cancellationToken)
      {
           while (true)
          {
               var data = await _ticker.GetData(); //当有数据更新的时候方法会返回数据,否则返回空。
               if (data != null)
              {
                   foreach (var item in data)
                  {
                       yield return item;
                  }
                   _ticker.MarkAsRead(); //发送完毕后,将标记数据标记为已发送。
              }
          }
      }
  }

第三步 Javascript客户端调用代码

connection.start().then(function () {
   connection.stream("GetData") //连接到hub的异步流方法,当有数据从中心流出时候,next方法会被调用。因为本例中心使用wihle(true)方法,数据流会一直发送,所以complete方法将不会被调用。如果中心方法返回有限数据结束后例如使用foreach,则会调用complete方法。
      .subscribe({
           next: (item) => {
               var li = document.createElement("li");
               li.textContent = "ID:" + item.id + "Datetime:" + item.updatedDate;
               document.getElementById("messagesList").appendChild(li);
          },
           complete: () => {
               var li = document.createElement("li");
               li.textContent = "Stream completed";
               document.getElementById("messagesList").appendChild(li);
          },
           error: (err) => {
               var li = document.createElement("li");
               li.textContent = err;
               document.getElementById("messagesList").appendChild(li);
          },
      });
}).catch(function (err) {
   return console.error(err.toString());
});

第四步 配置DataTicker依赖注入

builder.Services.AddSignalR();
builder.Services.AddSingleton<DataTicker>();

注意:这里的DataTicker注册的是单例的,也就是说所有发送给客户端的数据是共享的。既然是共享的,也就存在并发访问的问题,这就是为什么使用volatile关键字和并发字典的原因。

总结

ASP.NET SignalR可以很方便的使用异步流传输数据,这样客户端和服务器端是使用流的方式连接在一起的。

本文使用的是从服务器到客户端的流,当然你可以使用从客户端到服务器端的流。从.Net和Java的客户端都可以调用SignalR的中心流方法。具体可以参考微软官方文档,使用 ASP.NET Core 中的流式处理SignalR | Microsoft Docs