RabbitMQ是当前最流行的消息队列之一,也是面试官常考的基本功面试题之一,前一段时间小白面试的几家公司都问了RabbitMQ的相关问题,小白虽然使用过,但是也仅限于消息的订阅。本文将讲述RabbitMQ最简入门。
一、RabbitMQ是做啥的?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,RabbitMQ 是一个消息中间件:它接收消息并且转发,就类似于一个快递站,卖家把快递通过快递站,送到我们的手上,MQ也是这样,接收并存储消息,再转发。.
用它干啥呢?主要有三个方面:
1、流量消峰:这个是最常用的功能,比如遇到双十一秒杀活动,突然来了几千万订单,如果直接访问服务器或者数据库估计支撑不了10秒,有了消息队列之后我们就可以先进入消息队列慢慢的处理并转发请求,当达到一定的量后就可以限制访问,订阅者处理完成后再返回给客户端,虽然有点慢,但比报错强吧。这时使用消息中间件采用队列的形式减少突然访问的压力。
2、应用解耦:主要是系统各个模块之间的解耦,在多个模块之间的耦合情况下不能因为某个模块有问题而整个流程结束。比如学生选课系统,选课后如果耦合会调用支付系统和通知系统,如果这两个任意一个模块出问题就会影响整个系统的运行,这时候MQ就派上用场了。我们可以把报名的处理交给消息队列,由消息队列对接支付系统和通知系统,而不影响整个流程,从而达到系统的解耦。
3、异步处理:异步的操作传统的做法是,前端发送异步请求,后端处理完成后返回,假如是一个很长时间的请求,那么前端怎么知道后端处理完成呢,无非是后端写个API,前端间隔一段时间去调用服务端的API是否处理完成。这种方法不是很完美,使用RabbitMQ的消息总线可以很方便的解决这个问题,服务端处理完成后直接,会发送一条消息给 MQ,MQ 会将此消息转发给客户端,就不用循环访问。
二、环境安装
讲完理论再多也不如实践一次,使用MQ首先得搭建环境,在windows下安装需要首先安装Eralng,因为RabbitMQ是基于Eralng语言开发的,然后再官网下载RabbitMQ。
-
安装Erlang运行环境:https://www.erlang.org/downloads
-
安装RabbitMQ :https://www.rabbitmq.com/install-windows.html
在linux下有两种方式,一是直接下载安装,也同样分两步骤;二是在docker安装,这种方式很简单,三行代码解决问题,这里我们使用docker的方式安装,命令如下:
//docker 安装RabbitMQ
docker search rabbitmq //搜索
docker pull rabbitmq: management //拉取镜像
//*# 创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin admin)
docker run - dit--name rabbitmq -e RABBITMQ_DEFAULT_USER = admin - e RABBITMQ_DEFAULT_PASS = admin - p 15672:15672 - p 5672:5672 rabbitmq: management
安装成功
输入admin账号登录后的效果
看到这个界面,这可以说明安装成功了。接下来就是使用它了。
三、在.NET7中使用RabbitMQ
RabbitMQ有几种消费模型,主要分类是基本消费模型,Work消费模型,订阅消费模型。由于篇幅原因我们这里主要介绍基本消费模型。
基本消费模型:它的角色是一个消息代理,它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。它接受的是存储和转发数据消息的二进制数据块。
它分为生产者和消费者,生产者是发送消息的应用,而消费者是一个主要用来等待接收消息的用户应用程序。下面案例就是基于基本消费模型实现。
1、首先创建两个.NET7控制台程序,一个是生产者,一个是消费者。主要用来模拟消息队列。
2、添加RabbiMQ驱动
通过Nuget的方式分别向两个项目添加RabbitMQ.Client包。也可以使用nuget命令模式添加。
Install-Package RabbitMQ.Client
3、编写生产者代码,模拟请求
using RabbitMQ.Client;
using System.Text;
Console.WriteLine("生产者, FirstMQ!");
//调用
Send();
static void Send()
{
//创建Factory连接请求 下面这块可以新建一个类或者方法。这里为了更直观
ConnectionFactory conFactory = new ConnectionFactory
{
UserName = "test",
Password = "123456",
HostName= "192.168.240.160",
// Port = 15672;//如果是默认不用设置
DispatchConsumersAsync = false//是否异步请求
};
//模拟请求10000次 处理
int[] intlist = new int[10000];
for (int i = 0; i < 10000; i++)
{
intlist[i] = i;
}
var NameKey = "MQfirst";//名称
var msg = "";//记录总的消费
using (var connection = conFactory.CreateConnection())//创建链接
{
using (var channel = connection.CreateModel())//创建信道,通信管道
{
//创建队列,参数1队列名称,参数2是否持久化, 队列的声明默认是存放到内存中的
//参数3队列中的数据消费完成后是否自动删除队列,参数4是否排外的队列,参数5是否等待服务器返回
channel.QueueDeclare(NameKey, false, false, false, null);
//没绑定交换机,使用rabbitmq默认交换机
foreach (var row in intlist)
{
string message = row.ToString();
var body = Encoding.UTF8.GetBytes(message);
msg += message + ",";
//发布消息队列 参数1交换器名称这里为空,参数2路由名称,参数3发送消息附带属性,参数4就是要处理数据
channel.BasicPublish("", NameKey, null, body);//
Console.WriteLine(row);
Thread.Sleep(100);
}
}
}
Console.WriteLine(msg);
}
运行代码之后可以在RabbitMQ后台看到产生的队列,显示总共65个,如下图:
处理流程
a、首先我们打开一个连接和创建一个通道。
b、然后通过channel.QueueDeclare声明一个队列。
c、接下来通过channel.BasicPublish来发布队列
详细说明已经在代码中体现了。
4、编写消费者代码,模拟消费队列
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
Console.WriteLine("消费者调用!");
Consumption();
Console.ReadLine();
static void Consumption()
{
//创建Factory连接请求 下面这块可以新建一个类或者方法。这里为了更直观
ConnectionFactory conFactory = new ConnectionFactory
{
UserName = "test",
Password = "123456",
HostName = "192.168.240.160",
// Port = 15672;//如果是默认不用设置
DispatchConsumersAsync = false//是否异步请求
};
var NameKey = "MQfirst";
using (var connection = conFactory.CreateConnection())
{
using (var channel = connection.CreateModel())//创建信道,通信管道
{
//声明一个队列
channel.QueueDeclare(NameKey, false, false, false, null);
int count = 1;
while (true)//循环作用便于测试用
{
//绑定交换机
var consumer = new EventingBasicConsumer(channel);
//设置消费者权重 参数一是可接收消息的大小的
//第二个参数是处理消息最大的数量
//第三个参数则设置了是不是针对整个Connection的,如果是false则说明只是针对于这个Channel
channel.BasicQos(0, 1, false);
consumer.Received += (model, ea) =>
{
var message = "";
message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(message);
count++;
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
};
// BasicConsume方法会将信道(Channel)置为接收模式,直到取消队列的订阅
channel.BasicConsume(NameKey, false, consumer);
Thread.Sleep(1000);
}
}
}
}
处理流程是:
a、首先我们打开一个连接和一个通道,并声明我们将要消费的队列,跟生产者相同。这里必须与生产者发布到的队列相匹配。
b、然后通过EventingBasicConsumer.Received事件处理的回调方法告诉服务器从队列中传递消息给我们,这里使用了循环来捕获消息。
c、channel.BasicAck这里是消费者确认接收到的消息,RabbitMQ会从队列中删除相应已经被确认消费的消息
d、BasicConsume方法会将信道(Channel)置为接收模式,RabbitMQ会不断地推送消息给消费者,直到取消队列的订阅。
说明已经在代码中体现了。
5、测试效果
上面还已经启用生产者,这时候我们再启用消费者模块,然后再看看队列的运行情况。如下图:
从后台可以看出,队列里的任务已经完成。
这样我们简单的实现了一次“基本消费模型”,具体在项目中的实现还需要重构和优化,由于篇幅关系,这里就不多讲。
结语
本文讲述了在.NET 7中使用RabbitMQ的简单实现,RabbitMQ是流行的消息队列中间件之一,它的核心功能有死信队列、延迟队列,感兴趣可以研究一下。它的优点是性能高,可持久化,高并发,对错误的处理可以不断重试等。希望本文对您有所帮助,水平有限,欢迎斧正。