RabbitMQ系列文章之C#利用RabbitMQ实现消息订阅与发布

在消息队列模型中,如何将消息广播到所有的消费者,这种模式成为“发布/订阅”。本文主要以一个简单的小例子,简述通过fanout交换机,实现消息的发布与订阅,仅供学习分享使用,如有不足之处,还请指正。.

RabbitMQ系列文章之C#利用RabbitMQ实现消息订阅与发布

Fanout交换机模型


扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

RabbitMQ系列文章之C#利用RabbitMQ实现消息订阅与发布

RabbitMQ控制台操作


1. 新增两个队列

在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:

RabbitMQ系列文章之C#利用RabbitMQ实现消息订阅与发布

2. 绑定fanout交换机

将两个队列绑定到系统默认的fanout交换机,如下所示:

RabbitMQ系列文章之C#利用RabbitMQ实现消息订阅与发布

示例效果图


生产者,采用Fanout类型交换机发布消息,如下图所示:

RabbitMQ系列文章之C#利用RabbitMQ实现消息订阅与发布

 当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:

RabbitMQ系列文章之C#利用RabbitMQ实现消息订阅与发布

当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:

RabbitMQ系列文章之C#利用RabbitMQ实现消息订阅与发布

核心代码


1. 消息发布

建立连接后,将通道声明类型为Fanout的交换机,如下所示:

/// <summary>/// fanout类型交换机,发送消息/// </summary>public class RabbitMqFanoutSendHelper : RabbitMqHelper {  /// <summary>  /// 发送消息  /// </summary>  /// <param name="msg"></param>  /// <returns></returns>  public bool SendMsg(string msg)  {    try    {      using (var conn = GetConnection("/Alan.hsiang"))      {        using (var channel = conn.CreateModel())        {          channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);          var body = Encoding.UTF8.GetBytes(msg);          channel.BasicPublish(exchange: "amq.fanout",                     routingKey: "",                     basicProperties: null,                     body: body);          //Console.WriteLine(" [x] Sent {0}", message);        };      };      return true;    }    catch (Exception ex)    {      throw ex;    }  }}

2. 消息订阅

建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:

/// <summary>/// 扇形交换机接收消息/// </summary>public class RabbitMqFanoutReceiveHelper : RabbitMqHelper{  public RabbitMqReceiveEventHandler OnReceiveEvent;  private IConnection conn;  private IModel channel;  private EventingBasicConsumer consumer;  public bool StartReceiveMsg(string queueName)  {    try    {      conn = GetConnection("/Alan.hsiang");      channel = conn.CreateModel();      channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);      //此处随机取出交换机下的队列      //var queueName = channel.QueueDeclare().QueueName;      channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");      consumer = new EventingBasicConsumer(channel);      consumer.Received += (model, ea) =>      {        var body = ea.Body.ToArray();        var message = Encoding.UTF8.GetString(body);        //Console.WriteLine(" [x] Received {0}", message);        if (OnReceiveEvent != null)        {          OnReceiveEvent(queueName+"::"+message);        }      };      channel.BasicConsume(queue: queueName,                  autoAck: true,                  consumer: consumer);      return true;    }    catch (Exception ex)    {      throw ex;    }  }}

至此,关于RabbitMQ系列文章已经全部完成。关于RabbitMQ的基础知识介绍,可参考前几篇博文。