啥是Actor模型
Actor (英语翻译) 这个概念要回溯到面向对象程序设计的本身上来,更偏向于现实世界,现实世界就是由单个个体(人)与其他个体或(人)通讯(消息)组成的现实世界,所以,它的好处是显而易见的,生活就是Actor。
.
现有的面向对象编程模型是基于内存共享线程模型的,而Actor是基于消息模型的,Actor 之间是完全隔离的,不会共享任何变量。
基于内存,那么,内存会溢出,异常,基于消息的话,则没有这种困扰。
又由于它变相的来讲是单线程的,自己处理自己的事务,又实现了与其他业务的隔离。
Erlang语言,天生支持Actor模型,是一个好语言啊。
Actor 的优势
-
1. Actor不共享状态
-
2. 高并发,无锁(无共享状态)
-
3. Actor效率高(同一时刻只处理一个任务,属于单线程处理)
Actor 框架有
-
1. Orleans
-
2. Dapr
-
3. Akka.NET
-
4. Proto.Actor
Actor模型的实现
为啥要实现Actor模型,一个是为了更深入的了解它,一个是想实现一下。
可能在某些地方直接就用了。没必要搞那么复杂引用。
Actor模型的原理
按照自己的理解画了一下。

简单来讲,就是各个服务都可以投递消息到Actor实例里,Actor会从邮箱里把消息取出来,然后,消费掉。这么简单的一件事情。
Actor 代码逻辑的实现
IActor.cs
/// <summary>
/// 无锁并行编程模型(暂时用来处理串行任务,任务串行执行)
/// </summary>
public interface IActor
{
/// <summary>
/// 增加消息
/// </summary>
/// <returns></returns>
bool AddMsg(object message);
/// <summary>
/// 启动服务
/// </summary>
/// <returns></returns>
Task Start();
/// <summary>
/// 停止服务运行,等待毫秒数
/// </summary>
/// <param name="WatingTimeout"></param>
/// <returns></returns>
bool Stop(int WatingTimeout);
}
Actor.cs
/// <summary>
/// Actor抽象
/// </summary>
public abstract class Actor : IDisposable, IActor
{
public Actor(string name)
{
Name = name;
MailBox = new BlockingCollection<object>();
}
/// <summary>
/// 名称
/// </summary>
public string Name { get; set; }
/// <summary>
/// 是否启用
/// </summary>
public bool Active { get; private set; }
/// <summary>
/// 是否长时间运行。长时间运行任务使用独立线程,默认true
/// </summary>
public bool LongRunning { get; set; } = true;
/// <summary>
/// 处理的消息邮箱
/// </summary>
public BlockingCollection<object> MailBox { get; set; }
/// <summary>
/// 内置任务
/// </summary>
private Task _task;
public virtual Task Start()
{
if (Active) return _task;
Active = true;
// 启动异步
if (_task == null)
{
lock (this)
{
if (_task == null)
{
_task = Task.Factory.StartNew(DoActorWork, LongRunning ? TaskCreationOptions.LongRunning : TaskCreationOptions.None);
}
}
}
return _task;
}
public virtual bool Stop(int WatingTimeout = 100)
{
MailBox?.CompleteAdding();
Active = false;
if (WatingTimeout == 0 || _task == null) return true;
return _task.Wait(WatingTimeout);
}
public virtual bool AddMsg(object message)
{
// 自动开始
if (!Active)
{
Start();
}
if (!Active)
{
return false;
}
MailBox.Add(message);
return true;
}
/// <summary>
/// 循环消费消息
/// </summary>
private void DoActorWork()
{
while (!MailBox.IsCompleted)
{
try
{
var ctx = MailBox.Take();
var task = ProcessAsync(ctx);
if (task != null)
{
task.Wait();
}
}
catch (InvalidOperationException) { }
catch (Exception ex)
{
Console.WriteLine($"DoActorWork Error : {ex.Message}");
}
}
Active = false;
}
/// <summary>
/// 处理消息
/// </summary>
/// <returns></returns>
public abstract Task ProcessAsync(object msg);
public void Dispose()
{
try
{
Stop(100);
}
catch (Exception)
{
}
while (MailBox?.TryTake(out _) == true) { }
MailBox = null;
}
}
相关测试模型
AccumulationActor.cs
/// <summary>
/// 累加
/// </summary>
public class AccumulationActor : Actor
{
private int Count = 0;
private IActor actor;
public AccumulationActor(IActor actor) : base(nameof(AccumulationActor))
{
Count = 0;
this.actor = actor;
}
/// <summary>
/// 处理信息
/// </summary>
/// <returns></returns>
public override Task ProcessAsync(object msg)
{
try
{
var msgNumber = (int)(msg);
Count += msgNumber;
Console.WriteLine($"处理{this.Name} :{msg} ,累积总数:{Count}");
if (Count >= 100)
{
this.actor.AddMsg(Count);
Count = 0;
}
}
catch (Exception e)
{
Console.WriteLine($"业务处理异常:{e.Message}");
}
return Task.CompletedTask;
}
}
WriteActor.cs
/// <summary>
/// 输出
/// </summary>
public class WriteActor : Actor
{
public WriteActor() : base(nameof(WriteActor))
{
}
/// <summary>
/// 处理信息
/// </summary>
/// <returns></returns>
public override Task ProcessAsync(object msg)
{
try
{
Console.WriteLine($"输出 {this.Name} :{msg}");
}
catch (Exception e)
{
Console.WriteLine($"业务处理异常:{e.Message}");
}
return Task.CompletedTask;
}
}
测试代码
static void Main(string[] args)
{
Console.Title = "Actor Demo by 蓝创精英团队";
//实现一个加法逻辑
//a累加到100,就发送消息到 b里,让b 输出。
var write = new WriteActor();
var User = new AccumulationActor(write);
for (int i = 0; i < 20; i++)
{
User.AddMsg(i * 30);
}
Thread.Sleep(2000);
write.Stop();
User.Stop();
//释放资源
Console.WriteLine("示例完毕!");
Console.ReadLine();
}
运行结果

总结
上节实现了状态机,这节实现了Actor模型,接下来对Orleans 和 Dapr 的核心原理就了解深入一些了,那么,运用这些技术就不会显的很生涩。
代码地址
https://github.com/kesshei/ActorDemo.git
https://gitee.com/kesshei/ActorDemo.git