.NET 使用redis实现消息队列

在秒杀等大量并发的情况下,使用消息队列是解决方案之一,要使用消息队列需要安装中间件,比如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka等,如果安装这些也要浪费一定的时间,如果有现成的组件就好了?那就是redis,使用redis可以降低维护成本和实现复杂度。本文介绍如何使用redis实现消息队列。.

概述

    Redis是内存中的数据结构存储,可以用作数据库、缓存和消息代理。它支持的数据类型有string(字符串),hash(哈希),list(列表),set(集合)及sortset(有序集合)。Redis实现消息队列有四种方案:

  1. 基于List的 LPUSH+BRPOP 的实现

  2. PUB/SUB,发布/订阅模式

  3. 基于Sorted-Set的实现

  4. 基于Stream类型的实现

这四种方案各有优缺点,可以针对不同情况来选择,我们今天主要介绍前二种的实现方式,对其他二种感兴趣的可以查阅相关资料。

实践

一、基于List的的消息队列实现

       我们使用控制台应用模拟消息订阅,首先用一个客户端使用list的LPUSH将消息队列放入队列中去,而另一个客户端使用命令取出队列中的消息。 废话太多,这就开干。

.NET(.net core 3.1以上版本)下环境准备

安装通过nuget或者命令模式安装ServiceStack.Redis的组件

install-package  ServiceStack.Redis

客户入队消息

我们模拟1000个客户秒杀,不直接进入数据库,先进入队列

         //使用Redis的客户端管理器(对象池)        public static IRedisClientsManager redisClientManager = new PooledRedisClientManager(new string[]        {       "10.10.10.59:6379" //从池中获取Redis客户端实例        });         //从池中获取Redis客户端实例        public static IRedisClient redisClient = redisClientManager.GetClient();        static void  Main(string[] args)        {            while (true)              {                //模拟客户                var customerid = new Random().Next(1, 1000).ToString();                if(customerid =="1000")//如果完成跳出,这只是举例,具体根据情况而定                    break;                 redisClient.EnqueueItemOnList("customer",customerid );               }        }

消费队列消息

这里我们定时5秒钟获取一下数据并打印出来,正式环境还需要编写查询等逻辑。

      //使用Redis的客户端管理器(对象池)        public static IRedisClientsManager redisClientManager = new PooledRedisClientManager(new string[]        {       "10.10.10.59:6379" //从池中获取Redis客户端实例        });      static void Main(string[] args)        {            IRedisClient redisClient = redisClientManager.GetClient();             Timer t = new Timer((o) =>            {                if (redisClient.GetListCount("abc") > 0)                {                    var value = redisClient.DequeueItemFromList("abc");                    if (string.IsNullOrWhiteSpace(value))                    {                        Console.WriteLine("队列中数据不存在!");                    }                    else                    {                        Console.WriteLine(value);                        //查询是否还有,如果有返回yes,没有返回no                    }                }else                    Thread.Sleep(500);                     //为避免CPU空转,在队列为空时休息0.5秒            }, null, 5000, 5000);//每5秒取一次        }

  全部代码编写完成,使用list有很多缺陷,比如只能一次消费、不能做广播模式等。这只是个简单案例,仅供参考。具体异常,日志处理需要各位去完善。

二、订阅/发布模式

    这里我们的链接方式与上面相同,就不再重复代码

创建订阅

        static void  Main(string[] args)        {            string tname = "key";            //创建订阅            RedisPubSubServer pubSubServer = new RedisPubSubServer(redisClientManager, tname)            {                OnMessage = (channel, msg) =>                {                    //此处可以写入日志记录                    Console.WriteLine($"当前\"{msg}\"任务成功发布成功!");                    Console.WriteLine("***********************");                },                OnStart = () =>                {                    Console.WriteLine("发布服务已启动,测试发布任务");                },                OnStop = () => { Console.WriteLine("发布服务停止"); },                OnUnSubscribe = channel => { Console.WriteLine(channel); },                OnError = e => { Console.WriteLine(e.Message); },                OnFailover = s => { Console.WriteLine(s); },            };            //开始接收客户的消息            pubSubServer.Start();              while (true)            {                Console.WriteLine("请输入推送内容");                  string message = Console.ReadLine();                redisClient.PublishMessage(tname, message);            }       }

订阅客户端

static void Main(string[] args)        {            try            {                using (IRedisClient consumer = redisClientManager.GetClient())                {
                    Console.WriteLine($"订阅客户端");                    var subscription = consumer.CreateSubscription();                    //接受到消息时                    subscription.OnMessage = (channel, msg) =>                    {                        if (msg != "CTRL:PULSE")                        {//空消息,可以日志记录                         }                        else                            Console.WriteLine(msg);                    };                    //订阅频道                    subscription.OnSubscribe = (channel) =>                    {                        Console.WriteLine("订阅客户端:开始订阅" + channel);                    };                    //取消订阅频道时                    //subscription.OnUnSubscribe = (a) => { Console.WriteLine("订阅客户端:取消订阅"); };                    //订阅频道                    string tname = "key";                    subscription.SubscribeToChannels(tname);                }            }            catch (Exception ex)            {                Console.WriteLine(ex.Message);            }    }
源代码:
https://pan.baidu.com/s/1f8GuYFj0f27eH8jufJagZA?pwd=0wfg
提取码:0wfg
 
结语
    本文讲述了消息队列在redis中实践的基本概念,以及列举了两种实现方式,它们各有利弊,可以根据环境和实际的需求选择,在实际应用中还需要考虑异常、日志等处理。