RabbitMQ系列文章之C#利用RabbitMQ实现点对点消息传输

RabbitMQ做为消息代理,负责接收和转发消息,可以将RabbitMQ比喻为一个邮筒、一个邮局和一个邮递员。本文主要以一个简单的小例子,简述RabbitMQ实现消息传输的相关内容,仅供学习分享使用,如有不足之处,还请指正。.

RabbitMQ系列文章之C#利用RabbitMQ实现点对点消息传输

消息队列模型


所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

RabbitMQ系列文章之C#利用RabbitMQ实现点对点消息传输

RabbitMQ设置


RabbitMQ是通过交换机将消息转发到对应队列,所以队列需要和交换机进行绑定。本例将队列绑定到默认的amq.direct交换机,并设置Routing key,如下图所示:

RabbitMQ系列文章之C#利用RabbitMQ实现点对点消息传输

RabbitMQ动态库安装


通过NuGet包管理器进行安装RabbitMQ.Client,如下所示:

RabbitMQ系列文章之C#利用RabbitMQ实现点对点消息传输

RabbitMQ.Client相关知识点


  • ConnectionFactory:构造一个实例,主要创建连接。

  • IConnection:表示一个基于AMQP协议的连接。

  • IModel:表示一个RabbitMQ通道,可用于声明一个队列,然后开始消费。

  • EventingBasicConsumer:基于独立事件监听的基础消费者,可以监听并接收消息。

  • 生产者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 开始生产并发布消息

  • 消费者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 创建消费者,5. 绑定通道和消费者,并开始消费

示例效果图


本例主要有一个生产者,一个消费者,通过消息队列进行消息转发和接收。生产者负责消息发送,如下图所示:

RabbitMQ系列文章之C#利用RabbitMQ实现点对点消息传输

消费者负责消息接收,如下图所示:

RabbitMQ系列文章之C#利用RabbitMQ实现点对点消息传输

核心代码


代码结构:主要包括生产者,消费者,公共基础代码,如下所示:

RabbitMQ系列文章之C#利用RabbitMQ实现点对点消息传输

RabbitMqHelper主要创建连接,如下所示:

public class RabbitMqHelper{  /// <summary>  /// 创建连接  /// </summary>  /// <returns></returns>  public IConnection GetConnection()  {    try    {      var factory = new ConnectionFactory()      {        HostName = "127.0.0.1",        Port = 5672,        UserName = "guest",        Password = "guest",        VirtualHost = "/ShortMsgHost"      };      var conn = factory.CreateConnection();      return conn;    }    catch (Exception ex) {      throw ex;    }  }}

RabbmitMqSendHelper用于发送消息,如下所示:

public class RabbmitMqSendHelper : RabbitMqHelper{  /// <summary>  /// 发送消息  /// </summary>  /// <param name="msg"></param>  /// <returns></returns>  public bool SendMsg(string msg)  {    try    {      using (var conn = GetConnection())      {        using (var channel = conn.CreateModel())        {          channel.QueueDeclare(queue: "ShortMsgQueue",                 durable: true,                 exclusive: false,                 autoDelete: false,                 arguments: null);          var body = Encoding.UTF8.GetBytes(msg);          channel.BasicPublish(exchange: "amq.direct",                     routingKey: "ShortMsgKey",                     basicProperties: null,                     body: body);          //Console.WriteLine(" [x] Sent {0}", message);        };      };      return true;    }    catch (Exception ex)    {      throw ex;    }  }}

RabbitMqReceiveHelper主要用于接收信息,如下所示:

public class RabbitMqReceiveHelper : RabbitMqHelper{  public RabbitMqReceiveEventHandler OnReceiveEvent;  private IConnection conn;  private IModel channel;  private EventingBasicConsumer consumer;  public bool StartReceiveMsg()  {    try    {      conn = GetConnection();      channel = conn.CreateModel();      channel.QueueDeclare(queue: "ShortMsgQueue",              durable: true,              exclusive: false,              autoDelete: false,              arguments: null);      consumer = new EventingBasicConsumer(channel);      consumer.Received += (model, ea) =>      {        var body = ea.Body.ToArray();        var message = Encoding.UTF8.GetString(body);        //Console.WriteLine(" [x] Received {0}", message);        if (OnReceiveEvent != null)        {          OnReceiveEvent(message);        }      };      channel.BasicConsume(queue: "ShortMsgQueue",                  autoAck: true,                  consumer: consumer);      return true;    }    catch (Exception ex)    {      throw ex;    }  }}

关于RabbitMQ的基础知识介绍,可参考前几篇博文。