问题
我们使用各种系统时候会遇到以下问题:
-
12306上购买火车票如果15分钟内未完成支付则订单自动取消。
-
会议场馆预定座位,如果10分钟内未完成支付则预定自动取消。
-
在指定时间之后,我需要执行一项任务。
我之前做的很多系统,往往都是定期执行一个特定任务。而上诉问题都涉及到滑动窗口时间的定时任务。.
比如:我早上10点20分预定了一张火车票,我需要在15分钟内支付完成,否则订单会被取消。同一时间可能会有成百上千的人预定其他火车票,我需要在每个人的15分钟期限达时候执行检查,如果还未支付则自动取消订单。
方案
我们搞清楚了要解决的问题以后,我们来思考方案。有经验的程序员会立即思考出下面的方案:
-
使用消息队列的延迟投送功能,每个订单添加成功后发送一个延迟15分钟的延迟消息。订单状态处理器15分钟后收到消息,检查支付状态,如果未支付则取消订单。
-
Redis也有类似的功能,原理大致相同。
但我不想使用消息队列的功能,因为延迟消息投送是一种技术实现,我希望用代码反应这种业务实现,所以用纯代码来处理他。(我并不是为了从新发明车轮,因为这是一种业务需求,会有变化扩展的需要,所以决定自己尝试做一下增加经验)
算法思路:
我们的需求定时时间都在15分钟以内,假定都没有超过1个小时的或者几天的。(如果超过1个小时的,可以扩展这个设计,这篇暂时不展开讨论)
我们可考虑将一个小时分成3600秒,每秒代表一个位置来存储所有到期的订单,当下单的时候根据当前时间 加上 15分钟时间间隔,我们就可以得到15分钟以后的时间,将这个订单添加到对应的位置上。
数据结构选择:
我们选择C#中提供的最新的并发字典作为基础数据结构,Key值是3600秒中的每一秒的数值,内容是一个队列用于存放该时间点的所有订单。
public ConcurrentDictionary<int, ConcurrentQueue<IJob>> jobs = new ConcurrentDictionary<int, ConcurrentQueue<IJob>>();
数据结构我们思考好了,其实功能就完成了大半了,代码的设计也就基本定下来了。
代码实现(我喜欢使用控制台应用程序做实验)
1,建立一个定时管理器
public class TimerManager
{
//并发字典存储需要检查的任务(这里可以是订单检查任务,每个任务可以包含一个订单Id)
public ConcurrentDictionary<int, ConcurrentQueue<IJob>> jobs =
new ConcurrentDictionary<int, ConcurrentQueue<IJob>>();
private Timer timer;
public TimerManager()
{
//每间隔1秒钟执行一次。和当前时间同步。
timer = new Timer(ProcessJobs, null, 0, 1000);
}
}
2,增加一个任务到字典
/// <summary>
/// 增加一个任务到时间字典中
/// </summary>
/// <param name="timeKey">根据延迟时间计算出的key值</param>
/// <param name="duetime">毫秒单位</param>
/// <exception cref="NotImplementedException"></exception>
public void AddJob(IJob job, TimeSpan duetime)
{
var key = GetKey(duetime);
ConcurrentQueue<IJob> queue = new ConcurrentQueue<IJob>();
queue.Enqueue(job);
jobs.AddOrUpdate(key, queue, (key, jobs) =>
{
jobs.Enqueue(job);
return jobs;
});
}
3,根据时间计算Key的方法
/// <summary>
/// 根据延迟时间生成当前键值
/// </summary>
/// <param name="duetime"></param>
/// <returns></returns>
private int GetKey(TimeSpan duetime)
{
var currentDateTime = DateTime.Now;
//到期时间
var targetDateTime = currentDateTime.Add(duetime);
//不要忘了把分钟换算成秒,然后在和延迟时间相加就得到Key
var key = targetDateTime.Minute * 60 + targetDateTime.Second;
return key;
}
4,将任务添加到字典
/// <summary>
/// 增加一个任务到时间字典中
/// </summary>
/// <param name="job">需要执行的任务</param>
/// <param name="duetime">多少时间间隔后检查</param>
public void AddJob(IJob job, TimeSpan duetime)
{
var key = GetKey(duetime);
ConcurrentQueue<IJob> queue = new ConcurrentQueue<IJob>();
queue.Enqueue(job);
//这是并发字典的方法,这里就是当Key不存在就增加新的值进去,当Key存在就在Key的队列中增加一个新任务
jobs.AddOrUpdate(key, queue, (key, jobs) =>
{
jobs.Enqueue(job);
return jobs;
});
}
5,计时器每秒执行时处理任务的方法,循环从队列中取出任务直到所有任务处理完毕。
private async void ProcessJobs(object state)
{
//根据当前时间计算Key值
var key = DateTime.Now.Minute * 60 + DateTime.Now.Second;
Console.WriteLine(key);
//查找Key值对应的任务队列并处理。
bool keyExists = jobs.TryGetValue(key, out var jobQueue);
if (keyExists)
{
IJob job;
while(jobQueue.TryDequeue(out job))
{
await job.Run();
}
}
}
6,代码中设计IJob 和Job的一个实现,为了易于理解,这个job没有做太多事情。如果需要扩展去检查订单,可以在这里记录订单Id,创建任务的时候将订单ID和任务关联,这样定时器处理这个任务的时候能找到对应订单了。
public interface IJob
{
Task Run();
}
/// <summary>
/// 代表一个工作
/// </summary>
public class Job : IJob
{
public Guid JobId { get; set; }
public Job()
{
JobId = Guid.NewGuid();
}
public async Task Run()
{
Console.WriteLine(" Job Id: " + JobId.ToString() + " is running.");
await Task.Delay(2000);
Console.WriteLine(" Job Id:" + JobId.ToString() + " have completed.");
}
}
7,主程序Programe中调用定时管理器
using TimerTest;
Console.WriteLine("Hello, World!");
TimerManager timerManager = new TimerManager();
Job job1 = new Job();
// 添加一个任务1分钟后执行
timerManager.AddJob( job1, TimeSpan.FromMinutes(1));
// 在添加另一个任务2分钟后执行
Job job2 = new Job();
timerManager.AddJob(job2, TimeSpan.FromMinutes(2));
Console.ReadLine();
执行结果
结果中可以看到, 任务1 在1014的键值上被处理,1014的键值对应的时间是 16:54 秒,也就是在我运行这个程序1分钟后。
任务 | 添加时间 | 执行时间 |
---|---|---|
第一次任务(计时1分钟) | 15:54 | 16:54 |
第二次任务(计时2分钟) | 15:54 | 17:54 |
任务2 在 1074的键值上被处理,1074对应的时间是 17:54 秒 执行。从上表可以看出程序正常运行得出结果。
总结
这是一个简单的控制台程序验证了这个定时管理器的实现方法,我们将1个小时分成3600秒,每一秒对应一个Key值,在这个值上我们存储需要被处理的任务。在增加任务时候,我们也用同样的算法确定这个Key值。处理的时候根据当前时间计算除Key值进行处理。
这样的话,在真实场景中,我们有3600个Key值可以存储每一秒钟用户提交的所有订单,时间没走过1秒我就处理对应的任务。
后续可以完善的地方
-
我们可以将这个类添加到ASP.NET MVC中,使用依赖注入为单实例生命周期,并发字典和并发队列是线程安全的,所以这里可以放心使用。
-
我们可以扩展Job方法,根据业务逻辑添加更多的信息以便于处理。例如处理订单的ID,或其他什么业务ID。
-
处理任务的方法可以采用多个消费者并发执行,增加处理速度。
-
可以将任务实体存储到数据库,以便于应对突发宕机事故可以快速重建任务。
-
当然我们也可以用Hangfire来轻松实现这个业务。
var jobId = BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7)); //这里改成分钟就好了