在秒杀等大量并发的情况下,使用消息队列是解决方案之一,要使用消息队列需要安装中间件,比如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka等,如果安装这些也要浪费一定的时间,如果有现成的组件就好了?那就是redis,使用redis可以降低维护成本和实现复杂度。本文介绍如何使用redis实现消息队列。.
概述
Redis是内存中的数据结构存储,可以用作数据库、缓存和消息代理。它支持的数据类型有string(字符串),hash(哈希),list(列表),set(集合)及sortset(有序集合)。Redis实现消息队列有四种方案:
-
基于List的 LPUSH+BRPOP 的实现
-
PUB/SUB,发布/订阅模式
-
基于Sorted-Set的实现
-
基于Stream类型的实现
这四种方案各有优缺点,可以针对不同情况来选择,我们今天主要介绍前二种的实现方式,对其他二种感兴趣的可以查阅相关资料。
实践
一、基于List的的消息队列实现
我们使用控制台应用模拟消息订阅,首先用一个客户端使用list的LPUSH将消息队列放入队列中去,而另一个客户端使用命令取出队列中的消息。 废话太多,这就开干。
.NET(.net core 3.1以上版本)下环境准备
安装通过nuget或者命令模式安装ServiceStack.Redis的组件
install-package ServiceStack.Redis
客户入队消息
我们模拟1000个客户秒杀,不直接进入数据库,先进入队列
//使用Redis的客户端管理器(对象池)
public static IRedisClientsManager redisClientManager = new PooledRedisClientManager(new string[]
{
"10.10.10.59:6379" //从池中获取Redis客户端实例
});
//从池中获取Redis客户端实例
public static IRedisClient redisClient = redisClientManager.GetClient();
static void Main(string[] args)
{
while (true)
{
//模拟客户
var customerid = new Random().Next(1, 1000).ToString();
if(customerid =="1000")//如果完成跳出,这只是举例,具体根据情况而定
break;
redisClient.EnqueueItemOnList("customer",customerid );
}
}
消费队列消息
这里我们定时5秒钟获取一下数据并打印出来,正式环境还需要编写查询等逻辑。
//使用Redis的客户端管理器(对象池)
public static IRedisClientsManager redisClientManager = new PooledRedisClientManager(new string[]
{
"10.10.10.59:6379" //从池中获取Redis客户端实例
});
static void Main(string[] args)
{
IRedisClient redisClient = redisClientManager.GetClient();
Timer t = new Timer((o) =>
{
if (redisClient.GetListCount("abc") > 0)
{
var value = redisClient.DequeueItemFromList("abc");
if (string.IsNullOrWhiteSpace(value))
{
Console.WriteLine("队列中数据不存在!");
}
else
{
Console.WriteLine(value);
//查询是否还有,如果有返回yes,没有返回no
}
}else
Thread.Sleep(500);
//为避免CPU空转,在队列为空时休息0.5秒
}, null, 5000, 5000);//每5秒取一次
}
全部代码编写完成,使用list有很多缺陷,比如只能一次消费、不能做广播模式等。这只是个简单案例,仅供参考。具体异常,日志处理需要各位去完善。
二、订阅/发布模式
这里我们的链接方式与上面相同,就不再重复代码
创建订阅
static void Main(string[] args)
{
string tname = "key";
//创建订阅
RedisPubSubServer pubSubServer = new RedisPubSubServer(redisClientManager, tname)
{
OnMessage = (channel, msg) =>
{
//此处可以写入日志记录
Console.WriteLine($"当前\"{msg}\"任务成功发布成功!");
Console.WriteLine("***********************");
},
OnStart = () =>
{
Console.WriteLine("发布服务已启动,测试发布任务");
},
OnStop = () => { Console.WriteLine("发布服务停止"); },
OnUnSubscribe = channel => { Console.WriteLine(channel); },
OnError = e => { Console.WriteLine(e.Message); },
OnFailover = s => { Console.WriteLine(s); },
};
//开始接收客户的消息
pubSubServer.Start();
while (true)
{
Console.WriteLine("请输入推送内容");
string message = Console.ReadLine();
redisClient.PublishMessage(tname, message);
}
}
订阅客户端
static void Main(string[] args)
{
try
{
using (IRedisClient consumer = redisClientManager.GetClient())
{
Console.WriteLine($"订阅客户端");
var subscription = consumer.CreateSubscription();
//接受到消息时
subscription.OnMessage = (channel, msg) =>
{
if (msg != "CTRL:PULSE")
{//空消息,可以日志记录
}
else
Console.WriteLine(msg);
};
//订阅频道
subscription.OnSubscribe = (channel) =>
{
Console.WriteLine("订阅客户端:开始订阅" + channel);
};
//取消订阅频道时
//subscription.OnUnSubscribe = (a) => { Console.WriteLine("订阅客户端:取消订阅"); };
//订阅频道
string tname = "key";
subscription.SubscribeToChannels(tname);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}