async/await 到底是如何工作的(一)

数周之前,.NET Blog 发表了一篇文章 什么是 .NET 为什么要选择它(What is .NET, and why should you choose it?)它介绍了 .NET 平台大体的一个概述,总结了很多组件和设计决策并且承诺相关的领域会有更多深入的文章介绍。这篇文章是第一次这样的跟进,深入探讨异步的历史、背后的设计决策和实现细节。.

对于 async/await 的支持已经有十几年的历史了。在那段时间里,它改变了为 .NET 编写可扩展代码的方式,并且在不确切了解幕后情况的情况下利用该功能既可行又极其普遍。

你起初使用了类似下面这样的一个同步方法(这个方式是"同步"的[synchronous] 因为调用方不能做任何其他事情直到整个操作完成并且控制权回到调用方)

// Synchronously copy all data from source to destination.
public void CopyStreamToStream(Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, numRead);
    }
}

然后你添加几个关键字,更改几个方法名称,最终得到以下异步方法(这个方法是"异步"的[asynchronous],因为控制权应该很快返回给它的调用者,并且可能在与 整个操作已经完成):

// Asynchronously copy all data from source to destination.
public async Task CopyStreamToStreamAsync(Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = await source.ReadAsync(buffer, 0, buffer.Length)) != 0)
    {
        await destination.WriteAsync(buffer, 0, numRead);
    }
}

在语法上基本一致,仍然能够利用所有相同的控制流结构,但现在实际上是非阻塞的,具有显着不同的底层执行模型,并且所有繁重的工作都由 C# 编译器和框架类库完成。

虽然在不知道这之下到底发生了什么的情况下使用这种支持很常见,但我坚信了解某些东西的实际工作原理可以帮助您更好地利用它。特别是对于 async/await,当你想深入了解时,了解所涉及的机制特别有用,例如当你试图调试出错的地方或提高原本正确的事情的性能时。那么,在这篇文章中,我们将深入探讨 await 在语言、编译器和库级别的确切工作原理,以便您可以充分利用这些有价值的功能。

但是,要做到这一点,我们需要回到 async/await 之前,以了解在没有它的情况下最新的异步代码是什么样子的。善意的提醒一下,它并不漂亮。

最初的模样

早在.NET] Framework 1.0时期,就有了异步编程模型(Asynchronous Programming Model pattern),也称为 APM 模式、Begin/End模式或 IAsyncResult 模式。从高层次来看,该模式很简单。

对于一个同步的操作 DoStuff

class Handler
{
    public int DoStuff(string arg);
}

该模式中包含两个相应的方法:BeginDoStuff 方法和 EndDoStuff 方法:

class Handler
{
    public int DoStuff(string arg);

    public IAsyncResult BeginDoStuff(string arg, AsyncCallback? callback, object? state);
    public int EndDoStuff(IAsyncResult asyncResult);
}

BeginDoStuff 方法将会接受所有与 DoStuff 相同的参数,但除此之外,它还将接受 AsyncCallback 委托和一个未知的状态 object,其中一个或两个都可以为 null

Begin 方法负责启动异步操作,如果提供了回调(通常称为初始操作的“继续”),它还负责确保在异步操作完成时调用回调。Begin 方法还将构造一个实现 IAsyncResult 的类型的实例,使用可选的 state 来填充 IAsyncResult 的 AsyncState 属性:

namespace System
{
    public interface IAsyncResult
    {
        object? AsyncState { get; }
        WaitHandle AsyncWaitHandle { get; }
        bool IsCompleted { get; }
        bool CompletedSynchronously { get; }
    }

    public delegate void AsyncCallback(IAsyncResult ar);
}

然后,该 IAsyncResult 实例将从 Begin 方法返回,并在最终调用时传递给 AsyncCallback。当准备好使用操作结果时,调用方将该 IAsyncResult 实例传递给 End 方法,End 方法负责确保操作完成(如果未完成,则通过阻塞同步等待),然后返回操作的任何结果,包括传播可能发生的任何错误/异常。因此,不需要编写以下代码以同步执行操作:

try
{
    int i = handler.DoStuff(arg); 
    Use(i);
}
catch (Exception e)
{
    ... // handle exceptions from DoStuff and Use
}

可以使用以下方式使用Begin/End方法来异步执行相同的操作:

try
{
    handler.BeginDoStuff(arg, iar =>
    {
        try
        {
            Handler handler = (Handler)iar.AsyncState!;
            int i = handler.EndDoStuff(iar);
            Use(i);
        }
        catch (Exception e2)
        {
            ... // handle exceptions from EndDoStuff and Use
        }
    }, handler);
}
catch (Exception e)
{
    ... // handle exceptions thrown from the synchronous call to BeginDoStuff
}

对于任何处理过任何语言中基于回调的 API 的人来说,这应该会感到很熟悉。

然而,情况只会变得更加复杂。例如,“堆栈深入”(stack dives)的问题。堆栈深入是指代码反复调用时在堆栈上越来越深,可能导致堆栈溢出的情况。如果操作同步完成,Begin方法可以立即同步调用回调函数,这意味着对Begin的调用本身可能直接调用回调函数。实际上,完成同步的“异步”操作非常普遍;它们之所以不是“同步”的,不是因为它们保证异步完成,而是允许异步完成。例如,考虑从一些网络操作中异步读取数据,例如从套接字接收。如果您每个单独操作仅需要少量数据(例如从响应中读取某些标头数据),则可以放置一个缓冲区,以避免进行大量系统调用的开销。而不是仅针对需要立即使用的数据量进行小型读取,您执行了一个较大的读取操作,并将数据读入缓存,然后从该缓存中消耗数据,直到其被用尽;这样可以减少与套接字实际交互所需的昂贵系统调用数量。这样的缓冲区可能存在于您正在使用的任何异步抽象背后,使得您执行的第一个“异步”操作(填充缓冲区)异步完成,但在底层缓冲区耗尽之前,所有后续操作都不需要进行任何I/O,而是从缓冲区中提取,因此全部可以同步完成。当Begin方法执行其中一个这些操作并发现它同步完成时,它可以立即同步调用回调函数。这意味着您有一个调用Begin方法的堆栈帧,另一个堆栈帧用于Begin方法本身,现在还有一个用于回调函数的堆栈帧。现在,如果回调函数再次调用Begin会发生什么?如果该操作同步完成并且其回调被同步调用,则堆栈会再次深入几个帧。依此类推,直到最终用尽堆栈。

这是一个很容易重现的实际可能性。在 .NET Core 上尝试运行以下程序:

using System.Net;
using System.Net.Sockets;

using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
listener.Listen();

using Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(listener.LocalEndPoint!);

using Socket server = listener.Accept();
_ = server.SendAsync(new byte[100_000]);

var mres = new ManualResetEventSlim();
byte[] buffer = new byte[1];

var stream = new NetworkStream(client);

void ReadAgain()
{
    stream.BeginRead(buffer, 0, 1, iar =>
    {
        if (stream.EndRead(iar) != 0)
        {
            ReadAgain(); // uh oh!
        }
        else
        {
            mres.Set();
        }
    }, null);
};
ReadAgain();

mres.Wait();

在这里,我设置了一个简单的客户端套接字和服务器套接字相互连接。服务器向客户端发送了100,000个字节,然后客户端使用BeginRead/EndRead 一次“异步”地消耗它们(这非常低效,只是出于教学目的而已)。传递给 BeginRead 的回调函数通过调用 EndRead 完成读取,然后如果成功读取所需的字节(在这种情况下它还没有到达流的末尾),则通过递归调用 ReadAgain 本地函数发出另一个BeginRead。但是 在.NET Core中,套接字操作比在 .NET Framework 上快得多,并且如果操作能够同步满足,则会同步完成(注意内核本身具有用于满足套接字接收操作的缓冲区)。因此,这个堆栈会溢出:

async/await 到底是如何工作的(一)
Stack overflow due to improper handling of synchronous completion

因此,APM模型中已经内置了对此的补偿。有两种可能的补偿方式:

  1. 不允许同步调用 AsyncCallback。如果始终异步调用它,即使操作同步完成,那么堆栈深入的风险也消失了。但是,性能也会受到影响,因为同步完成(或者非常快速,无法观察到区别)的操作非常普遍,强制每个操作在队列中排队回调会增加一定量的开销。
  2. 采用机制,使得调用者而不是回调函数在操作同步完成时执行后续工作。这样,您就可以避免额外的方法帧,并继续在堆栈上深入执行后续工作。

APM模式采用了第二个选项。为此,IAsyncResult 接口公开了两个相关但不同的成员:IsCompleted 和 CompletedSynchronously 。

IsCompleted 告诉您操作是否已经完成:您可以多次检查它,最终它将从 false 转换为 true,然后保持在那里。

相比之下,CompletedSynchronously 从不更改(或者如果更改,那么它是一个等待发生的恶意bug);它用于在Begin方法的调用方和 AsyncCallback 之间通信,哪个负责执行任何继续工作。如果 CompletedSynchronously 为 false,则操作正在异步完成,并且在操作完成后对任何继续工作应该留给回调处理;毕竟,如果工作没有同步完成,则Begin的调用方无法真正处理它,因为操作尚未完成(如果调用者只调用End,它将阻塞直到操作完成)。

然而,如果 CompletedSynchronously 为 true,则如果回调函数处理继续工作,那么它有堆栈深入的风险,因为它将在比其开始位置更深的堆栈上执行该继续工作。因此,任何关心这种堆栈深入的实现都需要检查 CompletedSynchronously,并在其为 true 时让 Begin 方法的调用方执行继续工作,这意味着回调函数随后不需要执行继续工作。这也是为什么 CompletedSynchronously 不能改变的原因:调用方和回调函数需要看到相同的值,以确保无论竞态条件如何,都执行一次且仅执行一次继续工作。

在我们之前的 DoStuff 示例中,这将导致以下代码:

try
{
    IAsyncResult ar = handler.BeginDoStuff(arg, iar =>
    {
        if (!iar.CompletedSynchronously)
        {
            try
            {
                Handler handler = (Handler)iar.AsyncState!;
                int i = handler.EndDoStuff(iar);
                Use(i);
            }
            catch (Exception e2)
            {
                ... // handle exceptions from EndDoStuff and Use
            }
        }
    }, handler);
    if (ar.CompletedSynchronously)
    {
        int i = handler.EndDoStuff(ar);
        Use(i);
    }
}
catch (Exception e)
{
    ... // handle exceptions that emerge synchronously from BeginDoStuff and possibly EndDoStuff/Use
}

这真是一大堆。到目前为止,我们只看过如何使用模式......我们还没有看如何实现模式。虽然大多数开发人员不需要关心叶子操作(例如实现与操作系统交互的实际 Socket.BeginReceive/EndReceive 方法),但许多开发人员需要关注组合这些操作(执行多个异步操作,这些操作共同形成一个更大的操作),这意味着不仅要使用其他Begin/End方法,而且还要自己实现它们,以便您的组合本身可以在其他地方使用。并且,您会注意到我的先前 DoStuff 示例中没有控制流程。引入多个操作,特别是带有简单控制流程(例如循环)的操作,突然之间这就成为喜欢痛苦的专家或博客文章作者试图表明观点的领域了。

为了强调这一点,让我们实现一个完整的示例。在本文开头,我展示了一个 CopyStreamToStream方法,它将所有数据从一个流复制到另一个流(类似于 Stream.CopyTo,但是为了解释起见,假设它不存在):

public void CopyStreamToStream(Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, numRead);
    }
}

很简单:我们重复从一个流中读取,然后将得到的数据写入另一个流中,从一个流中读取并写入另一个流中,直到我们没有更多数据可读为止。现在,我们如何使用APM模式异步实现它?类似于这样:

public IAsyncResult BeginCopyStreamToStream(
    Stream source, Stream destination,
    AsyncCallback callback, object state)
{
    var ar = new MyAsyncResult(state);
    var buffer = new byte[0x1000];

    Action<IAsyncResult?> readWriteLoop = null!;
    readWriteLoop = iar =>
    {
        try
        {
            for (bool isRead = iar == null; ; isRead = !isRead)
            {
                if (isRead)
                {
                    iar = source.BeginRead(buffer, 0, buffer.Length, static readResult =>
                    {
                        if (!readResult.CompletedSynchronously)
                        {
                            ((Action<IAsyncResult?>)readResult.AsyncState!)(readResult);
                        }
                    }, readWriteLoop);

                    if (!iar.CompletedSynchronously)
                    {
                        return;
                    }
                }
                else
                {
                    int numRead = source.EndRead(iar!);
                    if (numRead == 0)
                    {
                        ar.Complete(null);
                        callback?.Invoke(ar);
                        return;
                    }

                    iar = destination.BeginWrite(buffer, 0, numRead, writeResult =>
                    {
                        if (!writeResult.CompletedSynchronously)
                        {
                            try
                            {
                                destination.EndWrite(writeResult);
                                readWriteLoop(null);
                            }
                            catch (Exception e2)
                            {
                                ar.Complete(e);
                                callback?.Invoke(ar);
                            }
                        }
                    }, null);

                    if (!iar.CompletedSynchronously)
                    {
                        return;
                    }

                    destination.EndWrite(iar);
                }
            }
        }
        catch (Exception e)
        {
            ar.Complete(e);
            callback?.Invoke(ar);
        }
    };

    readWriteLoop(null);

    return ar;
}

public void EndCopyStreamToStream(IAsyncResult asyncResult)
{
    if (asyncResult is not MyAsyncResult ar)
    {
        throw new ArgumentException(null, nameof(asyncResult));
    }

    ar.Wait();
}

private sealed class MyAsyncResult : IAsyncResult
{
    private bool _completed;
    private int _completedSynchronously;
    private ManualResetEvent? _event;
    private Exception? _error;

    public MyAsyncResult(object? state) => AsyncState = state;

    public object? AsyncState { get; }

    public void Complete(Exception? error)
    {
        lock (this)
        {
            _completed = true;
            _error = error;
            _event?.Set();
        }
    }

    public void Wait()
    {
        WaitHandle? h = null;
        lock (this)
        {
            if (_completed)
            {
                if (_error is not null)
                {
                    throw _error;
                }
                return;
            }

            h = _event ??= new ManualResetEvent(false);
        }

        h.WaitOne();
        if (_error is not null)
        {
            throw _error;
        }
    }

    public WaitHandle AsyncWaitHandle
    {
        get
        {
            lock (this)
            {
                return _event ??= new ManualResetEvent(_completed);
            }
        }
    }

    public bool CompletedSynchronously
    {
        get
        {
            lock (this)
            {
                if (_completedSynchronously == 0)
                {
                    _completedSynchronously = _completed ? 1 : -1;
                }

                return _completedSynchronously == 1;
            }
        }
    }

    public bool IsCompleted
    {
        get
        {
            lock (this)
            {
                return _completed;
            }
        }
    }
}

哇哦。即使有所有这些令人费解的代码,它仍然不是一个很好的实现。例如,IAsyncResult实现在每个操作上进行锁定,而不是在可能的情况下以更无锁的方式进行操作;异常存储为原始数据,而不是作为ExceptionDispatchInfo存储,后者可以在传播时增强其调用堆栈;每个单独操作都涉及大量分配(例如,为每个BeginWrite调用分配委托),等等。现在,想象一下你必须为每个方法都要编写这样的代码。每当您想编写可重用的方法来消耗另一个异步操作时,您都需要完成所有这些工作。如果您想编写可重用的组合器,以高效地操作多个离散的IAsyncResults(想一想Task.WhenAll),那么这是另一种难度级别;每个操作实现和公开其自己特定于该操作的API意味着没有统一的语言可以以相似的方式谈论它们(尽管一些开发人员编写了库,试图通过另一层回调来减轻负担,该回调使API能够向Begin方法提供适当的AsyncCallback)。

所有这些复杂性意味着很少有人尝试使用 APM 模式,而那些尝试的人,其实 bug 异常普遍。公平地说,这不是对 APM 模式本身的批评,而是针对基于回调的异步性的一般批评。我们都非常习惯于现代语言中的控制流结构所提供的功能和简单性,而基于回调的方法通常会在引入任何合理数量的复杂性时违反这种结构。没有其他主流语言可以提供更好的替代方案。

我们需要一种更好的方式,从 APM 模式中学习,并吸取它的优点,避免它的缺陷。有趣的是,APM 模式只是一个模式;运行时、核心库和编译器没有提供任何帮助来使用或实现该模式。

基于事件的异步模型(Event-Based Asynchronous Pattern)

.NET Framework 2.0 看到一些 API 的引入,实现了处理异步操作的不同模式,这种模式主要用于在客户端应用程序中执行操作。这种基于事件的异步模式(Event-based Asynchronous Pattern,EAP)也成对出现,这次是一个用于启动异步操作的方法和一个用于监听其完成的事件(可能还有更多成员)。因此,我们先前的 DoStuff 示例可能作为以下成员公开:

class Handler
{
    public int DoStuff(string arg);

    public void DoStuffAsync(string arg, object? userToken);
    public event DoStuffEventHandler? DoStuffCompleted;
}

public delegate void DoStuffEventHandler(object sender, DoStuffEventArgs e);

public class DoStuffEventArgs : AsyncCompletedEventArgs
{
    public DoStuffEventArgs(int result, Exception? error, bool canceled, object? userToken) :
        base(error, canceled, usertoken) => Result = result;

    public int Result { get; }
}

您将使用 DoStuffCompleted 事件注册后续工作,然后调用 DoStuffAsync 方法;它将启动操作,并在该操作完成时异步从调用方触发 DoStuffCompleted 事件。然后处理程序可以运行其继续工作,通常验证用户提供的 userToken 是否与其期望的匹配,从而使多个处理程序同时连接到事件。

这种模式使得某些用例变得更容易,但对其他用例却显着增加了难度(考虑到之前的APM CopyStreamToStream 示例,这是一件值得玩味的事情)。它没有被广泛地推广,而且实际上只在 .NET Framework 的一个版本中发布和消失,尽管它留下了在其任期内添加的 API,例如 Ping.SendAsync/Ping.PingCompleted

public class Ping : Component
{
    public void SendAsync(string hostNameOrAddress, object? userToken);
    public event PingCompletedEventHandler? PingCompleted;
    ...
}

然而,它的确增加了一个显著的进步,APM 模式完全没有考虑到它,并且它一直持续到我们今天所采用的模型中:SynchronizationContext

SynchronizationContext 在 .NET Framework 2.0 中也被引入,作为一个通用调度器的抽象。其中,SynchronizationContext 最常用的方法是 Post,它将一个工作项排队到由该上下文表示的任何调度器中。例如,SynchronizationContext 的基本实现仅代表线程池,因此 [SynchronizationContext.Post 的基本实现](runtime/SynchronizationContext.cs at 95df571be36ed8973d09746b61fae16b2e3f251f · dotnet/runtime (github.com))只是委派给 ThreadPool.QueueUserWorkItem,该方法用于请求线程池使用其线程之一调用提供的回调与关联状态。然而,SynchronizationContext 的核心不仅仅是支持任意调度器,而是支持以满足各种应用程序模型的需求进行调度的方式。

考虑一个像 Windows Forms 这样的 UI 框架。和 Windows 上的大多数 UI 框架一样,控件与特定线程关联,并且该线程运行一个消息泵,该泵运行能够与这些控件进行交互的工作:只有该线程应尝试操作这些控件,任何其他想与控件交互的线程都应通过发送消息来实现,以便由 UI 线程的消息泵消耗该消息。Windows Forms 使用 Control.BeginInvoke 这样的方法可以轻松地实现这一点,该方法将提供的委托和参数排队运行在与该 Control 关联的任何线程上。因此,您可以编写如下代码:

private void button1_Click(object sender, EventArgs e)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        string message = ComputeMessage();
        button1.BeginInvoke(() =>
        {
            button1.Text = message;
        });
    });
}

这将把 ComputeMessage() 工作卸载到线程池线程上(以保持 UI 的响应性,同时进行工作),然后当该工作完成时,将一个委托排队回到与 button1 关联的线程中以更新 button1 的标签。非常简单。WPF 也有类似的东西,只是它使用 Dispatcher 类型:

private void button1_Click(object sender, RoutedEventArgs e)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        string message = ComputeMessage();
        button1.Dispatcher.InvokeAsync(() =>
        {
            button1.Content = message;
        });
    });
}

.NET MAUI 也有类似的东西。但是,如果我想将这个逻辑放在帮助器方法中怎么办?例如:

// Call ComputeMessage and then invoke the update action to update controls.
internal static void ComputeMessageAndInvokeUpdate(Action<string> update) 
{ ... }

然后我可以像这样使用它:

private void button1_Click(object sender, EventArgs e)
{
    ComputeMessageAndInvokeUpdate(message => button1.Text = message);
}

但是,如何实现 ComputeMessageAndInvokeUpdate 方法,使其能够在任何这些应用程序中都能工作呢?它需要硬编码以了解每个可能的UI框架吗?这就是 SynchronizationContext 的优点。我们可以像这样实现该方法:

internal static void ComputeMessageAndInvokeUpdate(Action<string> update)
{
    SynchronizationContext? sc = SynchronizationContext.Current;
    ThreadPool.QueueUserWorkItem(_ =>
    {
        string message = ComputeMessage();
        if (sc is not null)
        {
            sc.Post(_ => update(message), null);
        }
        else
        {
            update(message);
        }
    });
}

该方法使用 SynchronizationContext 作为抽象来定位应该用于返回到与 UI 进行交互所需的环境的“调度程序”。每个应用程序模型然后确保它发布为 SynchronizationContext.Current,即一个派生自 SynchronizationContext 的类型,它会执行“正确的事情”。例如,Windows Forms 有以下实现:

public sealed class WindowsFormsSynchronizationContext : SynchronizationContext, IDisposable
{
    public override void Post(SendOrPostCallback d, object? state) =>
        _controlToSendTo?.BeginInvoke(d, new object?[] { state });
    ...
}

WPF 有这样的实现:

public sealed class DispatcherSynchronizationContext : SynchronizationContext
{
    public override void Post(SendOrPostCallback d, Object state) =>
        _dispatcher.BeginInvoke(_priority, d, state);
    ...
}

ASP.NET 以前有一个实现,它实际上并不关心工作运行在哪个线程上,而是要求与给定请求相关的工作被序列化,以便多个线程不会同时访问给定的 HttpContext

internal sealed class AspNetSynchronizationContext : AspNetSynchronizationContextBase
{
    public override void Post(SendOrPostCallback callback, Object state) =>
        _state.Helper.QueueAsynchronous(() => callback(state));
    ...
}

这也不仅限于这些主要应用程序模型。例如,xUnit 是一个流行的单元测试框架,它是 .NET 核心代码库用于单元测试的框架之一,它还使用多个自定义 SynchronizationContext。例如,您可以允许测试并行运行,但限制允许同时运行的测试的数量。如何实现?通过 SynchronizationContext

public class MaxConcurrencySyncContext : SynchronizationContext, IDisposable
{
    public override void Post(SendOrPostCallback d, object? state)
    {
        var context = ExecutionContext.Capture();
        workQueue.Enqueue((d, state, context));
        workReady.Set();
    }
}

MaxConcurrencySyncContext 的 Post 方法将工作项仅排队到其自己的内部工作队列中,然后在自己的工作线程上处理它们,它控制线程的数量取决于所需的最大并发数。您应该明白了。

这和基于事件的异步模式(EAP)有什么关系呢?EAP 和 SynchronizationContext 同时被引入,EAP规定在启动异步操作时应该将完成事件排队到当前的 SynchronizationContext。为了稍微简化一下(可能不足以证明额外复杂性),在 System.ComponentModel 中还引入了一些辅助类型,特别是 AsyncOperation 和 AsyncOperationManager。前者只是一个元组,包装了用户提供的状态对象和捕获的 SynchronizationContext,而后者则作为一个简单的工厂来实现捕获和创建 AsyncOperation 实例。然后,EAP 实现会使用它们,例如 Ping.SendAsync 调用 AsyncOperationManager.CreateOperation 来捕获 SynchronizationContext,当操作完成时,AsyncOperation 的 PostOperationCompleted 方法将被调用以调用存储的 SynchronizationContext 的 Post 方法。

SynchronizationContext 提供了一些值得一提的小功能,它们稍后会再次出现。特别是,它公开了 OperationStarted 和 OperationCompleted 方法。这些虚方法的基本实现为空,什么也不做,但派生实现可能会重写这些方法来了解正在进行的操作。这意味着 EAP 实现还将在每个操作的开始和结束时调用这些 OperationStarted/OperationCompleted 方法,以通知任何当前存在的 SynchronizationContext,并允许其跟踪工作。这对于 EAP 模式特别相关,因为启动异步操作的方法返回 void:您没有任何返回值来单独跟踪工作。我们会回到这个问题。

因此,我们需要比 APM 模式更好的东西,而接下来出现的 EAP 引入了一些新东西,但并没有真正解决我们面临的核心问题。我们仍然需要更好的东西。

接下来是任务(Tasks)

.NET Framework 4.0 引入了 System.Threading.Tasks.Task 类型。正如它听起来那样,任务(Task)只是一个数据结构,代表某些异步操作的最终完成(其他框架将类似类型称为“promise”或“future”)。创建 Task 来表示某个操作,当它所表示的操作逻辑上已经完成时,结果则存储到该 Task 中。很简单。但是,使得 Task 比 IAsyncResult 更加有用的关键功能是,它内置了连续性的概念。这一特点意味着你可以随时请求任何 Task 在完成时以异步方式通知你,而 Task 本身处理同步以确保无论任务已经完成、尚未完成还是与通知请求并发完成,都会调用连续体。为什么这么有影响力?好吧,如果您回想一下我们对旧的 APM 模式的讨论,其中有两个主要问题:

  1. 你必须为每个操作实现一个自定义的 IAsyncResult 实现:没有内置的 IAsyncResult 实现可以供任何人使用。
  2. 在调用 Begin 方法之前,你必须知道何时完成并想要做什么。这使得为使用和组合任意异步实现实现组合和其他通用例程成为一个重大挑战。

相比之下,使用 Task,则可以在已经启动操作后向异步操作提供一个连续体,而无需将该连续体提供给启动操作的方法。每个拥有异步操作的人都可以产生一个 Task,每个消费异步操作的人都可以消费一个 Task,不需要进行任何自定义操作来连接两者:Task 成为了使异步操作的生成者和消费者之间进行通信的共同语言。这改变了 .NET 的面貌。稍后会详细说明…

现在,让我们更好地理解这实际上意味着什么。我们不会深入研究 Task 的复杂代码,而是通过实现一个简单版本来做教学。这并不意味着它是一个伟大的实现,而只是足够完成功能以帮助理解 Task 的核心,最终,Task 其实只是一个处理设置和接收完成信号协调的数据结构。我们将从几个字段开始:

class MyTask
{
    private bool _completed;
    private Exception? _error;
    private Action<MyTask>? _continuation;
    private ExecutionContext? _ec;
    ...
}

我们需要一个字段来知道任务是否已完成(_completed),并且我们需要一个字段来存储导致任务失败的任何错误(_error);如果我们还要实现一个泛型 MyTask<TResult>,则还将有一个 private TResult _result 字段,用于存储操作成功的结果。到目前为止,这看起来很像我们之前自定义的 IAsyncResult 实现(当然这不是巧合)。但是现在到了最后,_continuation 字段。在这个简单的实现中,我们仅支持一个连续体,但这足以用于说明的目的(真正的 Task 使用一个对象字段,它可以是单个连续体对象或连续体对象的 List<>)。这是一个委托,当任务完成时将被调用。

现在,一些表面上的内容。正如前面提到的,相对于之前的模型,Task 最基本的进步之一是能够在操作被启动后提供连续性工作(回调)。我们需要一个方法让我们做到这一点,所以让我们添加 ContinueWith 方法:

public void ContinueWith(Action<MyTask> action)
{
    lock (this)
    {
        if (_completed)
        {
            ThreadPool.QueueUserWorkItem(_ => action(this));
        }
        else if (_continuation is not null)
        {
            throw new InvalidOperationException("Unlike Task, this implementation only supports a single continuation.");
        }
        else
        {
            _continuation = action;
            _ec = ExecutionContext.Capture();
        }
    }
}

如果任务在调用 ContinueWith 时已经标记为完成,那么 ContinueWith 将仅排队委托的执行。否则,该方法会存储委托,以便在任务完成时排队连续性(它还存储 ExecutionContext,然后在稍后调用委托时使用它,但现在不要担心这一部分…我们会讲到它)。很简单。

然后我们需要能够将 MyTask 标记为已完成,这意味着它所代表的任何异步操作已经完成。为此,我们将公开两个方法,一个用于成功地标记它为已完成(“SetResult”),另一个用于用错误标记它为已完成(“SetException”):

public void SetResult() => Complete(null);

public void SetException(Exception error) => Complete(error);

private void Complete(Exception? error)
{
    lock (this)
    {
        if (_completed)
        {
            throw new InvalidOperationException("Already completed");
        }

        _error = error;
        _completed = true;

        if (_continuation is not null)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                if (_ec is not null)
                {
                    ExecutionContext.Run(_ec, _ => _continuation(this), null);
                }
                else
                {
                    _continuation(this);
                }
            });
        }
    }
}

我们存储任何错误,标记任务已经完成,然后如果之前注册了连续体,则将其排队以进行调用。

最后,我们需要一种方法来传播可能发生的任务异常(如果这是一个泛型 MyTask<T>,则返回其 _result);为了促进某些场景,我们还允许此方法阻塞等待任务完成,我们可以通过 ContinueWith 实现(连续体只是信号 ManualResetEventSlim,然后调用方会在等待完成时阻塞)

public void Wait()
{
    ManualResetEventSlim? mres = null;
    lock (this)
    {
        if (!_completed)
        {
            mres = new ManualResetEventSlim();
            ContinueWith(_ => mres.Set());
        }
    }

    mres?.Wait();
    if (_error is not null)
    {
        ExceptionDispatchInfo.Throw(_error);
    }
}

基本上就是这样了。现实中的 Task 肯定更加复杂,具有更高效的实现,支持任意数量的连续体,具有关于其行为的多个旋钮(例如,连续体应该像在这里一样排队还是作为任务完成的一部分同步调用),能够存储多个异常而不仅仅是一个,具有针对取消的特殊知识,具有用于执行常见操作的大量助手方法(例如,Task.Run,它创建一个代表委托的 Task,并将其排队以在线程池上调用)。但其中没有任何魔法;其核心,就是我们在这里看到的东西。

你可能还注意到,我的简单 MyTask 具有公开的 SetResult/SetException 方法,而 Task 没有。实际上,Task 也有这样的方法,它们只是内部方法,通过 System.Threading.Tasks.TaskCompletionSource 类型作为与任务和其完成分开的“生产者”。这样做并非出于技术上的必要性,而是为了将完成方法从仅用于消费的对象中分离出来。然后可以分发一个Task,而无需担心它在你不知情的情况下被完成;完成信号是创建任务的任何东西的实现细节,并且通过保持 TaskCompletionSource 使完成它的权利保留给自己。(CancellationToken 和 CancellationTokenSource 遵循类似的模式:CancellationToken 只是一个包装 CancellationTokenSource 的结构体,提供与取消信号相关联的公共表面区域,但没有能力产生信号,该能力限制为具有 CancellationTokenSource 访问权限的人员。)

当然,我们可以为这个 MyTask 实现类似于 Task 提供的组合器和助手函数。想要一个简单的 MyTask.WhenAll 吗?给你:

public static MyTask WhenAll(MyTask t1, MyTask t2)
{
    var t = new MyTask();

    int remaining = 2;
    Exception? e = null;

    Action<MyTask> continuation = completed =>
    {
        e ??= completed._error; // just store a single exception for simplicity
        if (Interlocked.Decrement(ref remaining) == 0)
        {
            if (e is not null) t.SetException(e);
            else t.SetResult();
        }
    };

    t1.ContinueWith(continuation);
    t2.ContinueWith(continuation);

    return t;
}

想要一个 MyTask.Run 吗?来了:

public static MyTask Run(Action action)
{
    var t = new MyTask();

    ThreadPool.QueueUserWorkItem(_ =>
    {
        try
        {
            action();
            t.SetResult();
        }
        catch (Exception e)
        {
            t.SetException(e);
        }
    });

    return t;
}

那 MyTask.Delay 呢?当然可以:

public static MyTask Delay(TimeSpan delay)
{
    var t = new MyTask();

    var timer = new Timer(_ => t.SetResult());
    timer.Change(delay, Timeout.InfiniteTimeSpan);

    return t;
}

你懂的。

有了 Task,.NET 中以前的所有异步模式都成为了过去。在以前使用 APM 模式或 EAP 模式实现异步实现的任何地方,都会公开新的返回 Task 的方法。

接着是值任务(ValueTasks)

到目前为止,Task 仍然是 .NET 中异步处理的主力,每个版本都会公开新的方法,并且在整个生态系统中通常返回 Task 和Task<TResult>。但是,Task 是一个类,这意味着创建它会带来一些分配开销。对于大多数情况下长时间运行的异步操作而言,多分配一个对象的开销微不足道,并且除了最敏感的操作外,不会显着影响性能。但是,正如之前所述,同步完成异步操作相当普遍。Stream.ReadAsync 被引入以返回一个 Task<int>,但是如果您从 BufferedStream 中读取数据,则由于只需从内存缓冲区中提取数据而不是执行 syscalls 和真实 I/O,很可能有很多读取操作将同步完成。必须额外分配一个对象才能返回此类数据是不幸的(请注意,在 APM 中也是如此)。对于不返回泛型 Task 的方法,该方法可以返回一个已经完成的单例任务,实际上,Task.CompletedTask 就提供了这样的单例。但对于 Task<TResult>,不可能为每个可能的 TResult 缓存一个 Task。我们该如何使得这样的同步完成更快?

有可能缓存一些 Task<TResult>。例如,Task<bool> 非常常见,并且只有两个有意义的事情可以缓存:当 Result 为 true 时的 Task<bool> 和当 Result 为 false 时的 Task<bool>。虽然我们不想尝试缓存四十亿个 Task<int> 来容纳每个可能的 Int32 结果,但小的 Int32 值非常常见,因此我们可以为 -1 到 8 的值缓存几个。对于任意类型,default 是一个相当常见的值,因此我们可以为每种相关类型缓存 Task<TResult>,其中 Result 是 default(TResult)。实际上,Task.FromResult 在最近的 .NET 版本中就这样做了,使用可重用的 Task<TResult> 单例缓存,并在适当时返回其中之一,否则为提供的确切结果值分配新的 Task<TResult>。其他方案可以创建以处理其他合理常见的情况。例如,在处理 Stream.ReadAsync 时,通常会多次在同一流上调用它,所有调用都使用相同的字节数进行读取。而且通常实现能够完全满足该计数请求。这意味着 Stream.ReadAsync 可能会重复返回相同的 int 结果值。为避免在这种情况下发生多个分配,多个 Stream 类型(如 MemoryStream)将缓存它们成功返回的上一个 Task<int>,如果下一次读取也以相同的结果同步完成,则可以再次返回相同的 Task<int> 而不是创建新的任务。但其他情况呢?在那些性能开销真正重要的情况下,如何更普遍地避免同步完成时的分配呢?

这就是 ValueTask<TResult> 登场的地方(也有一个更详细的 ValueTask<TResult> 探讨)。ValueTask<TResult> 最初是 TResult 和 Task<TResult> 之间的辨别联合。归根结底,忽略所有花里胡哨的东西,这就是它的全部内容(或者说,它曾经是这样),它要么是一个立即可用的结果,要么是未来某个时刻结果的承诺:

public readonly struct ValueTask<TResult>
{
   private readonly Task<TResult>? _task;
   private readonly TResult _result;
   ...
}

然后一个方法可以返回这样的 ValueTask<TResult> 而不是 Task<TResult>,虽然返回类型更大,并且有一些更多的间接性开销,但如果 TResult 在需要返回时已知,则可以避免分配 Task<TResult>

然而,有一些超级高性能的极端场景,在这些场景中,即使在异步完成的情况下也要避免 Task<TResult> 分配。例如,Socket 位于网络堆栈的底部,而 Socket 上的 SendAsync 和 ReceiveAsync 处理着许多服务的超热路径,同步和异步完成都非常普遍(大多数发送操作都会同步完成,由于数据已经缓存在内核中,因此很多接收操作也会同步完成)。如果我们可以在给定的 Socket 上使这样的发送和接收操作不需要分配,无论操作是同步完成还是异步完成,那不是很好吗?

这就是 System.Threading.Tasks.Sources.IValueTaskSource<TResult> 登场的地方:

public interface IValueTaskSource<out TResult>
{
    ValueTaskSourceStatus GetStatus(short token);
    void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags);
    TResult GetResult(short token);
}

IValueTaskSource<TResult> 接口允许实现为 ValueTask<TResult> 提供自己的后备对象,使该对象可以实现像 GetResult 这样的方法来检索操作的结果,并使用 OnCompleted 来钩取一个延续到操作上。有了这个,ValueTask<TResult> 的定义发生了一个小变化,它的 Task<TResult>? _task 字段被 object? _obj 字段所替代:

public readonly struct ValueTask<TResult>
{
   private readonly object? _obj;
   private readonly TResult _result;
   ...
}

而 _task 字段要么是 Task<TResult>,要么为 null_obj 字段现在可以是 IValueTaskSource<TResult>。一旦 Task<TResult> 标记为已完成,就无法再次转换为未完成状态。相反,实现 IValueTaskSource<TResult> 的对象完全控制着实现,并且可以自由地双向转换在完成和未完成状态之间,因为 ValueTask<TResult> 的约定是给定的实例只能被消耗一次,因此构造函数不应观察底层实例的后续更改(这就是为什么存在 CA2012 等分析规则)。这使得像 Socket 这样的类型可以池化 IValueTaskSource<TResult> 实例以供重复调用时使用。Socket 最多缓存两个这样的实例,一个用于读取,一个用于写入,因为 99.999% 的情况下最多同时有一个接收和一个发送在进行中。

我提到了 ValueTask<TResult>,但没有提到 ValueTask。当仅涉及避免同步完成分配时,非泛型的 ValueTask(表示无结果的 void 操作)几乎没有性能优势,因为可以使用 Task.CompletedTask 来表示相同的条件。但是,一旦我们关心使用可池化的基础对象来避免在异步完成的情况下进行分配,那么这对于非泛型的也很重要。因此,当引入 IValueTaskSource<TResult> 时,也同时引入了 IValueTaskSource 和 ValueTask

因此,我们有 TaskTask<TResult>ValueTask 和 ValueTask<TResult>。我们可以以各种方式与它们交互,表示任意异步操作,并链接延续以处理这些异步操作的完成。而且,我们可以在操作完成之前或之后这样做。

是的,你说得对。延续仍然是回调函数,我们仍然被绑定到传递延续的风格来编码异步控制流。这使得编写正确且可维护的异步代码变得困难。

为了解决这个问题,在 C# 5.0 中引入了 async/await 特性,作为编写异步代码的一种方式,相比使用显式延续更易于理解和维护。使用 async/await,您可以编写异步代码,它看起来像普通的同步代码,这使得阅读和编写代码变得容易。

async/await 的基本想法是允许开发人员使用标准控制流构造,如 if/else 语句和循环,而不必使用回调函数和手动延续传递来编写异步代码。当您使用 async/await 时,C# 编译器为您生成必要的延续代码,允许您编写看起来和行为像同步代码的异步代码。

因此,如果您想编写更易读和易于维护的异步代码,async/await 绝对是解决方案。

C# 迭代器来解决问题

这个解决方案的曙光实际上出现在 Task 推出之前几年,即在 C# 3.0 中添加了对迭代器的支持。

“迭代器?”你问道。“你是说 IEnumerable<T> 的迭代器吗?”没错。迭代器允许您编写一个单独的方法,然后由编译器用它来实现 IEnumerable<T> 和/或 IEnumerator<T>。例如,如果我想创建一个可枚举对象,以产生斐波那契数列,我可以编写类似于下面这样的代码:

public static IEnumerable<int> Fib()
{
    int prev = 0, next = 1;
    yield return prev;
    yield return next;

    while (true)
    {
        int sum = prev + next;
        yield return sum;
        prev = next;
        next = sum;
    }
}

然后我可以用 foreach 枚举它:

foreach (int i in Fib())
{
    if (i > 100) break;
    Console.Write($"{i} ");
}

我可以通过 System.Linq.Enumerable 上的组合器将它与其他 IEnumerable<T> 组合起来:

using IEnumerator<int> e = Fib().GetEnumerator();
while (e.MoveNext())
{
    int i = e.Current;
    if (i > 100) break;
    Console.Write($"{i} ");
}

以上所有结果都会产生此输出:

0 1 1 2 3 5 8 13 21 34 55 89

这里真正有趣的是,为了实现上述操作,我们需要能够多次进入和退出 Fib 方法。我们调用 MoveNext,它进入该方法,然后该方法执行,直到遇到 yield return 语句,在该语句处,对 MoveNext 的调用需要返回 true,并且对 Current 的后续访问需要返回生成的值。然后我们再次调用 MoveNext,我们需要能够从我们上一次离开的位置重新开始 Fib,而且所有先前调用的状态都需要保持不变。迭代器实际上是由 C# 语言/编译器提供的协程,编译器将我的 Fib 迭代器扩展为一个完整的状态机:

public static IEnumerable<int> Fib() => new <Fib>d__0(-2);

[CompilerGenerated]
private sealed class <Fib>d__0 : IEnumerable<int>, IEnumerable, IEnumerator<int>, IEnumerator, IDisposable
{
    private int <>1__state;
    private int <>2__current;
    private int <>l__initialThreadId;
    private int <prev>5__2;
    private int <next>5__3;
    private int <sum>5__4;

    int IEnumerator<int>.Current => <>2__current;
    object IEnumerator.Current => <>2__current;

    public <Fib>d__0(int <>1__state)
    {
        this.<>1__state = <>1__state;
        <>l__initialThreadId = Environment.CurrentManagedThreadId;
    }

    private bool MoveNext()
    {
        switch (<>1__state)
        {
            default:
                return false;
            case 0:
                <>1__state = -1;
                <prev>5__2 = 0;
                <next>5__3 = 1;
                <>2__current = <prev>5__2;
                <>1__state = 1;
                return true;
            case 1:
                <>1__state = -1;
                <>2__current = <next>5__3;
                <>1__state = 2;
                return true;
            case 2:
                <>1__state = -1;
                break;
            case 3:
                <>1__state = -1;
                <prev>5__2 = <next>5__3;
                <next>5__3 = <sum>5__4;
                break;
        }
        <sum>5__4 = <prev>5__2 + <next>5__3;
        <>2__current = <sum>5__4;
        <>1__state = 3;
        return true;
    }

    IEnumerator<int> IEnumerable<int>.GetEnumerator()
    {
        if (<>1__state == -2 &&
            <>l__initialThreadId == Environment.CurrentManagedThreadId)
        {
            <>1__state = 0;
            return this;
        }
        return new <Fib>d__0(0);
    }

    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable<int>)this).GetEnumerator();
    void IEnumerator.Reset() => throw new NotSupportedException();
    void IDisposable.Dispose() { }
}

现在,Fib 的所有逻辑都在 MoveNext 方法中,但作为跳转表的一部分,让实现可以分支到其上次离开的位置,并在枚举器类型上生成一个状态字段来跟踪它。而我写为局部变量的变量,比如 prevnext 和 sum,已经被“提升”为枚举器的字段,以便它们可以在 MoveNext 调用之间持久存在。

(请注意,之前展示的 C# 编译器生成实现的代码片段不能直接编译。C# 编译器合成“无法命名”的名称,这意味着它将创建的类型和成员命名为 IL 可以识别但 C# 不允许的方式,以避免与任何用户命名的类型和成员发生命名冲突。我保留了编译器命名的所有名称,但如果您想尝试编译它,请将名称重命名为使用有效的 C# 名称。)

在我之前的示例中,最后一种枚举形式涉及手动使用 IEnumerator<T>。在这个级别上,我们手动调用 MoveNext(),并决定何时重新进入协程。但是...如果我可以不像那样手动调用它,而是让下一次 MoveNext 调用实际上成为异步操作完成时执行的延续工作的一部分呢?如果我可以 yield return 一个表示异步操作的对象,并且消费者代码将继续连接到该生成的对象上,其中该继续执行 MoveNext?通过这种方法,我可以编写一个类似于以下内容的帮助程序方法:

static Task IterateAsync(IEnumerable<Task> tasks)
{
    var tcs = new TaskCompletionSource();

    IEnumerator<Task> e = tasks.GetEnumerator();

    void Process()
    {
        try
        {
            if (e.MoveNext())
            {
                e.Current.ContinueWith(t => Process());
                return;
            }
        }
        catch (Exception e)
        {
            tcs.SetException(e);
            return;
        }
        tcs.SetResult();
    };
    Process();

    return tcs.Task;
}

现在变得有趣了。我们得到了一个可枚举的任务,可以通过迭代来遍历。每次我们 MoveNext 到下一个任务并获取一个任务时,我们将继续连接到该任务;当该任务完成时,它将立即调用回相同的逻辑,执行 MoveNext,获取下一个任务,以此类推。这是基于 Task 作为任何异步操作的单个表示的思想构建的,因此我们接收到的可枚举序列可以是任何异步操作的序列。这样的序列可能来自哪里?当然是来自一个迭代器。还记得我们之前的 CopyToStreamToStream 示例和基于 APM 的实现有多么糟糕吗?考虑使用以下代码替代:

static Task CopyStreamToStreamAsync(Stream source, Stream destination)
{
    return IterateAsync(Impl(source, destination));

    static IEnumerable<Task> Impl(Stream source, Stream destination)
    {
        var buffer = new byte[0x1000];
        while (true)
        {
            Task<int> read = source.ReadAsync(buffer, 0, buffer.Length);
            yield return read;
            int numRead = read.Result;
            if (numRead <= 0)
            {
                break;
            }

            Task write = destination.WriteAsync(buffer, 0, numRead);
            yield return write;
            write.Wait();
        }
    }
}

哇,这几乎是可读的。我们调用了 IterateAsync 帮助程序方法,而我们提供给它的可枚举对象是由一个处理所有控制流的迭代器生成的。它调用了 Stream.ReadAsync 然后 yield return 该任务;在调用 MoveNext 后,将传递该生成的任务给 IterateAsyncIterateAsync 将为该任务连接一个延续,在任务完成时再次调用 MoveNext,并回到此处的迭代器中 yield 后面的位置。此时,Impl逻辑获取了该方法的结果,调用 WriteAsync,并再次返回它产生的任务。以此类推。

朋友们,这就是 C# 和 .NET 中 async/await 的开始。在 C# 编译器中,大约有 95% 的支持迭代器和 async/await 的逻辑是共享的。不同的语法,涉及不同的类型,但基本上是相同的转换。仔细看看 yield return,你几乎可以看到 await 取而代之的样子。

事实上,在 async/await 推出之前,[一些有创造力的开发人员就已经将迭代器用于异步编程](Concurrent Affairs: Simplified APM with the AsyncEnumerator | Microsoft Learn)。在实验性的 [Axum 编程语言](Axum (programming language) - Wikipedia)中也进行了类似的转换,这成为 C# 异步支持的关键灵感来源。Axum 提供了一个可以放置在方法上的 async 关键字,就像现在在 C# 中可以使用 async 一样。由于 Task 还没有普及,因此在 async 方法内部,Axum 编译器启发式地匹配同步方法调用与其 APM 对应方法,例如如果它看到您调用 stream.Read,它会找到并利用相应的 stream.BeginRead 和 stream.EndRead 方法,并生成适当的委托以传递给 Begin 方法,同时还为正在定义的异步方法生成完整的 APM 实现,使其具备组合性。它甚至与 SynchronizationContext 集成!虽然最终 Axum 放弃了,但它作为最终成为 C# async/await 的神奇和激励原型。

async/await 的幕后

现在我们知道我们是如何到达这里的,让我们深入了解它的实际工作原理。作为参考,这里又是我们的示例同步方法:

public void CopyStreamToStream(Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, numRead);
    }
}

下面是使用 async/await 的相应方法的样子:

public async Task CopyStreamToStreamAsync(Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = await source.ReadAsync(buffer, 0, buffer.Length)) != 0)
    {
        await destination.WriteAsync(buffer, 0, numRead);
    }
}

与迄今所见的一切相比,这是一股清新的气息。签名从 void 变成了 async Task,我们分别调用 ReadAsync 和 WriteAsync 而不是 Read 和 Write,并且这两个操作都以 await 作为前缀。就是这样。编译器和核心库接管了剩下的部分,从根本上改变了代码实际执行的方式。让我们深入探讨它的原理。

编译器转换

正如我们已经看到的那样,与迭代器一样,编译器将异步方法重写为基于状态机的方法。我们仍然有一个开发人员编写的具有相同签名的方法(public Task CopyStreamToStreamAsync(Stream source, Stream destination)),但该方法的主体完全不同:

[AsyncStateMachine(typeof(<CopyStreamToStreamAsync>d__0))]
public Task CopyStreamToStreamAsync(Stream source, Stream destination)
{
    <CopyStreamToStreamAsync>d__0 stateMachine = default;
    stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
    stateMachine.source = source;
    stateMachine.destination = destination;
    stateMachine.<>1__state = -1;
    stateMachine.<>t__builder.Start(ref stateMachine);
    return stateMachine.<>t__builder.Task;
}

private struct <CopyStreamToStreamAsync>d__0 : IAsyncStateMachine
{
    public int <>1__state;
    public AsyncTaskMethodBuilder <>t__builder;
    public Stream source;
    public Stream destination;
    private byte[] <buffer>5__2;
    private TaskAwaiter <>u__1;
    private TaskAwaiter<int> <>u__2;

    ...
}

请注意,与开发人员编写的唯一签名差异是缺少 async 关键字本身。async 实际上并不是方法签名的一部分;就像 unsafe 一样,当您将其放在方法签名中时,您正在表达方法的实现细节,而不是实际作为合同的一部分公开的内容。使用 async/await 实现返回 Task 的方法是一项实现细节。

编译器生成了一个名为 <CopyStreamToStreamAsync>d__0 的结构体,并在堆栈上将该结构体的实例初始化为零。重要的是,如果异步方法同步完成,则状态机永远不会离开堆栈。这意味着除非该方法需要异步完成(即它 await 此时尚未完成的某些内容),否则与状态机相关联的分配就不存在了。稍后再详细讲解。

这个结构体是方法的状态机,它不仅包含了开发人员编写的转换逻辑,还包含用于跟踪该方法中当前位置以及编译器从方法中提取并需要在 MoveNext 调用之间保留的所有“局部”状态的字段。它是 IEnumerable<T>/IEnumerator<T> 实现的逻辑等效物,就像我们在迭代器中看到的那样。(请注意,我展示的代码来自发布版本;在调试版本中,C# 编译器实际上会将这些状态机类型生成为类,因为这样做可以帮助某些调试操作)。

在初始化状态机之后,我们看到一个调用 AsyncTaskMethodBuilder.Create()。虽然我们目前专注于 Task,但是 C# 语言和编译器允许从异步方法返回任意类型(“task-like” 类型),例如,我可以编写一个 public async MyTask CopyStreamToStreamAsync 方法,并且只要我们通过适当的方式扩展先前定义的 MyTask,就可以正常编译。这包括声明关联的“builder”类型并通过 AsyncMethodBuilder attribute 将其与类型关联:

[AsyncMethodBuilder(typeof(MyTaskMethodBuilder))]
public class MyTask
{
    ...
}

public struct MyTaskMethodBuilder
{
    public static MyTaskMethodBuilder Create() { ... }

    public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { ... }
    public void SetStateMachine(IAsyncStateMachine stateMachine) { ... }

    public void SetResult() { ... }
    public void SetException(Exception exception) { ... }

    public void AwaitOnCompleted<TAwaiter, TStateMachine>(
        ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : INotifyCompletion
        where TStateMachine : IAsyncStateMachine { ... }
    public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
        ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : ICriticalNotifyCompletion
        where TStateMachine : IAsyncStateMachine { ... }

    public MyTask Task { get { ... } }
}

在这个上下文中,这样的“builder”是知道如何创建该类型实例(Task 属性),如果合适就可以成功地完成它并带有结果(SetResult)或异常(SetException),并处理连接到尚未完成的等待事物的连续性(AwaitOnCompleted/AwaitUnsafeOnCompleted)。对于 System.Threading.Tasks.Task,它默认与 AsyncTaskMethodBuilder 关联。通常情况下,通过应用于类型的 [AsyncMethodBuilder(...)] attribute 提供此关联,但是 Task 对 C# 知道特别重要,因此实际上没有使用该属性进行修饰。因此,编译器已经找到了用于此异步方法的构建器,并正在使用模式的一部分 Create 方法构造其实例。请注意,与状态机一样,AsyncTaskMethodBuilder 也是一个结构体,因此这里也没有分配。

然后,状态机使用该入口点方法的参数进行填充。这些参数需要在移动到 MoveNext 的方法主体中可用,因此这些参数需要存储在状态机中,以便可以在对 MoveNext 的后续调用中由代码引用。状态机还初始化为处于初始状态 -1。如果调用 MoveNext 并且状态为 -1,则我们将逻辑上从方法的开头开始。

现在是最不起眼但最具有影响力的一行:调用构建器的 Start 方法。这是异步方法返回位置所使用的类型中必须公开的另一部分模式,它用于对状态机执行初始 MoveNext。构建器的 Start 方法实际上就是这样的:

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    stateMachine.MoveNext();
}

因此,调用 stateMachine.<>t__builder.Start(ref stateMachine) 其实就是调用 stateMachine.MoveNext()。既然如此,为什么编译器不直接发出呢?为什么要有 Start 呢?答案是 Start 比我所述的多一些微小细节。但是,为了了解它,我们需要简要地介绍 ExecutionContext

ExecutionContext

我们都熟悉从方法到方法传递状态的过程。您调用一个方法,如果该方法指定了参数,则使用参数调用该方法,以便将数据提供给被调用者。这是显式地传递数据。但还有其他更隐含的方式。例如,方法可以没有参数,但可以指示在进行方法调用之前可能填充某些特定的静态字段,并且该方法将从那里提取状态。方法的签名并不表示它接受参数,因为它不接受:调用方和被调用方之间只有一个隐式契约,即调用方可能会填充某些内存位置,而被调用方可能会从那些内存位置读取。如果它们是中介人,被调用方和调用方甚至可能意识不到正在发生什么,例如方法 A 可能填充静态字段,然后调用 B,B 调用 C,C 调用 D,最终调用 E 读取这些静态字段的值。这通常称为“环境”数据:它不是通过参数传递给您的,而是只是在那里“闲逛”,如果需要,就可用于您消费。

我们可以进一步采用线程本地状态。线程本地状态,使用 [ThreadStatic] 属性的静态字段或 ThreadLocal<T> 类型在 .NET 中实现,可以以相同的方式使用,但数据仅限于当前执行线程,并且每个线程都能够拥有自己隔离的这些字段的副本。因此,您可以填充线程静态字段,进行方法调用,然后在方法完成时恢复对线程静态字段的更改,从而实现完全隔离的隐式传递数据的形式。

但是,异步情况下该怎么办呢?如果我们进行异步方法调用,并且异步方法内部的逻辑想要访问该环境数据,该怎么做呢?如果数据存储在常规静态字段中,异步方法将能够访问它,但您一次只能有一个这样的方法在运行,因为多个调用者可能会在写入这些共享静态字段时互相覆盖状态。如果数据存储在线程静态字段中,异步方法将能够访问它,但只能在停止在调用线程上同步运行之前;如果它连接了一个继续某个由它启动的操作的回调函数,而该回调函数最终在其他线程上运行,那么它将不再访问线程静态信息。即使它恰好在同一线程上运行,无论是因为机会还是因为调度器强制要求,当它运行时,很可能数据已被该线程启动的某些其他操作删除和/或覆盖。对于异步性,我们需要一种机制,允许任意环境数据在这些异步点之间流动,这样在异步方法的逻辑中,无论在何处和何时运行,它都可以访问相同的数据。

这时候,就需要 ExecutionContext 来实现。ExecutionContext 类型是异步操作之间环境数据流动的载体。它位于 [ThreadStatic] 中,但是当某个异步操作被启动时,它被“捕获”(一种说法是“从该线程静态读取一个副本”),存储起来,然后在运行该异步操作的连续性时,首先将 ExecutionContext 恢复到将要运行该操作的线程上的 [ThreadStatic] 中。ExecutionContext 是实现 AsyncLocal<T> 的机制(事实上,在 .NET Core 中,ExecutionContext 完全与 AsyncLocal<T> 相关,没有其他作用),因此如果您将值存储到 AsyncLocal<T> 中,然后例如将工作项排队以在线程池上运行,那么该值将在池中运行的该工作项内部的 AsyncLocal<T> 中可见:

var number = new AsyncLocal<int>();

number.Value = 42;
ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(number.Value));
number.Value = 0;

Console.ReadLine();

这将每次运行时都打印出 42。重置 AsyncLocal<int> 的值回到 0 在我们排队委托的那一刻之后并不重要,因为在 QueueUserWorkItem 调用的捕获期间,ExecutionContext 被捕获,该捕获包括 AsyncLocal<int> 在那个确切时刻的状态。我们可以通过自己实现简单的线程池来更详细地了解这一点:

using System.Collections.Concurrent;

var number = new AsyncLocal<int>();

number.Value = 42;
MyThreadPool.QueueUserWorkItem(() => Console.WriteLine(number.Value));
number.Value = 0;

Console.ReadLine();

class MyThreadPool
{
    private static readonly BlockingCollection<(Action, ExecutionContext?)> s_workItems = new();

    public static void QueueUserWorkItem(Action workItem)
    {
        s_workItems.Add((workItem, ExecutionContext.Capture()));
    }

    static MyThreadPool()
    {
        for (int i = 0; i < Environment.ProcessorCount; i++)
        {
            new Thread(() =>
            {
                while (true)
                {
                    (Action action, ExecutionContext? ec) = s_workItems.Take();
                    if (ec is null)
                    {
                        action();
                    }
                    else
                    {
                        ExecutionContext.Run(ec, s => ((Action)s!)(), action);
                    }
                }
            })
            { IsBackground = true }.UnsafeStart();
        }
    }
}

这里,MyThreadPool 有一个 BlockingCollection<(Action, ExecutionContext?)>,表示其工作项队列,每个工作项都是要调用的工作委托以及与该工作关联的 ExecutionContext。池的静态构造函数启动了一堆线程,每个线程仅在一个无限循环中,获取下一个工作项并运行它。如果没有为给定委托捕获 ExecutionContext,则直接调用该委托。但是如果已捕获到 ExecutionContext,我们不会直接调用该委托,而是调用 ExecutionContext.Run 方法,在运行委托之前将提供的 ExecutionContext 恢复为当前上下文,并在之后重置该上下文。此示例包括先前显示的具有 AsyncLocal<int> 的完全相同代码,但这次使用 MyThreadPool 而不是 ThreadPool,但每次仍将输出 42,因为池正确地流动了 ExecutionContext

顺便提一下,您会注意到我在 MyThreadPool 的静态构造函数中调用了 UnsafeStart。启动新线程正是应该流动 ExecutionContext 的异步点,实际上,Thread 的 Start 方法使用 ExecutionContext.Capture 来捕获当前上下文,在 Thread 上存储它,然后在最终调用 ThreadStart 委托时使用该捕获的上下文。但是,在此示例中,我不想这样做,因为我不想让线程在静态构造函数运行时捕获任何现有的 ExecutionContext(这样做可能会使关于 ExecutionContext 的演示变得更加复杂),因此我改用了 UnsafeStart 方法。以 Unsafe 开头的与线程相关的方法与相应缺乏 Unsafe 前缀的方法完全相同,只是它们不会捕获 ExecutionContext,例如,Thread.Start 和 Thread.UnsafeStart 完成相同的工作,但是 Start 捕获 ExecutionContext,而 UnsafeStart 不会。

回到 Start

当我写关于 AsyncTaskMethodBuilder.Start 的实现时,我们绕道讨论了 ExecutionContext,我说过它实际上是这样的:

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    stateMachine.MoveNext();
}

然后,我建议简化一下。这种简化忽略了该方法实际上需要考虑 ExecutionContext 的事实,因此更像是这样的:

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    ExecutionContext previous = Thread.CurrentThread._executionContext; // [ThreadStatic] field
    try
    {
        stateMachine.MoveNext();
    }
    finally
    {
        ExecutionContext.Restore(previous); // internal helper
    }
}

我们不仅仅是调用 stateMachine.MoveNext(),而是在这里进行了一些操作:获取当前 ExecutionContext,然后调用 MoveNext,最后在 MoveNext 完成后将当前上下文重置回之前的状态。

这么做的原因是防止环境数据从异步方法泄漏到其调用者中。一个示例方法可以说明为什么这很重要:

async Task ElevateAsAdminAndRunAsync()
{
    using (WindowsIdentity identity = LoginAdmin())
    {
        using (WindowsImpersonationContext impersonatedUser = identity.Impersonate())
        {
            await DoSensitiveWorkAsync();
        }
    }
}

“Impersonation(模拟)”是将当前用户的环境信息更改为其他人的行为;这使得代码可以代表其他人使用他们的权限和访问。在 .NET 中,这种模拟跨异步操作进行流动,这意味着它是 ExecutionContext 的一部分。现在想象一下,如果 Start 没有恢复先前的上下文,并考虑以下代码:

Task t = ElevateAsAdminAndRunAsync();
PrintUser();
await t;

此代码可能发现 ElevateAsAdminAndRunAsync 内部修改的 ExecutionContext 在 ElevateAsAdminAndRunAsync 返回到其同步调用者后仍然存在(这发生在该方法第一次等待尚未完成的任务时)。因为在调用 Impersonate 后,我们调用 DoSensitiveWorkAsync 并等待它返回的任务。假设该任务还没有完成,它将导致 ElevateAsAdminAndRunAsync 调用暂停并返回给调用方,当前线程上的模拟仍然有效。这不是我们想要的结果。因此,Start 建立了这个保护机制,确保对 ExecutionContext 的任何修改不会流出同步方法调用,并且只会随着方法执行的任何后续工作一起流动。