C# 实现 Actor并发模型 (案例版)

啥是Actor模型

Actor (英语翻译) 这个概念要回溯到面向对象程序设计的本身上来,更偏向于现实世界,现实世界就是由单个个体(人)与其他个体或(人)通讯(消息)组成的现实世界,所以,它的好处是显而易见的,生活就是Actor。

.

现有的面向对象编程模型是基于内存共享线程模型的,而Actor是基于消息模型的,Actor 之间是完全隔离的,不会共享任何变量。

基于内存,那么,内存会溢出,异常,基于消息的话,则没有这种困扰。

又由于它变相的来讲是单线程的,自己处理自己的事务,又实现了与其他业务的隔离。

Erlang语言,天生支持Actor模型,是一个好语言啊。

Actor 的优势

  1. 1. Actor不共享状态

  2. 2. 高并发,无锁(无共享状态)

  3. 3. Actor效率高(同一时刻只处理一个任务,属于单线程处理)

Actor 框架有

  1. 1. Orleans

  2. 2. Dapr

  3. 3. Akka.NET

  4. 4. Proto.Actor

Actor模型的实现

为啥要实现Actor模型,一个是为了更深入的了解它,一个是想实现一下。

可能在某些地方直接就用了。没必要搞那么复杂引用。

Actor模型的原理

按照自己的理解画了一下。

C# 实现 Actor并发模型 (案例版)

简单来讲,就是各个服务都可以投递消息到Actor实例里,Actor会从邮箱里把消息取出来,然后,消费掉。这么简单的一件事情。

Actor 代码逻辑的实现

IActor.cs

/// <summary>
/// 无锁并行编程模型(暂时用来处理串行任务,任务串行执行)
/// </summary>
public interface IActor
{
    /// <summary>
    /// 增加消息
    /// </summary>
    /// <returns></returns>
    bool AddMsg(object message);
    /// <summary>
    /// 启动服务
    /// </summary>
    /// <returns></returns>
    Task Start();
    /// <summary>
    /// 停止服务运行,等待毫秒数
    /// </summary>
    /// <param name="WatingTimeout"></param>
    /// <returns></returns>
    bool Stop(int WatingTimeout);
}

Actor.cs

/// <summary>
/// Actor抽象
/// </summary>
public abstract class Actor : IDisposable, IActor
{
    public Actor(string name)
    {
        Name = name;
        MailBox = new BlockingCollection<object>();
    }
    /// <summary>
    /// 名称
    /// </summary>
    public string Name { get; set; }
    /// <summary>
    /// 是否启用
    /// </summary>
    public bool Active { get; private set; }
    /// <summary>
    /// 是否长时间运行。长时间运行任务使用独立线程,默认true
    /// </summary>
    public bool LongRunning { get; set; } = true;
    /// <summary>
    /// 处理的消息邮箱
    /// </summary>
    public BlockingCollection<object> MailBox { get; set; }
    /// <summary>
    /// 内置任务
    /// </summary>
    private Task _task;

    public virtual Task Start()
    {
        if (Active) return _task;
        Active = true;
        // 启动异步
        if (_task == null)
        {
            lock (this)
            {
                if (_task == null)
                {
                    _task = Task.Factory.StartNew(DoActorWork, LongRunning ? TaskCreationOptions.LongRunning : TaskCreationOptions.None);
                }
            }
        }
        return _task;
    }

    public virtual bool Stop(int WatingTimeout = 100)
    {
        MailBox?.CompleteAdding();
        Active = false;
        if (WatingTimeout == 0 || _task == null) return true;

        return _task.Wait(WatingTimeout);
    }
    public virtual bool AddMsg(object message)
    {
        // 自动开始
        if (!Active)
        {
            Start();
        }

        if (!Active)
        {
            return false;
        }
        MailBox.Add(message);
        return true;
    }
    /// <summary>
    /// 循环消费消息
    /// </summary>
    private void DoActorWork()
    {
        while (!MailBox.IsCompleted)
        {
            try
            {
                var ctx = MailBox.Take();
                var task = ProcessAsync(ctx);
                if (task != null)
                {
                    task.Wait();
                }
            }
            catch (InvalidOperationException) { }
            catch (Exception ex)
            {
                Console.WriteLine($"DoActorWork Error : {ex.Message}");
            }
        }

        Active = false;
    }
    /// <summary>
    /// 处理消息
    /// </summary>
    /// <returns></returns>
    public abstract Task ProcessAsync(object msg);
    public void Dispose()
    {
        try
        {
            Stop(100);
        }
        catch (Exception)
        {
        }
        while (MailBox?.TryTake(out _) == true) { }
        MailBox = null;
    }
}

相关测试模型

AccumulationActor.cs

/// <summary>
/// 累加
/// </summary>
public class AccumulationActor : Actor
{
    private int Count = 0;
    private IActor actor;
    public AccumulationActor(IActor actor) : base(nameof(AccumulationActor))
    {
        Count = 0;
        this.actor = actor;
    }   
    /// <summary>
    /// 处理信息
    /// </summary>
    /// <returns></returns>
    public override Task ProcessAsync(object msg)
    {
        try
        {
            var  msgNumber = (int)(msg);
            Count += msgNumber;
            Console.WriteLine($"处理{this.Name} :{msg} ,累积总数:{Count}");

            if (Count >= 100)
            {
                this.actor.AddMsg(Count);
                Count = 0;
            }
        }
        catch (Exception e)
        {
            Console.WriteLine($"业务处理异常:{e.Message}");
        }
        return Task.CompletedTask;
    }
}

WriteActor.cs

    /// <summary>
    /// 输出
    /// </summary>
    public class WriteActor : Actor
    {
        public WriteActor() : base(nameof(WriteActor))
        {
        }
        /// <summary>
        /// 处理信息
        /// </summary>
        /// <returns></returns>
        public override Task ProcessAsync(object msg)
        {
            try
            {
                Console.WriteLine($"输出 {this.Name} :{msg}");
            }
            catch (Exception e)
            {
                Console.WriteLine($"业务处理异常:{e.Message}");
            }
            return Task.CompletedTask;
        }
    }

测试代码

static void Main(string[] args)
{
    Console.Title = "Actor Demo by 蓝创精英团队";

    //实现一个加法逻辑
    //a累加到100,就发送消息到 b里,让b 输出。
    var write = new WriteActor();
    var User = new AccumulationActor(write);
    for (int i = 0; i < 20; i++)
    {
        User.AddMsg(i * 30);
    }
    Thread.Sleep(2000);
    write.Stop();
    User.Stop();
    //释放资源
    Console.WriteLine("示例完毕!");
    Console.ReadLine();
}

运行结果

C# 实现 Actor并发模型 (案例版)

总结

上节实现了状态机,这节实现了Actor模型,接下来对Orleans 和 Dapr 的核心原理就了解深入一些了,那么,运用这些技术就不会显的很生涩。

代码地址

https://github.com/kesshei/ActorDemo.git

https://gitee.com/kesshei/ActorDemo.git