搞懂事件——C# 的event的机制深度理解

1. 前言

搞懂事件——C# 的event的机制深度理解
为什么忽然对Event感兴趣了?

因为进入Web时代以后,很少使用它了,忽然想起这个知识点,感到非常的熟悉而陌生。

不知道你有没有类似的感觉:好像对某个点大脑很熟悉,而手又很陌生?就像多年未曾谋面的老朋友,一个瞬间涌入大脑很多往日嬉戏打闹的碎片,但念及当先,却又音信全无。.

那么,你有没有事件相关的疑惑呢?譬如:

  • Event 是同步还是异步执行的?

  • 如果是多个订阅,事件执行的顺序是什么?

  • 如果事件执行中发生异常,会发生什么事情?

  • 事件支持异步执行吗?

  • 事件触发后,跨进程可以触发到吗?

  • 事件总线/领域事件 为啥不使用event实现呢?

如果你也有类似的疑惑,那么不妨和我探究一番。
搞懂事件——C# 的event的机制深度理解

2. 定义和特性

事件作为类的成员,一般是通过事件向其他类或对象通知发生的相关事情。 发送事件的类称为发布者,接收事件的类称为订阅者。

在典型的C# Windows 窗体或Web应用程序中,可订阅由按钮和列表框等控件引发的事件。 这也是我们觉得熟悉的原因吧?

MSDN中对事件有如下界定

  • 发布者确定何时引发事件;订阅者确定对事件作出何种响应

  • 一个事件可以有多个订阅者。 订阅者可以处理来自多个发行者的多个事件。

  • 没有订阅者的事件永远也不会引发。

  • 事件通常用于表示用户操作,例如单击按钮或图形用户界面中的菜单选项。

  • 当事件具有多个订阅者时,引发该事件时会同步调用事件处理程序。 若要异步调用事件,请参阅 “使用异步方式调用同步方法”。

  • 在 .NET 类库中,事件基于 EventHandler 委托和 EventArgs 基类。

从上述定义中,应该能解决是否同步的问题吧?这里再明确的说下:

  • 事件是同步的, 因此触发事件时,会被阻塞(如果订阅事件不是异步的)!

  • 如果是多个订阅,他们会一个个串行执行。

  • 如果其中一个订阅者抛出异常,尚未执行的订阅者将不会执行。

哎,什么,看的更糊涂了?

是啊,怎么又引入了订阅事件可以是异步的概念?

别急,都是理论会让人蒙的,我们来看看正常的例子吧!

3. 关于事件的一个小例子

以下例子均在**.net6** 环境下进行,望知悉。

namespace Event1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var instance  = new Demo();
            instance.DemoEvent += (sender, args) =>
            {
                Console.WriteLine("执行事件1!");                
            };

            instance.DemoEvent += (sender, args) =>
            {
                Console.WriteLine("执行事件2!");
            };
            Console.WriteLine("*开始发起事件!");
            instance.Raise();
            Console.WriteLine("*事件执行完毕,继续下一项工作!");
        }
    }

    public class Demo
    {
        public event EventHandler DemoEvent;
        public void Raise()
        {
            try
            {
                this.DemoEvent?.Invoke(this, EventArgs.Empty);
                Console.WriteLine("所有的事件处理已经被执行!");
            }
            catch(Exception ex)
            {
                Console.WriteLine("事件处理中发生异常!", ex.Message);
            }
        }
    }

}

这里我们先定义一个Demo类,其内部有个事件是 DemoEvent,我们给他开放了一个接口Raise,如果谁敢调用它,那么,它就触发报警事件DemoEvent

这里模拟了2个订阅者,分别处理报警事件DemoEvent
程序执行的结果是什么呢?你有没有猜对?

*开始发起事件!
执行事件1!
执行事件2!
所有的事件处理已经被执行!
*事件执行完毕,继续下一项工作!

嗯,多次运行,仍然是此结果! 可见事件的确是顺序执行的,并且其为同步执行。

现在,我们加入异常,再看看:

instance.DemoEvent += (sender, args) =>
            {
                Console.WriteLine("执行事件1!");
                throw new Exception("执行事件1,错误");
            };
 
 ## 结果如下 ##
 *开始发起事件!
执行事件1!
事件处理中发生异常!
*事件执行完毕,继续下一项工作!           

可见,如果你想让每个订阅者都可以好好执行处理的话,那每个订阅者在订阅程序内,必须自己处理好程序异常,不要抛出来哦!

另外,注意一点,如果程序需要保持稳健,那么你还需要考虑取消订阅,以便防止内存泄漏哦!

instance.DemoEvent += Instance_DemoEvent;
instance.DemoEvent-= Instance_DemoEvent;

4. 事件的异步处理

上面提到的有关事件的异步处理,这又是一个什么鬼东西呢?

先说明下,这里的例子都是基于.net6的非UI编程,有关UI处理按钮点击事件等,机制并不一样,它们的内部做了处理,因此ui处理异步事件的方式与这里并不同。UI为它的异步事件提供了一个SynchronizationContext,使它们能够在UI线程上恢复。它从不“等待”事件。切记切记。

我们在订阅者1前面再增加一个异步订阅者

      instance.DemoEvent += async (sender, args) =>
      {
          Console.WriteLine("执行事件1开始??");
          await Task.Delay(10);
          Console.WriteLine("执行事件1结束??");
      };
// 为了等待这个慢家伙,需要在事件执行完毕的后面增加一行代码,让主程序等会退出。
Console.ReadLine();

现在执行以下,看看谁是第一呢?

*开始发起事件!
执行事件1开始??
执行事件1!
执行事件2!
所有的事件处理已经被执行!
*事件执行完毕,继续下一项工作!
执行事件1结束??

是的,你没看错,新增加的异步事件处理,的确是第一个被触发的,只不过它没有阻塞处理进程。

一个小知识点, 我们以前都不推崇定义一个类似的async void xxxx(){}函数,因为这样的函数无法被主程序捕获结果或异常。 但凡是总有例外,而这个异步事件处理恰恰就是这个函数的最佳使用场景。

5. 等待所有异步处理订阅者

眼尖的朋友们,应该在上面的结果输出中,看到了一个不和谐的信息。
没有看到的朋友,该去检查眼睛了。

所有的事件处理已经被执行!
*事件执行完毕,继续下一项工作!
执行事件1结束??

这个异步事件还没执行完,就打印出来了所有的事件处理已经被执行!的信息了。

嗯,是有这种应用场景,反正只要触发了处理就行,什么时候处理完,那都是订阅者的事情。

当然,也有一种场景,是需要等待所有的订阅者处理完消息,有异步的,也有同步的。

如果是后一种情景的话,那么我们还有什么办法呢?

这就涉及到async和await内部机制的问题了,有关知识,在这里不赘述。这里只将实现思路。

我们需要引入 SynchronizationContext的内容,自定义一个继承类,来实现相关操作。

如下,我们先实现一个天真无邪的同步上下文类。

public class NaiveSynchronizationContext :  SynchronizationContext
    {
        private readonly Action completed;

        public NaiveSynchronizationContext( Action completed)
        {
            this.completed = completed;
        }

        public override SynchronizationContext CreateCopy()
        {
            return new NaiveSynchronizationContext(this.completed);
        }

        public override void OperationStarted()
        {
            Console.WriteLine("同步上下文: 开始");
        }

        public override void OperationCompleted()
        {
            Console.WriteLine("同步上下文: 完成");
            this.completed();
        }
    }

为了方便使用,我们再定义一个扩展函数

public static class NaiveExtension
    {
        public static Task NaiveRaiseAsync( this EventHandler @this, object sender, EventArgs eventArgs)
        {
            // 如果没有事件处理,那么立即结束
            if (@this == null)
            {
                return Task.CompletedTask;
            }
            var tcs = new TaskCompletionSource<bool>();
            var sc = new NaiveSynchronizationContext(() => tcs.SetResult(true));

            SynchronizationContext.SetSynchronizationContext(sc);

            @this.Invoke(sender, eventArgs);

            return tcs.Task;
        }
    }

真正的使用,需要修改Raise函数,让事件的触发处在我们自定义的同步上下文内。

public void Raise()
        {
            try
            {
                this.DemoEvent?.NaiveRaiseAsync(this,EventArgs.Empty).GetAwaiter().GetResult();
                //this.DemoEvent?.Invoke(this, EventArgs.Empty);
                Console.WriteLine("所有的事件处理已经被执行!");
            }
            catch(Exception ex)
            {
                Console.WriteLine("事件处理中发生异常!", ex.Message);
            }
        }

嗯,感觉大功告成了,我们再来看看结果:

*开始发起事件!
同步上下文: 开始
执行事件1开始??
执行事件1!
执行事件2!
执行事件1结束??
同步上下文: 完成
所有的事件处理已经被执行!
*事件执行完毕,继续下一项工作!

哈哈,开心啊,搞定了这个小小需求!
来,多加几个异步处理试试!

什么,有异常了?

System.InvalidOperationException:
“An attempt was made to transition a task to a final state when it had already completed.”

仔细阅读异常,原来是TaskCompletionSource.SetResult被太早的调用所致,正确的做法需要等待所有异步处理完成后,再进行调用,当然需要修正下同步和异步的不同之处。
那就好办了,我们引入Interlocked类,计算跟踪每个异步处理就可以了。

public static Task NaiveRaiseAsync( this EventHandler @this, object sender, EventArgs eventArgs)
        {
            // 如果没有事件处理,那么立即结束
            if (@this == null)
            {
                return Task.CompletedTask;
            }
            var delegates = @this.GetInvocationList();
            var count = delegates.Length;

            var tcs = new TaskCompletionSource<bool>();
            foreach (var @delegate in @this.GetInvocationList())
            {                
                // 检查AsyncStateMachineAttribute属性,判断是否异步处理函数
                var async = @delegate.Method.GetCustomAttributes(typeof(AsyncStateMachineAttribute),false).Any();

                // 定义 'completed' action
                var completed = new Action(() =>
                {
                    if (Interlocked.Decrement(ref count) == 0)
                    {
                        tcs.SetResult(true);
                    }
                });

                if (async)
                {
                    SynchronizationContext.SetSynchronizationContext(new NaiveSynchronizationContext(completed));
                }

                @delegate.DynamicInvoke(sender, eventArgs);

                if (!async)
                {
                    // 如果不是异步,手工调用完成
                    completed();
                }
            }
            return tcs.Task;
        }

再次执行结果,啊哈哈,看吧:

*开始发起事件!
同步上下文: 开始
执行事件1开始??
同步上下文: 开始
执行事件2开始??
执行事件1!
执行事件2!
执行事件2结束??
执行事件1结束??
同步上下文: 完成
同步上下文: 完成
所有的事件处理已经被执行!
*事件执行完毕,继续下一项工作!

6. 捕获异常处理中的异常

上面的处理已经非常好了,当然,我说的是正常逻辑,那么当我们在异常处理中引入异常,会发生什么呢?

说干就干,是我的风格,来吧,让暴风雨来的更猛烈些吧!

instance.DemoEvent += async (sender, args) =>
   {
       Console.WriteLine("执行事件1开始??");
       throw new InvalidOperationException("Sabotage!");
       await Task.Delay(10);
       Console.WriteLine("执行事件1结束??");
   };

虽然我们在触发事件时增加了异常捕获,但好像捕获了个寂寞!

*开始发起事件!
同步上下文: 开始
执行事件1开始??
同步上下文: 完成
同步上下文: 开始
Unhandled exception. 执行事件2开始??
执行事件1!
执行事件2!
执行事件2结束??
同步上下文: 完成
所有的事件处理已经被执行!
*事件执行完毕,继续下一项工作!
System.InvalidOperationException: Sabotage!

程序直接退出了。

这里的原因是:

在基本synchronnizationcontext类中,Send和Post方法是使用应用程序ThreadPool实现的。因此,在事件处理程序中抛出的异常,实际上在打印上述消息的ThreadPool线程中抛出。

那么我们试着重载 Post和Send看看。

public class NaiveSynchronizationContext : SynchronizationContext
    {
        private readonly Action completed;


        private readonly Action<Exception> failed;

        public NaiveSynchronizationContext(Action completed, Action<Exception> failed)
        {
            this.completed = completed;
            this.failed = failed;
        }       
        public override void Post(SendOrPostCallback d, object state)
        {
            if (state is ExceptionDispatchInfo edi)
            {
                Console.WriteLine("正捕获异常");
                this.failed(edi.SourceException);
            }
            else
            {
                Console.WriteLine("Posting");
                base.Post(d, state);
            }
        }

        public override void Send( SendOrPostCallback d, object state)
        {
            if (state is ExceptionDispatchInfo edi)
            {
                Console.WriteLine("正捕获异常");
                this.failed(edi.SourceException);
            }
            else
            {
                Console.WriteLine("Sending");
                base.Send(d, state);
            }
        }        

        public override SynchronizationContext CreateCopy()
        {
            return new NaiveSynchronizationContext(this.completed,this.failed);
        }

        public override void OperationStarted()
        {
            Console.WriteLine("同步上下文: 开始");
        }

        public override void OperationCompleted()
        {
            Console.WriteLine("同步上下文: 完成");
            this.completed();
        }
    }
    public static class NaiveExtension
    {
        public static Task NaiveRaiseAsync( this EventHandler @this, object sender, EventArgs eventArgs)
        {
            // 如果没有事件处理,那么立即结束
            if (@this == null)
            {
                return Task.CompletedTask;
            }
            var delegates = @this.GetInvocationList();
            var count = delegates.Length;

            var tcs = new TaskCompletionSource<bool>();
            var exception = (Exception)null;
            foreach (var @delegate in @this.GetInvocationList())
            {                
                // 检查AsyncStateMachineAttribute属性,判断是否异步处理函数
                var async = @delegate.Method.GetCustomAttributes(typeof(AsyncStateMachineAttribute),false).Any();

                // 定义 'completed' action
                var completed = new Action(() =>
                {
                    if (Interlocked.Decrement(ref count) == 0)
                    {
                        if (exception is null)
                        {
                            tcs.SetResult(true);
                        }
                        else
                        {
                            tcs.SetException(exception);
                        }
                    }
                });
                var failed = new Action<Exception>(e =>
                {
                    Interlocked.CompareExchange( ref exception, e, null);
                });
                if (async)
                {
                    SynchronizationContext.SetSynchronizationContext(new NaiveSynchronizationContext(completed, failed));
                }

                ry
                {
                    @delegate.DynamicInvoke(sender, eventArgs);
                }
                catch (TargetInvocationException e) 
  when (e.InnerException != null)
{
  failed(e.InnerException);
}
catch (Exception e)
{
  failed(e);
}

                if (!async)
                {
                    // 如果不是异步,手工调用完成
                    completed();
                }
            }
            return tcs.Task;
        }
    }

再次执行,看看是怎么样的?

*开始发起事件!
同步上下文: 开始
执行事件1开始??
正捕获异常
同步上下文: 完成
同步上下文: 开始
执行事件2开始??
执行事件1!
执行事件2!
Posting
执行事件2结束??
同步上下文: 完成
事件处理中发生异常!
*事件执行完毕,继续下一项工作!

正如你看到的,这里的实现剔除了短路行为,即使你的某个处理函数有异常,它依然可以向下分发事件。

7. 事件总线和领域事件

领域事件是领域专家所关心的(需要跟踪的、希望被通知的、会引起其他模型对象改变状态的)发生在领域中的一些事情。 简而言之,领域事件是用来捕获领域中发生的具有业务价值的一些事情。 它的本质就是事件,如果不牵涉到微服务和存储事件,我觉得你可以考虑使用event来实现它。

不过,由于大部分的领域事件可能都需要考虑存储或者跨服务的行为,因此我们很少看见类似的实现。

而事件总线总是和分布式应用或微服务联系在一起,因此跨进程成了刚需,在这个前提下,才不得不引入了类似Rabbitmq的消息服务器。

8. web中的应用

WEB中应用也是有的,只是需要找到合适的场景。

  • 进程内事件!

  • 不需要跨进程!

我在abp的框架里搜到了这个。
搞懂事件——C# 的event的机制深度理解

9. 小结

好文章值得你收藏,看了这么多,不知道你搞懂没有,我感觉我好像讲明白了…