1 可用的Kafka .NET客户端
作为一个.NET Developer,自然想要在.NET项目中集成Kafka实现发布订阅功能。那么,目前可用的Kafka客户端有哪些呢?
目前.NET圈子主流使用的是 Confluent.Kafka
confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet
其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。.
因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。
NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:

接下来,本文就来在.NET Core项目下通过Confluent.Kafka和CAP两个主流开源项目来操作一下Kafka,实现一下发布订阅的功能。
2 基于Confluent.Kafka的Sample
在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:
PM>Install-Package Confluent.Kafkanamespace EDT.Kafka.Core{public interface IKafkaService{Task PublishAsync<T>(string topicName, T message) where T : class;Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;}}
namespace EDT.Kafka.Core{public class KafkaService : IKafkaService{public static string KAFKA_SERVERS = "127.0.0.1:9091";public async Task PublishAsync<T>(string topicName, T message) where T : class{var config = new ProducerConfig{BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小为16KLingerMs = 20 // 修改等待时间为20ms};using (var producer = new ProducerBuilder<string, string>(config).Build()){await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = JsonConvert.SerializeObject(message)}); ;}}public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class{var config = new ConsumerConfig{BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer",EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假设只需要Leader响应即可AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起};using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()){consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}T messageResult = null;try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}}await Task.CompletedTask;}}}
public class EventData{public string TopicName { get; set; }public string Message { get; set; }public DateTime EventTime { get; set; }}
编写Producer
namespace EDT.Kafka.Demo.Producer{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();for (int i = 0; i < 50; i++){var eventData = new EventData{TopicName = "testtopic",Message = $"This is a message from Producer, Index : {i + 1}",EventTime = DateTime.Now};await kafkaService.PublishAsync(eventData.TopicName, eventData);}Console.WriteLine("Publish Done!");Console.ReadKey();}}}
编写Consumer
namespace EDT.Kafka.Demo.Consumer{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();var topics = new List<string> { "testtopic" };await kafkaService.SubscribeAsync<EventData>(topics, (eventData) =>{Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");});}}}
测试Pub/Sub效果

3 基于CAP项目的Sample
假设我们有两个微服务,一个是Catalog微服务,一个是Basket微服务,当Catalog微服务产生了Product价格更新的事件,就会将其发布到Kafka,Basket微服务作为消费者就会订阅这个消息然后更新购物车中对应商品的最新价格。

PM>Install Package DotNetCore.CAPPM>Install Package DotNetCore.CAP.MongoDBPM>Install Package DotNetCore.CAP.Kafka
public void ConfigureServices(IServiceCollection services){......services.AddCap(x =>{x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");});}
namespace EDT.Demo.Catalog.API.Controllers{[ApiController][Route("[controller]")]public class ProductController : ControllerBase{private static readonly IList<Product> Products = new List<Product>{new Product { Id = "0001", Name = "电动牙刷A", Price = 99.90M, Introduction = "暂无介绍" },new Product { Id = "0002", Name = "电动牙刷B", Price = 199.90M, Introduction = "暂无介绍" },new Product { Id = "0003", Name = "洗衣机A", Price = 2999.90M, Introduction = "暂无介绍" },new Product { Id = "0004", Name = "洗衣机B", Price = 3999.90M, Introduction = "暂无介绍" },new Product { Id = "0005", Name = "电视机A", Price = 1899.90M, Introduction = "暂无介绍" },};private readonly ICapPublisher _publisher;private readonly IMapper _mapper;public ProductController(ICapPublisher publisher, IMapper mapper){_publisher = publisher;_mapper = mapper;}[HttpGet]public IList<ProductDTO> Get(){return _mapper.Map<IList<ProductDTO>>(Products); ;}[HttpPut]public async Task<IActionResult> UpdatePrice(string id, decimal newPrice){// 业务代码var product = Products.FirstOrDefault(p => p.Id == id);product.Price = newPrice;// 发布消息await _publisher.PublishAsync("ProductPriceChanged",new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});return NoContent();}}}
namespace EDT.Demo.Basket.API.Controllers{[ApiController][Route("[controller]")]public class BasketController : ControllerBase{private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>{new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0001", Name = "电动牙刷A", Price = 99.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0005", Name = "电视机A", Price = 1899.90M }, Count = 1 },}},new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0002", Name = "电动牙刷B", Price = 199.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣机B", Price = 3999.90M }, Count = 1 },}}};[HttpGet]public IList<MyBasketDTO> Get(){return Baskets;}[NonAction][CapSubscribe("ProductPriceChanged")]public async Task RefreshBasketProductPrice(ProductDTO productDTO){if (productDTO == null)return;foreach (var basket in Baskets){foreach (var catalog in basket.Catalogs){if (catalog.Product.Id == productDTO.Id){catalog.Product.Price = productDTO.Price;break;}}}await Task.CompletedTask;}}}
测试效果


