在秒杀等大量并发的情况下,使用消息队列是解决方案之一,要使用消息队列需要安装中间件,比如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka等,如果安装这些也要浪费一定的时间,如果有现成的组件就好了?那就是redis,使用redis可以降低维护成本和实现复杂度。本文介绍如何使用redis实现消息队列。.
概述
Redis是内存中的数据结构存储,可以用作数据库、缓存和消息代理。它支持的数据类型有string(字符串),hash(哈希),list(列表),set(集合)及sortset(有序集合)。Redis实现消息队列有四种方案:
-
基于List的 LPUSH+BRPOP 的实现
-
PUB/SUB,发布/订阅模式
-
基于Sorted-Set的实现
-
基于Stream类型的实现
这四种方案各有优缺点,可以针对不同情况来选择,我们今天主要介绍前二种的实现方式,对其他二种感兴趣的可以查阅相关资料。
实践
一、基于List的的消息队列实现
我们使用控制台应用模拟消息订阅,首先用一个客户端使用list的LPUSH将消息队列放入队列中去,而另一个客户端使用命令取出队列中的消息。 废话太多,这就开干。
.NET(.net core 3.1以上版本)下环境准备
安装通过nuget或者命令模式安装ServiceStack.Redis的组件
install-package ServiceStack.Redis客户入队消息
我们模拟1000个客户秒杀,不直接进入数据库,先进入队列
//使用Redis的客户端管理器(对象池)public static IRedisClientsManager redisClientManager = new PooledRedisClientManager(new string[]{"10.10.10.59:6379" //从池中获取Redis客户端实例});//从池中获取Redis客户端实例public static IRedisClient redisClient = redisClientManager.GetClient();static void Main(string[] args){while (true){//模拟客户var customerid = new Random().Next(1, 1000).ToString();if(customerid =="1000")//如果完成跳出,这只是举例,具体根据情况而定break;redisClient.EnqueueItemOnList("customer",customerid );}}
消费队列消息
这里我们定时5秒钟获取一下数据并打印出来,正式环境还需要编写查询等逻辑。
//使用Redis的客户端管理器(对象池)public static IRedisClientsManager redisClientManager = new PooledRedisClientManager(new string[]{"10.10.10.59:6379" //从池中获取Redis客户端实例});static void Main(string[] args){IRedisClient redisClient = redisClientManager.GetClient();Timer t = new Timer((o) =>{if (redisClient.GetListCount("abc") > 0){var value = redisClient.DequeueItemFromList("abc");if (string.IsNullOrWhiteSpace(value)){Console.WriteLine("队列中数据不存在!");}else{Console.WriteLine(value);//查询是否还有,如果有返回yes,没有返回no}}elseThread.Sleep(500);//为避免CPU空转,在队列为空时休息0.5秒}, null, 5000, 5000);//每5秒取一次}
全部代码编写完成,使用list有很多缺陷,比如只能一次消费、不能做广播模式等。这只是个简单案例,仅供参考。具体异常,日志处理需要各位去完善。
二、订阅/发布模式
这里我们的链接方式与上面相同,就不再重复代码
创建订阅
static void Main(string[] args){string tname = "key";//创建订阅RedisPubSubServer pubSubServer = new RedisPubSubServer(redisClientManager, tname){OnMessage = (channel, msg) =>{//此处可以写入日志记录Console.WriteLine($"当前\"{msg}\"任务成功发布成功!");Console.WriteLine("***********************");},OnStart = () =>{Console.WriteLine("发布服务已启动,测试发布任务");},OnStop = () => { Console.WriteLine("发布服务停止"); },OnUnSubscribe = channel => { Console.WriteLine(channel); },OnError = e => { Console.WriteLine(e.Message); },OnFailover = s => { Console.WriteLine(s); },};//开始接收客户的消息pubSubServer.Start();while (true){Console.WriteLine("请输入推送内容");string message = Console.ReadLine();redisClient.PublishMessage(tname, message);}}
订阅客户端
static void Main(string[] args){try{using (IRedisClient consumer = redisClientManager.GetClient()){Console.WriteLine($"订阅客户端");var subscription = consumer.CreateSubscription();//接受到消息时subscription.OnMessage = (channel, msg) =>{if (msg != "CTRL:PULSE"){//空消息,可以日志记录}elseConsole.WriteLine(msg);};//订阅频道subscription.OnSubscribe = (channel) =>{Console.WriteLine("订阅客户端:开始订阅" + channel);};//取消订阅频道时//subscription.OnUnSubscribe = (a) => { Console.WriteLine("订阅客户端:取消订阅"); };//订阅频道string tname = "key";subscription.SubscribeToChannels(tname);}}catch (Exception ex){Console.WriteLine(ex.Message);}}