RabbitMQ Introduction

RabbitMQ is an open source, complete and reusable enterprise messaging team based on the Advanced Message Queuing Protocol (AMQP) protocol. RabbitMQ can implement point-to-point, publish and subscribe message processing modes.

RabbitMQ is an open source AMQP implementation. The server is written in Erlang and supports Linux, windows, macOS, FreeBSD and other operating systems. It also supports many languages, such as Python, Java, Ruby, PHP, C #, JavaScript, Go, Elixir , Objective-C, Swift, etc.

RabbitMQ installation

The environment I use is ubuntu18.04,

  • RabbitMq requires Erlang language support. Erlang needs to be installed before installing RabbitMq
    sudo apt-get install erlang-nox
  • Update source
    sudo apt-get update
  • Install RabbitMq
    sudo apt-get install rabbitmq-server
  • Add users, password is set to admin
    sudo rabbitmqctl add_user  users  admin
  • Grant permissions to added users
    sudo rabbitmqctl set_user_tags users administrator
  • Give configuration, write, and read permissions to all resources in the virtual host in order to manage the resources in it
    rabbitmqctl set_permissions -p users ‘.*’ ‘.*’ ‘.*’
  • An official webm management tool (rabbitmq_management), locate the Rabbitmq installation directory and start the web console
    sudo  rabbitmq-plugins enable rabbitmq_management
  • After successful, you can enter http://localhost:15672/RabbitMq information in the browser
    DotNET_Core_uses_RabbitMQ_0.png

    RabbitMQ common commands

  • start up
    sudo rabbitmq-server start
  • stop
    sudo rabbitmq-server stop
  • Restart
    sudo rabbitmq-server restart
  • View status
    sudo rabbitmqctl status
  • View all users
    rabbitmqctl list_users
  • View user permissions
    rabbitmqctl list_user_permissions users
  • Remove user permissions
    rabbitmqctl clear_permissions [-p VHostPath] users
  • delete users
    rabbitmqctl delete_user users
  • Modify user password
    rabbitmqctl change_password users newpassword

.NET Core uses RabbitMQ

  • install-package rabbitmq.clientInstall rabbitmq.clientpackages via command or nuget

Producer realization

using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("producer");
            IConnectionFactory factory = new ConnectionFactory//factory object
            {
                HostName = "106.12.90.208",//IP
                Port = 5672,//port
                UserName = "admin",
                Password = "admin"
            };
            IConnection con = factory.CreateConnection();
            IModel channel = con.CreateModel();
            string name = "demo";
            
            channel.QueueDeclare(
              queue: name,
              durable: false,
              exclusive: false,
              autoDelete: false,
              arguments: null 
               );
            string str = string.Empty;
            do
            {
                Console.WriteLine("sending:");
                str = Console.ReadLine();
                
                byte[] body = Encoding.UTF8.GetBytes(str);
                
                channel.BasicPublish("", name, null, body);
                Console.WriteLine("Sending OK:" + str);
            }while(str.Trim().ToLower() != "exit");
            con.Close();
            channel.Close();
        }
    }
}

Consumer realization

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQ
{
    class MainClass
    {

        static void Main(string[] args)
        {
            Console.WriteLine("Consumer");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",//IP
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection conn = factory.CreateConnection();
            IModel channel = conn.CreateModel();
            string name = "demo";
            
            channel.QueueDeclare(
              queue: name,
              durable: false,
              exclusive: false,
              autoDelete: false,
              arguments: null 
               );

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body;/
                        Console.WriteLine("received:" + Encoding.UTF8.GetString(message));
            };
            
            channel.BasicConsume(name, true, consumer);
            Console.ReadKey();
            channel.Dispose();
            conn.Close();
        }

    }
}

Run two projects at the same time to see the effect

RabbitMQ’s Worker mode

The worker mode is actually a one-to-many mode.We define two consumers to see the effect.

 

By default, RabbitMQ will sequentially send messages to the next consumer. Each consumer will get an average number of messages. This method is called round-robin (polling).
However, in many cases, you don’t want the message to be evenly distributed. Instead, you need to consume more quickly and consume less and consume less. In many cases, once one of them is down, , Then the other receiver cannot receive the data that the receiver originally wanted to receive.
We modify one of the consumer codes and let it wait for 3 seconds. Stop running while waiting to see the effect

consumer.Received += (model, ea) =>
{
    Thread.Sleep(3000);
    byte[] message = ea.Body;
    Console.WriteLine("received:" + Encoding.UTF8.GetString(message));
};

 

When the consumer went down, consumer 1 did not accept the data after the downtime. So we need message confirmation to solve this problem.

RabbitMQ message acknowledgement

There are two message confirmation modes in Rabbit

  • Automatic mode-As long as the message is obtained from the queue, regardless of whether the consumer successfully consumed the message after receiving it, it is considered that the message was successfully consumed.
  • Manual mode-After the consumer gets a message from the queue, the server will put the message in an unavailable state and wait for consumer feedback. If the consumer experiences an exception during the consumption process and does not send a response after disconnecting, RabbitMQ will re-deliver the message.

Modify two consumer codes and delay confirmation in one of them.

consumer.Received += (model, ea) =>
{
    byte[] message = ea.Body;
    Console.WriteLine("received:" + Encoding.UTF8.GetString(message));
    Thread.Sleep(3000); 
    channel.BasicAck(ea.DeliveryTag, true);
};
////autoAck=false clase autoAck
channel.BasicConsume(name, false, consumer);

 

RabbitMQ will re-deliver unconfirmed messages if the consumer disconnects during the delay

‘Able to do more’ mode

Being able to do more is to give more information to consumers who consume faster. Less responsibility is to consume less news. Being able to do more is based on manual confirmation.
Add BasicQos to deferred confirmations

channel.QueueDeclare(
    queue: name,
    durable: false,
    exclusive: false,
    autoDelete: false,.
    arguments: null 
    );
//send only one message before ack
channel.BasicQos(0,1,false);

 

It can be seen that consumers who consume fast have received more news.

Exchange mode

In RabbitMQ’s Exchange mode, producers do not send messages directly to Queues, but instead send messages to Exchange (switches) .Consumers create their own queues and bind them to switches.

Publish and Subscribe Model (fanout)

DotNET_Core_uses_RabbitMQ_6.png

Producer implementation, replace the queue with a switch, tell RabbitMQ the switch name when publishing the message, and set the switch to fanout publish and subscribe mode

using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("producer");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection con = factory.CreateConnection();
            IModel channel = con.CreateModel();
            sstring exchangeName = "exchange1";
            
            channel.ExchangeDeclare(name, type: "fanout");
            string str;
            do
            {
                str = Console.ReadLine();
                
                byte[] body = Encoding.UTF8.GetBytes(str);
                
                channel.BasicPublish(exchangeName, "", null, body);
            }while(str.Trim().ToLower() != "exit");
            con.Close();
            channel.Close();
        }
    }
}

Consumer realization

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace mq
{
    class MainClass
    {

        static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection conn = factory.CreateConnection();
            IModel channel = conn.CreateModel();
            
            string exchangeName = "exchange1";
            
            channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
            
            string queueName = DateTime.Now.Second.ToString();
            
            channel.QueueDeclare(queueName, false, false, false, null);
            
            channel.QueueBind(queueName, exchangeName, "", null);
            
            var consumer = new EventingBasicConsumer(channel);
            Console.WriteLine($"Queue:{queueName}");
            //Event
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body;/
                Console.WriteLine($"Received:{Encoding.UTF8.GetString(message)}");
                //Confirm
                channel.BasicAck(ea.DeliveryTag, true);
            };
           
            channel.BasicConsume(queueName, false, consumer);
            Console.ReadKey();
        }
    }
}

 

When consumers are bound to the same switch, you can see that two different consumers can receive all messages sent by the producer.

Route Mode (Direct Exchange)

DotNET_Core_uses_RabbitMQ_8.png

In routing mode, different routeKeys are specified when publishing messages, and switches will distribute messages to different queues based on different routeKeys.

Producer realization

using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Producer");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection con = factory.CreateConnection();
            IModel channel = con.CreateModel();
            string exchangeName = "exchange1"; 
            string routeKey = "key1"; ,
            
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
            string str;
            do
            {
                str = Console.ReadLine();
                
                byte[] body = Encoding.UTF8.GetBytes(str);
               
                channel.BasicPublish(exchangeName, routeKey, null, body);
            }while(str.Trim().ToLower() != "exit");
            con.Close();
            channel.Close();
        }

    }
}

Affirmed that a routeKey value was key1, and told RabbitMQ when the message was published that the routeKey must match when the message was passed before it was received by the queue or the message would be discarded.

Consumer realization

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace mq
{
    class MainClass
    {

        static void Main(string[] args)
        {
            Console.WriteLine($"input key:");
            string routeKey = Console.ReadLine();
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection conn = factory.CreateConnection();
            IModel channel = conn.CreateModel();
            
            string exchangeName = "exchange11";
            
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
            
            string queueName = DateTime.Now.Second.ToString();
            
            channel.QueueDeclare(queueName, false, false, false, null);
           
            channel.QueueBind(queueName, exchangeName, routeKey, null);
            
            var consumer = new EventingBasicConsumer(channel);
            Console.WriteLine($"Queue:{queueName}");
            
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body;
                Console.WriteLine($"Received:{Encoding.UTF8.GetString(message)}");
                //confirm
                channel.BasicAck(ea.DeliveryTag, true);
            };
           
            channel.BasicConsume(queueName, false, consumer);
            Console.ReadKey();
        }
    }

}

 

The message will only be received when the routeKey matches. The receiver message queue can declare multiple routeKeys to be bound to the switch.

 

If the routeKey does not match, no message is received.

Wildcard pattern (Topic Exchange)

The wildcard pattern is similar to the routing pattern, and the routes different from the match pattern can be declared as fuzzy queries.

The symbol “#” matches one or more words. The
symbol “*” matches a word.

Wildcards in RabbitMQ use “.” To separate strings. For example, a. * Can only match ab, ac, while a. # Can match aac, aab

Generator implementation

using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Producer");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection con = factory.CreateConnection();
            IModel channel = con.CreateModel();
            string exchangeName = "exchange114"; 
            string routeKey = "key.a"; 
            
            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
            string str;
            do
            {
                str = Console.ReadLine();
                
                byte[] body = Encoding.UTF8.GetBytes(str);
               
                channel.BasicPublish(exchangeName, routeKey, null, body);
            }while(str.Trim().ToLower() != "exit");
            con.Close();
            channel.Close();
        }

    }
}

Consumer realization

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace mq
{
    class MainClass
    {

        static void Main(string[] args)
        {
            Console.WriteLine($"input key name:");
            string routeKey = "key.*"; 
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection conn = factory.CreateConnection();
            IModel channel = conn.CreateModel();
            
            string exchangeName = "exchange114";
            
            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
            
            string queueName = DateTime.Now.Second.ToString();
            
            channel.QueueDeclare(queueName, false, false, false, null);
            
            channel.QueueBind(queueName, exchangeName, routeKey, null);
            
            var consumer = new EventingBasicConsumer(channel);
            Console.WriteLine($"Queue:{queueName}");
            //Event
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body;
                Console.WriteLine($"Received:{Encoding.UTF8.GetString(message)}");
               
                channel.BasicAck(ea.DeliveryTag, true);
            };
            
            channel.BasicConsume(queueName, false, consumer);
            Console.ReadKey();
        }
    }

}

Receive messages only if wildcard matches pass,

Orignal link:https://www.cnblogs.com/stutlh/p/12017453.html