Kafka学习征途:.NET Core操作Kafka

1 可用的Kafka .NET客户端

作为一个.NET Developer,自然想要在.NET项目中集成Kafka实现发布订阅功能。那么,目前可用的Kafka客户端有哪些呢?

目前.NET圈子主流使用的是 Confluent.Kafka

confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet

其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。.

因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。

NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:

Kafka学习征途:.NET Core操作Kafka

接下来,本文就来在.NET Core项目下通过Confluent.Kafka和CAP两个主流开源项目来操作一下Kafka,实现一下发布订阅的功能。

2 基于Confluent.Kafka的Sample

要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。
安装相关组件

在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:

PM>Install-Package Confluent.Kafka
编写KafkaService
编写IKafkaService接口:
namespace EDT.Kafka.Core{    public interface IKafkaService    {        Task PublishAsync<T>(string topicName, T message) where T : class;
        Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;    }}
编写KafkaService实现类:
namespace EDT.Kafka.Core{    public class KafkaService : IKafkaService    {        public static string KAFKA_SERVERS = "127.0.0.1:9091";
        public async Task PublishAsync<T>(string topicName, T message) where T : class        {            var config = new ProducerConfig             {                 BootstrapServers = KAFKA_SERVERS,                BatchSize = 16384, // 修改批次大小为16K                LingerMs = 20 // 修改等待时间为20ms            };            using (var producer = new ProducerBuilder<string, string>(config).Build())            {                await producer.ProduceAsync(topicName, new Message<string, string>                {                    Key = Guid.NewGuid().ToString(),                    Value = JsonConvert.SerializeObject(message)                }); ;            }        }
        public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class        {            var config = new ConsumerConfig            {                BootstrapServers = KAFKA_SERVERS,                GroupId = "Consumer",                EnableAutoCommit = false, // 禁止AutoCommit                Acks = Acks.Leader, // 假设只需要Leader响应即可                AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起            };            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())            {                consumer.Subscribe(topics);                try                {                    while (true)                    {                        try                        {                            var consumeResult = consumer.Consume(cancellationToken);                            Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");                            if (consumeResult.IsPartitionEOF)                            {                                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");                                continue;                            }                            T messageResult = null;                            try                            {                                messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);                            }                            catch (Exception ex)                            {                                var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";                                Console.WriteLine(errorMessage);                                messageResult = null;                            }                            if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)                            {                                messageFunc(messageResult);                                try                                {                                    consumer.Commit(consumeResult);                                }                                catch (KafkaException e)                                {                                    Console.WriteLine(e.Message);                                }                            }                        }                        catch (ConsumeException e)                        {                            Console.WriteLine($"Consume error: {e.Error.Reason}");                        }                    }                }                catch (OperationCanceledException)                {                    Console.WriteLine("Closing consumer.");                    consumer.Close();                }            }
            await Task.CompletedTask;        }    }}
为了方便后续的演示,在此项目中再创建一个类 EventData:
public class EventData{    public string TopicName { get; set; }
    public string Message { get; set; }
    public DateTime EventTime { get; set; }}

编写Producer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:
namespace EDT.Kafka.Demo.Producer{    public class Program    {        static async Task Main(string[] args)        {            KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";            var kafkaService = new KafkaService();            for (int i = 0; i < 50; i++)            {                var eventData = new EventData                {                    TopicName = "testtopic",                    Message = $"This is a message from Producer, Index : {i + 1}",                    EventTime = DateTime.Now                };                await kafkaService.PublishAsync(eventData.TopicName, eventData);            }            Console.WriteLine("Publish Done!");            Console.ReadKey();        }    }}

编写Consumer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:
namespace EDT.Kafka.Demo.Consumer{    public class Program    {        static async Task Main(string[] args)        {            KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";            var kafkaService = new KafkaService();            var topics = new List<string> { "testtopic" };            await kafkaService.SubscribeAsync<EventData>(topics, (eventData) =>             {                Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");            });        }    }}

测试Pub/Sub效果

将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。

Kafka学习征途:.NET Core操作Kafka

3 基于CAP项目的Sample

模拟场景说明

假设我们有两个微服务,一个是Catalog微服务,一个是Basket微服务,当Catalog微服务产生了Product价格更新的事件,就会将其发布到Kafka,Basket微服务作为消费者就会订阅这个消息然后更新购物车中对应商品的最新价格。

Kafka学习征途:.NET Core操作Kafka
Catalog API
新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:
PM>Install Package DotNetCore.CAPPM>Install Package DotNetCore.CAP.MongoDBPM>Install Package DotNetCore.CAP.Kafka
在Startup中的ConfigureServices方法中注入CAP:
public void ConfigureServices(IServiceCollection services){
    ......    services.AddCap(x =>    {        x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");        x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");    });}
新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:
namespace EDT.Demo.Catalog.API.Controllers{    [ApiController]    [Route("[controller]")]    public class ProductController : ControllerBase    {        private static readonly IList<Product> Products = new List<Product>        {            new Product { Id = "0001", Name = "电动牙刷A", Price = 99.90M,  Introduction = "暂无介绍" },            new Product { Id = "0002", Name = "电动牙刷B", Price = 199.90M,  Introduction = "暂无介绍" },            new Product { Id = "0003", Name = "洗衣机A", Price = 2999.90M,  Introduction = "暂无介绍" },            new Product { Id = "0004", Name = "洗衣机B", Price = 3999.90M,  Introduction = "暂无介绍" },            new Product { Id = "0005", Name = "电视机A", Price = 1899.90M,  Introduction = "暂无介绍" },        };
        private readonly ICapPublisher _publisher;        private readonly IMapper _mapper;
        public ProductController(ICapPublisher publisher, IMapper mapper)        {            _publisher = publisher;            _mapper = mapper;        }
        [HttpGet]        public IList<ProductDTO> Get()        {            return _mapper.Map<IList<ProductDTO>>(Products); ;        }
        [HttpPut]        public async Task<IActionResult> UpdatePrice(string id, decimal newPrice)        {            // 业务代码            var product = Products.FirstOrDefault(p => p.Id == id);            product.Price = newPrice;
            // 发布消息            await _publisher.PublishAsync("ProductPriceChanged",                 new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});
            return NoContent();        }    }}
Basket API
 
参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。
新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。
namespace EDT.Demo.Basket.API.Controllers{    [ApiController]    [Route("[controller]")]    public class BasketController : ControllerBase    {        private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>        {            new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>                {                    new Catalog { Product = new ProductDTO { Id = "0001", Name = "电动牙刷A", Price = 99.90M }, Count = 2 },                    new Catalog { Product = new ProductDTO { Id = "0005", Name = "电视机A", Price = 1899.90M }, Count = 1 },                }                },            new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>                {                    new Catalog { Product = new ProductDTO { Id = "0002", Name = "电动牙刷B", Price = 199.90M }, Count = 2 },                    new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣机B", Price = 3999.90M }, Count = 1 },                }            }        };
        [HttpGet]        public IList<MyBasketDTO> Get()        {            return Baskets;        }
        [NonAction]        [CapSubscribe("ProductPriceChanged")]        public async Task RefreshBasketProductPrice(ProductDTO productDTO)        {            if (productDTO == null)                return;
            foreach (var basket in Baskets)            {                foreach (var catalog in basket.Catalogs)                {                    if (catalog.Product.Id == productDTO.Id)                    {                        catalog.Product.Price = productDTO.Price;                        break;                    }                }            }
            await Task.CompletedTask;        }    }}

测试效果

同时启动Catalog API 和 Basket API两个项目。
首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。
Kafka学习征途:.NET Core操作Kafka
然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。
Kafka学习征途:.NET Core操作Kafka
最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。
Kafka学习征途:.NET Core操作Kafka
End总结
本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。