ASP.NET Core知识之RabbitMQ组件使用(一)

RabbitMQ是一种越来越流行的开源,快速消息代理,它使用Erlang编写并基于Open Telecom Platform框架构建。它实现了高级消息队列协议(AMQP),用于在进程,应用程序和服务器之间交换数据。它特别具有吸引力,
因为它可以通过插件支持进行扩展,支持许多协议,并提供高性能,可靠性,集群和高可用性队列。
通过编写代码,通过管理用户界面或通过PowerShell在RabbitMQ中创建队列。.

RabbitMQ条款
使用RabbitMQ时,应注意两个术语:

  • 甲队列是一种数据结构,FIFO的基础上(在第一出第一)上工作。在这种情况下,队列是可以容纳数据的大型消息缓冲区。

  • 甲生产者是产生被推到队列一些数据的组件; 它将数据发送到队列,而使用者是使用队列中存储的数据的使用者。(生产者-消费者模式是并行编程中使用最广泛的模式之一。)

安装与设定
设置RabbitMQ非常简单。在安装RabbitMQ之前,您应该安装Erlang。根据您使用的操作系统下载正确版本的Erlang。接下来,下载并安装RabbitMQ服务器。

用C#编程RabbitMQ
已经在系统中安装了Erlang和RabbitMQ,则需要安装RabbitMQ .Net客户端以连接到RabbitMQ服务并与之一起使用。您可以通过NuGet软件包管理器安装RabbitMQ客户端。

在Visual Studio中创建一个新的控制台应用程序。接下来,使用RabbitMQ默认的交换机做演示。
通过NuGet软件包管理器安装RabbitMQ.Client软件包。假设RabbitMQ服务器在系统中本地运行,则以下代码段可用于创建与RabbitMQ服务器的连接。

ConnectionFactory connectionFactory = new ConnectionFactory();IConnection connection = connectionFactory.CreateConnection();

现在,假设RabbitMQ服务正在远程系统中运行。这是一种使您返回到RabbitMQ服务的连接实例的方法。

public IConnection GetConnection(string hostName, string userName, string password)        {            ConnectionFactory connectionFactory = new ConnectionFactory();            connectionFactory.HostName = hostName;            connectionFactory.UserName = userName;            connectionFactory.Password = password;            return connectionFactory.CreateConnection();        }

发送和接收消息

在系统本地运行RabbitMQ服务的情况下,请使用以下方法将消息发送到队列。请注意,已使用默认设置建立了与RabbitMQ服务的连接。

public static void Send(string queue, string data){            using (IConnection connection = new ConnectionFactory().CreateConnection())            {                using (IModel channel = connection.CreateModel())                {               channel.QueueDeclare(queue, false, false, false, null);        channel.BasicPublish(string.Empty, queue, null, Encoding.UTF8.GetBytes(data));                }            }        }

通道用于与服务器建立通信以发送和接收消息。使用此方法发送到队列的消息并不持久-我已将false第二个参数传递给该QueueDeclare方法。

因此,使用此方法发送的消息仅保留在内存中,并且无法在服务器重启后幸免。

以下方法说明了如何使用先前存储在队列中的数据。

public static void Receive(string queue)        {            using (IConnection connection = new ConnectionFactory().CreateConnection())            {                using (IModel channel = connection.CreateModel())                {               channel.QueueDeclare(queue, false, false, false, null);               var consumer = new EventingBasicConsumer(channel);               BasicGetResult result = channel.BasicGet(queue, true);                    if (result != null)                    {                      string data =                      Encoding.UTF8.GetString(result.Body);                        Console.WriteLine(data);                    }                }            }        }

下一个代码段显示了如何调用Send以及Receive我们在本文中创建的方法:

static void Main(string[] args){     Send("IDG","Hello World!");     Receive("IDG");     Console.ReadLine();}

Copy实验结果

ASP.NET Core知识之RabbitMQ组件使用(一)

程序代码贴图

ASP.NET Core知识之RabbitMQ组件使用(一)

public class Program{  static ConnectionFactory factory = new ConnectionFactory();  static void Main(string[] args)  {    // var factory = new ConnectionFactory();      //连接 RabbitMQ 工厂实例    factory.HostName = "127.0.0.1";                //要连接到的主机,默认为 localhost    factory.Port = 5672;                        //连接断开,默认为 -1(5672)    factory.UserName = "admin";                 //RabbitMQ 连接用户名,默认为 guest    factory.Password = "123456";                 //RabbitMQ 连接密码,默认为 guest    //循环发送    for (int i = 1; i < 11; i++)    {      Console.WriteLine(i);      SendData(i);    }    Console.ReadLine();  }  public static ConnectionFactory ConFactory()  {    var Newfactory = new ConnectionFactory();    Newfactory = factory;    return Newfactory;  }  public static void SendData(int i)  {    var factory = Program.ConFactory();    using (var connection = factory.CreateConnection())    {      //创建一个新的通道、会话和模型      using (var channel = connection.CreateModel())      {        channel.QueueDeclare("myQueue1", false, false, false, null);        var properties = channel.CreateBasicProperties();        properties.DeliveryMode = 1;        string message = $"Hello_RabbitMQ,This is the sender. Sorting {i}";  //消息内容        byte[] body = Encoding.UTF8.GetBytes(message);        channel.BasicPublish("", "myQueue1", properties, body);              //发送(生产)消息        Console.WriteLine($"Send_Messages: {message}");        //   channel.Close();      }      // connection.Close();    }  }}

接收端

ASP.NET Core知识之RabbitMQ组件使用(一)

代码如下:

class Program{  static void Main(string[] args)  {    var factory = new ConnectionFactory();      //连接 RabbitMQ 工厂实例    factory.HostName = "127.0.0.1";             //要连接到的主机,默认为 localhost    factory.Port = 5672;                        //连接断开,默认为 -1(5672)    factory.UserName = "admin";                //RabbitMQ 连接用户名,默认为 guest    factory.Password = "123456";               //RabbitMQ 连接密码,默认为 guest    //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发    var connection = factory.CreateConnection();    var channel = connection.CreateModel();    channel.QueueDeclare("myQueue1", false, false, false, null);    var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)    channel.BasicConsume("myQueue1", true, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)    //该事件在接收到消息时触发    consumer.Received += (sender, e) =>    {      byte[] body = e.Body.ToArray();   //消息字节数组      string message = Encoding.UTF8.GetString(body); //消息内容      Console.WriteLine($"Receive_Messages: {message}");    };    Console.ReadLine();    connection.Close();    channel.Close();  }}

在RabbitMQ中的持久性

RabbitMQ为持久性提供了出色的支持。您可以有两种类型的队列:持久队列和非持久队列。持久队列使消息保留在磁盘上,而非持久队列仅保留在内存中。因此,服务器重新启动后,“持久”队列中的消息可用,而“非持久”队列中的消息丢失。

您可以在三个级别上设置持久性:队列,交换和消息。

附:
RabbitMQ 用户常用命令

  • 进 RabbitMQ 命令:
    CMD: CD C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.1\sbin

  • 创建 admin 用户并指定密码:
    rabbitmqctl add_user admin adminli

  • 设置 admin 用户读写所有队列的权限:
    rabbitmqctl set_permissions admin ".*" ".*" ".*"

  • 设置 admin 用户所属的用户组:
    rabbitmqctl set_user_tags admin administrator

  • 查看用户列表:
    rabbitmqctl list_users

  • 修改 admin 用户的密码为 admin123:
    rabbitmqctl change_password admin admin123

  • 删除 admin 用户:
    rabbitmqctl delete_user admin