分布式(一致性协议)之领导人选举(DotNext.Net.Cluster实现Raft选举)

继分布式锁之后的又一高可用技术爽文之分布式领导选举 或者说 分布式一致性协议的实现

分布式选举是实现高可用的必备技术,想实现主从,就必须得有选举的策略,有主从才会有一个真正的管理端进行资源的协调分配。.

首先需要明确的是一致性算法的目标是什么,主要面对的问题是在只使用单个服务器时由于发生错误导致数据丢失等事情发生。解决这个问题的思路也很简单,就是备份,集群,多个服务器,将操作重复到多个机器上就不怕单个机器出错了。但随之而来的就是,数据不一致、乱序等问题,一致性算法想要做到的是即使有结点出错,对外仍是一个完整的可以正常工作的整体。

选举算法

实现一致性协议(选举)的主要算法有两种

  1. 1. Raft

  2. 2. Paxos

相当于 Paxos 来讲, Raft协议相对来讲简单一点。但是,Raft 实现起来也不是很容易,如果有朋友试图想去实现可以参考,这个地方

地址 :https://zinglix.xyz/2020/06/25/raft/

我个人也是 简单的理解了一下。

Raft 是一个非拜占庭的一致性算法,即所有通信是正确的而非伪造的。N 个结点的情况下(N为奇数)可以最多容忍 (N−1)/2个结点故障

为啥需要单独的选举算法

我曾经试图实现一个WEB服务功能,我不能保证这个服务的高可用,我又不想用其他的现有服务,我就想让服务自己本身能支持高可用。

当时,因为自己认知的问题,并没有短时间找到一个可用的方案,不过现在有了。

要是当时有,可能就是别样风景了,不好说。

分布式选举的大致算法

简单的来讲就是找到一个领头的,假设有一个leader key,redis里,谁抢到了,谁就是leader,也是可以实现的。这种分布式锁实现的领导选举也可以适用于简单的项目中,并且,支持单个服务主机。

想用分布式选举算法,机器最少得2台以上,或者是概念上的两台。

Raft中主要有三个角色 Leader(领导人)、Follower (跟随者)和 Candidate(候选人),当某台机器成为了领导人,就会成为主要对外对接人,然后把对接的事情同步给下边的跟随者或者候选人同步信息。

因为对外只能有一个主服务,起到协调管理的作用。

这样,就会把相应的指令(日志)分配各个客户端,起到数据一致性的作用 ,这样,当领导人废了,下个人接任,还能继续起到作用。

Raft 选举的实例

我找了很多.Net 实现的Raft,很多只能说是个玩具,不能用于生产。

不过幸好,还真有生产级别的。

那就是 DotNext.Net.Cluster 和 DotNext.AspNetCore.Cluster (支持http ) 。

主要是基于 DotNext 中的组件。

DotNext.Net.Cluster 和 DotNext.AspNetCore.Cluster

  1. 1. DotNext.Net.Cluster包含集群编程模型、Raft 算法的传输无关实现、Raft 的 TCP 和 UDP 传输绑定、HyParView membersip 协议的传输无关实现,用于基于 Gossip 的消息传递

  2. 2. DotNext.AspNetCore.Cluster是基于DotNext.Net.Cluster库的 Raft 和 HyParView 算法的具体实现,用于构建 ASP.NET Core 应用程序

支持的功能列表

支持的功能列表:

  1. 1. 网络传输:TCP、UDP、HTTP 1.1、HTTP/2、HTTP/3

  2. 2. TLS 支持:TCP、HTTP 1.1、HTTP/2、HTTP/3

  3. 3. 支持日志压缩的高性能、通用Persistent Write-Ahead Log

  4. 4. 跨集群节点复制日志条目

  5. 5. 与 ASP.NET Core 框架紧密集成

  6. 6. 对 Docker/LXC/Windows 容器友好

  7. 7. 一切都是可扩展的

  • • 7.1 自定义预写日志

  • • 7.2 自定义网络传输

  • • 7.3 集群成员发现

基于 DotNext.Net.Cluster 的TCP 选举实例

其实他也是支持http的,当然,更多姿势,得大佬自己去挖掘了。

项目大致结构

分布式(一致性协议)之领导人选举(DotNext.Net.Cluster实现Raft选举)

细心的小伙伴就会发现,这个是个.Net 6的项目,因为它的nuget包,只支持.Net 6的。

有需要的可以自己改改。

项目是参考源示例,改了一下,有需要的朋友直接去看官方案例

项目重点

Install-Package DotNext.Net.Cluster -Version 4.6.0

DataModifier.cs

internal sealed class DataModifier : BackgroundService
{
    private readonly IRaftCluster cluster;
    private readonly ISupplier<long> valueProvider;

    public DataModifier(IRaftCluster cluster, ISupplier<long> provider)
    {
        this.cluster = cluster;
        valueProvider = provider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(1000, stoppingToken).ConfigureAwait(false);

            var leadershipToken = cluster.LeadershipToken;
            TitleInfo.Show(!leadershipToken.IsCancellationRequested);
            if (!leadershipToken.IsCancellationRequested)
            {
                var newValue = valueProvider.Invoke() + 500L;
                Console.WriteLine("保存领导节点生成的值 {0}", newValue);

                var source = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, leadershipToken);
                try
                {
                    var entry = new Int64LogEntry { Content = newValue, Term = cluster.Term };
                    await cluster.ReplicateAsync(entry, source.Token);
                }
                catch (Exception e)
                {
                    Console.WriteLine("未知异常 {0}", e);
                }
                finally
                {
                    source?.Dispose();
                }
            }
        }
    }
}

这个应该是核心服务,会与其他客户端进行通信和具体的选举,以及日志的传输

Program.cs

class Program
{
    static async Task Main(string[] args)
    {
        await UseTcpTransport(Path.Combine(AppContext.BaseDirectory, "raftConfig"));
    }
    static Task UseTcpTransport(string path)
    {
        //获取所有配置
        var jsonConfiguration = new ConfigurationBuilder().SetBasePath(Environment.CurrentDirectory).AddJsonFile("appsettings.json", optional: true, reloadOnChange: true).Build();
        var NodeInfo = new NodeInfo();
        jsonConfiguration.Bind("NodeInfo", NodeInfo);
        Console.WriteLine($"MainNode:{NodeInfo.MainNode}");
        TitleInfo.Node = NodeInfo.MainNode;
        var configuration = new RaftCluster.TcpConfiguration(IPEndPoint.Parse(NodeInfo.MainNode))
        {
            RequestTimeout = TimeSpan.FromMilliseconds(140),
            LowerElectionTimeout = 150,
            UpperElectionTimeout = 300,
            TransmissionBlockSize = 4096,
            ColdStart = false,
        };

        //加载全部地址
        //线上环境自己重写服务
        var builder = configuration.UseInMemoryConfigurationStorage().CreateActiveConfigurationBuilder();
        foreach (var item in NodeInfo.Nodes)
        {
            var address = IPEndPoint.Parse(item);
            builder.Add(ClusterMemberId.FromEndPoint(address), address);
        }
        builder.Build();

        TitleInfo.Show();
        return UseConfiguration(configuration, path);
    }
    static async Task UseConfiguration(RaftCluster.NodeConfiguration config, string? persistentStorage)
    {
        var loggerFactory = new LoggerFactory();
        var loggerOptions = new ConsoleLoggerOptions
        {
            LogToStandardErrorThreshold = LogLevel.Warning
        };
        loggerFactory.AddProvider(new ConsoleLoggerProvider(new FakeOptionsMonitor<ConsoleLoggerOptions>(loggerOptions)));
        config.LoggerFactory = loggerFactory;
        using var cluster = new RaftCluster(config);
        cluster.LeaderChanged += ClusterConfigurator.LeaderChanged;
        var modifier = default(DataModifier?);
        if (!string.IsNullOrEmpty(persistentStorage))
        {
            var state = new SimplePersistentState(persistentStorage, new AppEventSource());
            cluster.AuditTrail = state;
            modifier = new DataModifier(cluster, state);
        }
        await cluster.StartAsync(CancellationToken.None);
        await (modifier?.StartAsync(CancellationToken.None) ?? Task.CompletedTask);
        //控制台等待取消
        using var handler = new CancelKeyPressHandler();
        Console.CancelKeyPress += handler.Handler;
        await handler.WaitAsync();
        Console.CancelKeyPress -= handler.Handler;

        //停止服务
        await (modifier?.StopAsync(CancellationToken.None) ?? Task.CompletedTask);
        await cluster.StopAsync(CancellationToken.None);
    }
}

总体来说,项目还是很简单的。

我把客户端的地址给配置了

appsettings.json

这个结构应该很容易理解,一个是当前端的地址,一个是所有节点的地址,当然也包含当前地址。

{
  "NodeInfo": {
    "MainNode": "127.0.0.1:6001" ,
    "Nodes": [
      "127.0.0.1:6001",
      "127.0.0.1:6002",
      "127.0.0.1:6003"
    ]
  }
}

运行方式

我自己是把Bin目录复制三份,每份的 appsettings.json 修改下,然后,双击 RaftDemo.exe 就运行起来了。

注意

如果起用一个节点没个卵用,最少得两个节点。

运行效果

分布式(一致性协议)之领导人选举(DotNext.Net.Cluster实现Raft选举)

总结

这个库是可以用在生产环境的,所以,还是值得研究一下下的。

代码地址

https://github.com/kesshei/RaftDemo.git

https://gitee.com/kesshei/RaftDemo.git

参考文档

https://zinglix.xyz/2020/06/25/raft/

https://github.com/dotnet/dotNext/tree/master/src/cluster