给你的后台服务日志增加 CorrelationId

Intro

在我们的 Web 项目里通常会在日志里加一个 requestId/traceId/correlationId 之类的去将 log 串起来,这在 web 项目里通常来说实现起来比较简单,因为我们每个 request 都会有一个 HttpContext 进行关联,而我们的后台服务是没有这样一个东西的,下面是我们项目里的实现

Preface

在我们的项目里,后台服务基本分成了两类,一类是定期的 Job,一类是消息的消费者(包括 RabbitMQ、Kafka、云服务的消息),在我们 job 执行或者处理消息的时候我们通常创建一个 service scope 再从其中获取 service,因为我们很多的服务是注册为 Scoped 的,也正是因为这一点我们可以对我们的服务比较方便地做改造.

CorrelationContext

首先我们需要一个类似于 HttpContext 的概念,我们这里定义为 CorrelationContextCorrelationContext 有一个 CorrelationId 表示这个 context 里的一个 traceId,类似于 HttpContext 中的 TraceIdentifier,其他的信息可以根据需要自行添加

file sealed class CorrelationContext
{
    public CorrelationContext()
    {
        CorrelationId = Guid.NewGuid().ToString();
    }
    
    public string CorrelationId { get; }
}

除此之外我们需要一个类似 HttpContextAccessor 的类型我们定义为 CorrelationContextAccessor,我们需要一个类似于 asp.net 里的 HttpContex.Current 访问方式,因为有一些地方未必支持依赖注入,实现如下:

file sealed class CorrelationContextAccessor
{
    private static readonly AsyncLocal<CorrelationContext> ContextCurrent = new();

    public static CorrelationContext? Context
    {
        get => ContextCurrent.Value;
        set => ContextCurrent.Value = value;
    }
}

这里的实现比较简单,因为我们的 Context 一旦创建基本是不需要修改的,只是用于只读获取 CorrelationId

CorrelationHttpHandler

我们需要对发出去的请求加上 correlationId 以便于我们方便查找我们调用其他团队的 API 的对应 log

public sealed class CorrelationHttpHandler : DelegatingHandler
{
    public const string RequestIdHeaderName = "x-request-id";
    private const string OriginalRequestIdHeaderName = "x-request-id-original";
    
    protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        if (CorrelationContextAccessor.Context != null)
        {
            var correlationId = CorrelationContextAccessor.Context.CorrelationId;
            if (request.Headers.TryGetValues(RequestIdHeaderName, out var originalRequestId))
            {
                request.Headers.Add(OriginalRequestIdHeaderName, originalRequestId);
                request.Headers.Remove(RequestIdHeaderName);
            }
            request.Headers.Add(RequestIdHeaderName, correlationId);
        }
        return base.SendAsync(request, cancellationToken);
    }
}

ServiceProviderExtensions

前面我们说了我们有固定的模式来处理我们的逻辑,我们可以将创建 service scope 的行为替换一下即可,因此我们需要加个扩展方法以加入处理 Correlation 的逻辑

public static class ServiceScopeExtensions
{
    public static void ExecuteWithCorrelationScope(this IServiceProvider serviceProvider, Action<IServiceScope, string> action)
    {
        var scope = serviceProvider.CreateScope();
        try
        {
            var correlationContext = new CorrelationContext();
            CorrelationContextAccessor.Context = correlationContext;
            action.Invoke(scope, correlationContext.CorrelationId);
        }
        finally
        {
            CorrelationContextAccessor.Context = null;
            scope.Dispose();
        }
    }
    
    public static async Task ExecuteWithCorrelationScopeAsync(this IServiceProvider serviceProvider, Func<IServiceScope, string, Task> action)
    {
        var scope = serviceProvider.CreateScope();
        try
        {
            var correlationContext = new CorrelationContext();
            CorrelationContextAccessor.Context = correlationContext;
            await action.Invoke(scope, correlationContext.CorrelationId);
        }
        finally
        {
            CorrelationContextAccessor.Context = null;
            scope.Dispose();
        }
    }
}

这里写了两种委托一种用于同步方法,一种用于异步方法,在开始处理我们的任务之前初始化一个 CorrelationContext,在任务处理结束时将 CorrelationContext 清空

至此我们已经完成了一大半了,剩下就是对接一下对应的日志

Logging extensions

我们的应用里使用的是 NLog,我个人相对更加熟悉和喜欢 Serilog 一些,所以下面示例是基于 Serilog 的,我们可以扩展一个实现 ILogEventEnricher 的 CorrelationEnricher

public sealed class CorrelationEnricher : ILogEventEnricher
{
    public void Enrich(LogEvent logEvent, ILogEventPropertyFactory propertyFactory)
    {
        var correlationContext = CorrelationContextAccessor.Context;
        if (correlationContext != null)
        {
            logEvent.AddPropertyIfAbsent(
                propertyFactory.CreateProperty(
                    nameof(CorrelationContext.CorrelationId), 
                    correlationContext.CorrelationId)
                );
        }
    }
}

当 CorrelationContext 存在时,在日志里添加 CorrelationId 属性

Sample

好了我们的准备工作已经完成,来看我们的示例吧:

SerilogHelper.LogInit(configuration =>
{
    configuration.Enrich.With<CorrelationEnricher>();
    configuration.WriteTo.Console(LogEventLevel.Information
         , "[{Timestamp:HH:mm:ss} {Level:u3}] {CorrelationId} {Message:lj}{NewLine}{Exception}"
    );
});

var serviceCollection = new ServiceCollection()
    .AddLogging(builder => builder.AddSerilog())
    .AddTransient<CorrelationHttpHandler>()
    ;
serviceCollection.AddHttpClient("test", client =>
    {
        client.BaseAddress = new Uri("https://reservation.weihanli.xyz");
    })
    .AddHttpMessageHandler<CorrelationHttpHandler>();
await using var provider = serviceCollection.BuildServiceProvider();

var logger = provider.GetRequiredService<ILoggerFactory>().CreateLogger(typeof(CorrelationIdSample));
logger.LogInformation("Hello 1234");
provider.ExecuteWithCorrelationScope((_, _) =>
{
    logger.LogInformation("Correlation 1-1");
    // do something
    Thread.Sleep(100);
    logger.LogInformation("Correlation 1-2");
});

await provider.ExecuteWithCorrelationScopeAsync(async (scope, _) =>
{
    logger.LogInformation("Correlation 2-1");

    await Task.Delay(100);

    var httpClient = scope.ServiceProvider.GetRequiredService<IHttpClientFactory>()
        .CreateClient("test");
    using var response = await httpClient.GetAsync("/health");
    var httpRequestId = string.Empty;
    if(response.RequestMessage?.Headers.Contains(CorrelationHttpHandler.RequestIdHeaderName) == true);
    {
        httpRequestId = response.RequestMessage.Headers.GetValues(CorrelationHttpHandler.RequestIdHeaderName).StringJoin(",");
    }
    Console.WriteLine($"HttpRequestId: {httpRequestId}");
    var responseText = await response.Content.ReadAsStringAsync();
    logger.LogInformation("ApiResponse: {responseStatus} {responseText}", response.StatusCode.ToString(), responseText);

    logger.LogInformation("Correlation 2-2");
});

logger.LogInformation("Hello 4567");

SerilogHelper.LogInit 是一个 Serilog 的帮助类来初始化 logger 配置,实现代码如下:

public static void LogInit(Action<LoggerConfiguration> configAction)
{
    var loggerConfiguration = new LoggerConfiguration();
    loggerConfiguration.Enrich.FromLogContext();
    configAction?.Invoke(loggerConfiguration);
    Log.Logger = loggerConfiguration.CreateLogger();
}

然后在 AddLogging 的时候注册 Serilog 来记录日志

在注册 HttpClient 的时候通过 AddHttpMessageHandler<CorrelationHttpHandler>() 来注册 Correlation 的逻辑以在发送请求的时候自动加 request-id header,这样我们和别的 team 的日志就能够关联起来了

serviceCollection.AddHttpClient("test",
                client => { client.BaseAddress = new Uri("https://reservation.weihanli.xyz"); })
            .AddHttpMessageHandler<CorrelationHttpHandler>();

然后分别是一个同步方法和一个异步方法的示例,在我们项目原来是直接调用的 CreateScope 方法,现在替换成我们的扩展方法,并将处理逻辑改写成委托的形式即可

在异步方法示例中有一个 HTTP 请求,我们在请求结束后尝试获取 request header 里的 requestId 来看是不是存在并且和当前 correlation context 中的 CorrelationId 一致

var httpClient = scope.ServiceProvider.GetRequiredService<IHttpClientFactory>()
                .CreateClient("test");
using var response = await httpClient.GetAsync("/health");
var httpRequestId = string.Empty;
if (response.RequestMessage?.Headers.Contains(CorrelationHttpHandler.RequestIdHeaderName) == true)
{
    httpRequestId = response.RequestMessage.Headers.GetValues(CorrelationHttpHandler.RequestIdHeaderName)
            .StringJoin(",");
}
Console.WriteLine($"HttpRequestId: {httpRequestId}");

我们来看下整个示例的输出结果:

给你的后台服务日志增加 CorrelationId

output

第一条和最后一层日志在输出的时候没有 CorrelationContext 所以是没有 CorrelationId 的,然后第二条日志和第三条日志是同属于一个 CorrelationContext 拥有同一个 CorrelationId,其他日志属于另外一个 CorrelationContext,有着一样的 CorrelationId 并且可以看到我们发出去的 HTTP 请求里的 header 里也带了我们的 CorrelationId 这样我们的逻辑里有调用其他团队的 API 时也就可以根据这个 CorrelationId 去查日志了

More

这种方式要侵入代码,其实并不是比较理想的方案,我觉得比较理想的方案应该是拦截创建 ServiceScope 的逻辑和 Dispose 的逻辑,在创建 ServiceScope 之后自动的创建一个 CorrelationContext,scope 要销毁之前自动清除,这样我们就不需要定义扩展方法再去修改调用方式了,就可以无侵入代码,但是 ServiceScope 是微软依赖注入的内置的类型,是在 BuildServiceProvider 的时候自动注入的,不支持替换,没有办法通过之前介绍的装饰器模式去做替换,详情可以参考之前提的一个 issue:https://github.com/dotnet/runtime/issues/70882,所以才用了方面的这种方案,而且顺便想吐槽下微软自带的 logging 的 Scope 功能略有鸡肋,不如 Serilog 灵活

这个方案基本可以满足我们日常的需要了,但也还是有些不足,对于 consumer 项目来说还好,每个消息的处理一个 CorrelationContext 也就是一个 CorrelationId,对于 job 项目而言,有些 job 可能一次要处理比较多的数据,如果共用一个 CorrelationId 根据这个 id 查到的数据可能会比较多,不过要细化的话就得多写一些侵入式的代码了,目前我们可以通过一些其他的数据如 customerId 之类的一起查询

另外下一步的改进,想使用 Activity 来代替自定义的 CorrelationContext,这样不仅可以存取更多数据也可以更好的和 OpenTelemetry 结合起来,等改造好了再分享一下基于 Activity 的方案。