Using RabbitMQ with .NET Core
RabbitMQ Introduction
RabbitMQ is an open source, complete and reusable enterprise messaging system based on the Advanced Message Queuing Protocol (AMQP). It supports both point-to-point and publish/subscribe messaging.
The server is written in Erlang and runs on Linux, Windows, macOS, FreeBSD and other operating systems. Client libraries are available for many languages, such as Python, Java, Ruby, PHP, C#, JavaScript, Go, Elixir, Objective-C and Swift.
RabbitMQ installation
The environment I use is Ubuntu 18.04.
- RabbitMQ requires the Erlang runtime, so Erlang must be installed before RabbitMQ
sudo apt-get install erlang-nox
- Update the package sources
sudo apt-get update
- Install RabbitMQ
sudo apt-get install rabbitmq-server
- Add a user (here named admin with password admin, matching the credentials used in the code below)
sudo rabbitmqctl add_user admin admin
- Tag the new user as an administrator
sudo rabbitmqctl set_user_tags admin administrator
- Grant configure, write and read permissions on all resources in the default virtual host (/) so that the user can manage them
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
- Enable the official web management plugin (rabbitmq_management) to start the web console
sudo rabbitmq-plugins enable rabbitmq_management
- Once the plugin is enabled, open http://localhost:15672/ in a browser to view RabbitMQ information
RabbitMQ common commands
- Start
sudo service rabbitmq-server start
- Stop
sudo service rabbitmq-server stop
- Restart
sudo service rabbitmq-server restart
- View status
sudo rabbitmqctl status
- View all users
sudo rabbitmqctl list_users
- View a user's permissions (in the commands below, users stands for the user name)
sudo rabbitmqctl list_user_permissions users
- Remove a user's permissions
sudo rabbitmqctl clear_permissions [-p VHostPath] users
- Delete a user
sudo rabbitmqctl delete_user users
- Change a user's password
sudo rabbitmqctl change_password users newpassword
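Using RabbitMQ in .NET Core
- Install the RabbitMQ.Client package, either with the command install-package RabbitMQ.Client or through the NuGet package manager. The code below uses the IModel API and a byte[] message body, which matches RabbitMQ.Client 5.x. In 6.x, ea.Body is a ReadOnlyMemory<byte> (call ea.Body.ToArray()), and 7.x replaces IModel with IChannel.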
Producer implementation
using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("producer");
            IConnectionFactory factory = new ConnectionFactory // factory object
            {
                HostName = "106.12.90.208", // IP
                Port = 5672,                // port
                UserName = "admin",
                Password = "admin"
            };
            IConnection con = factory.CreateConnection();
            IModel channel = con.CreateModel();
            string name = "demo";
            // Declare the queue (it is created if it does not exist yet)
            channel.QueueDeclare(
                queue: name,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );
            string str = string.Empty;
            do
            {
                Console.WriteLine("sending:");
                str = Console.ReadLine();
                byte[] body = Encoding.UTF8.GetBytes(str);
                // Default exchange (""): the routing key is the queue name
                channel.BasicPublish("", name, null, body);
                Console.WriteLine("Sending OK:" + str);
            } while (str.Trim().ToLower() != "exit");
            // Close the channel before the connection that owns it
            channel.Close();
            con.Close();
        }
    }
}
Consumer implementation
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Consumer");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208", // IP
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection conn = factory.CreateConnection();
            IModel channel = conn.CreateModel();
            string name = "demo";
            // Same declaration as the producer, so either side can start first
            channel.QueueDeclare(
                queue: name,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );
            var consumer = new EventingBasicConsumer(channel);
            // Raised for every message delivered to this consumer
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body;
                Console.WriteLine("received:" + Encoding.UTF8.GetString(message));
            };
            // autoAck = true: messages are acknowledged as soon as they are delivered
            channel.BasicConsume(name, true, consumer);
            Console.ReadKey();
            channel.Dispose();
            conn.Close();
        }
    }
}
Run both projects at the same time to see the effect.
RabbitMQ’s Worker mode
The worker mode is a one-to-many mode: one queue feeds several consumers. We define two consumers to see the effect.
By default, RabbitMQ sends each message to the next consumer in turn, so each consumer gets roughly the same number of messages. This is called round-robin dispatch.
However, in many cases you do not want messages to be distributed evenly. Instead, fast consumers should take more messages and slow consumers fewer. In addition, if one consumer goes down, the messages already dispatched to it are lost, and the other consumer never receives them.
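The round-robin behaviour described above can be pictured with a small in-memory sketch. It does not talk to a broker; the class RoundRobinDemo and its Dispatch method are hypothetical names used only for illustration, and the model simply assumes that message i goes to consumer i modulo the number of consumers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not part of RabbitMQ.Client.
public static class RoundRobinDemo
{
    // Assigns message i to consumer (i mod consumerCount), mimicking
    // round-robin dispatch regardless of how fast each consumer is.
    public static List<string>[] Dispatch(IReadOnlyList<string> messages, int consumerCount)
    {
        var inboxes = new List<string>[consumerCount];
        for (int c = 0; c < consumerCount; c++)
            inboxes[c] = new List<string>();

        for (int i = 0; i < messages.Count; i++)
            inboxes[i % consumerCount].Add(messages[i]);

        return inboxes;
    }

    public static void Main()
    {
        var inboxes = Dispatch(new[] { "m1", "m2", "m3", "m4", "m5", "m6" }, 2);
        for (int c = 0; c < inboxes.Length; c++)
            Console.WriteLine($"consumer {c + 1}: {string.Join(", ", inboxes[c])}");
        // consumer 1: m1, m3, m5
        // consumer 2: m2, m4, m6
    }
}
```

Each consumer ends up with half of the messages even if one of them is much slower, which is exactly the limitation discussed here.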
We modify one of the consumers so that it waits 3 seconds per message, then stop it while it is waiting to see the effect.
// requires: using System.Threading;
consumer.Received += (model, ea) =>
{
    Thread.Sleep(3000); // simulate slow processing
    byte[] message = ea.Body;
    Console.WriteLine("received:" + Encoding.UTF8.GetString(message));
};
After the slow consumer was stopped, the other consumer did not receive the messages that had already been dispatched to it. Message acknowledgement is needed to solve this problem.
RabbitMQ message acknowledgement
There are two message acknowledgement modes in RabbitMQ:
- Automatic mode: as soon as a message is delivered from the queue, it is considered successfully consumed, regardless of whether the consumer actually processed it.
- Manual mode: after a consumer takes a message from the queue, the server marks the message as unavailable and waits for the consumer's acknowledgement. If the consumer hits an exception while processing and disconnects without acknowledging, RabbitMQ re-delivers the message.
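Modify both consumers to use manual acknowledgement, and delay the acknowledgement in one of them.
// requires: using System.Threading;
consumer.Received += (model, ea) =>
{
    byte[] message = ea.Body;
    Console.WriteLine("received:" + Encoding.UTF8.GetString(message));
    Thread.Sleep(3000); // simulate slow processing before acknowledging
    // multiple: true also acknowledges every earlier unacknowledged delivery on this channel
    channel.BasicAck(ea.DeliveryTag, true);
};
// autoAck = false: turn off automatic acknowledgement
channel.BasicConsume(name, false, consumer);
If the consumer disconnects during the delay, RabbitMQ re-delivers its unacknowledged messages.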
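To make the manual-mode bookkeeping concrete, here is a toy in-memory model of it. It is a sketch under simplifying assumptions, not how the broker is implemented: AckQueueModel and its members are hypothetical names, there is a single consumer, and requeued messages are simply appended to the back of the ready queue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical toy model; not part of RabbitMQ.Client.
public class AckQueueModel
{
    private readonly Queue<string> ready = new Queue<string>();
    private readonly Dictionary<ulong, string> unacked = new Dictionary<ulong, string>();
    private ulong nextTag = 1;

    public int ReadyCount { get { return ready.Count; } }
    public int UnackedCount { get { return unacked.Count; } }

    public void Publish(string message) { ready.Enqueue(message); }

    // Hands the next ready message to the consumer. The message is kept
    // as "unacked" until Ack is called with the returned delivery tag.
    public ulong Deliver(out string message)
    {
        message = ready.Dequeue();
        ulong tag = nextTag++;
        unacked[tag] = message;
        return tag;
    }

    // The consumer confirms that the message was processed.
    public void Ack(ulong tag) { unacked.Remove(tag); }

    // The consumer went away: everything it never acknowledged
    // becomes ready again, in delivery order.
    public void ConsumerDisconnected()
    {
        foreach (var pair in unacked.OrderBy(p => p.Key))
            ready.Enqueue(pair.Value);
        unacked.Clear();
    }
}

public static class AckDemo
{
    public static void Main()
    {
        var queue = new AckQueueModel();
        queue.Publish("m1");
        queue.Publish("m2");

        string first, second;
        ulong tag1 = queue.Deliver(out first);
        queue.Ack(tag1);               // m1 is done
        queue.Deliver(out second);     // m2 is delivered but never acknowledged
        queue.ConsumerDisconnected();  // m2 goes back to the ready queue

        Console.WriteLine($"ready={queue.ReadyCount}, unacked={queue.UnackedCount}");
        // ready=1, unacked=0
    }
}
```

With automatic acknowledgement there would be no unacked set at all, so m2 would be gone once the consumer disconnected.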
"The capable do more" mode (fair dispatch)
In this mode, consumers that process messages faster are given more messages, and slower consumers are given fewer. It builds on manual acknowledgement.
Add BasicQos to the consumers that use manual acknowledgement:
channel.QueueDeclare(
    queue: name,
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null
);
// Deliver at most one unacknowledged message to this consumer at a time
channel.BasicQos(0, 1, false);
The result shows that the faster consumer receives more messages.
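The effect of a prefetch count of 1 can be approximated with a small simulation. This is a sketch under assumptions: FairDispatchDemo is a hypothetical name, all messages are already queued at time zero, and each message goes to whichever consumer becomes free first (ties go to the consumer listed first).

```csharp
using System;

// Hypothetical illustration only; not part of RabbitMQ.Client.
public static class FairDispatchDemo
{
    // secondsPerMessage[c] is how long consumer c needs for one message.
    // Returns how many messages each consumer ends up handling.
    public static int[] Dispatch(int messageCount, int[] secondsPerMessage)
    {
        int n = secondsPerMessage.Length;
        var freeAt = new int[n];   // time at which each consumer is idle again
        var handled = new int[n];

        for (int m = 0; m < messageCount; m++)
        {
            // With one unacknowledged message per consumer, the next
            // message goes to the consumer that acknowledges first.
            int pick = 0;
            for (int c = 1; c < n; c++)
                if (freeAt[c] < freeAt[pick]) pick = c;

            handled[pick]++;
            freeAt[pick] += secondsPerMessage[pick];
        }
        return handled;
    }

    public static void Main()
    {
        // Consumer 1 needs 1 second per message, consumer 2 needs 3 seconds.
        int[] handled = Dispatch(8, new[] { 1, 3 });
        Console.WriteLine($"fast: {handled[0]}, slow: {handled[1]}");
        // fast: 6, slow: 2
    }
}
```

Under plain round-robin the same eight messages would be split four and four.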
Exchange mode
In RabbitMQ's Exchange mode, producers do not send messages directly to queues. They send messages to an exchange, and consumers create their own queues and bind them to that exchange.
Publish and Subscribe Model (fanout)
Producer implementation: replace the queue with an exchange, pass the exchange name to RabbitMQ when publishing, and declare the exchange with the fanout (publish and subscribe) type.
using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("producer");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection con = factory.CreateConnection();
            IModel channel = con.CreateModel();
            string exchangeName = "exchange1";
            // A fanout exchange copies every message to all bound queues
            channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
            string str;
            do
            {
                str = Console.ReadLine();
                byte[] body = Encoding.UTF8.GetBytes(str);
                // The routing key is ignored by fanout exchanges
                channel.BasicPublish(exchangeName, "", null, body);
            } while (str.Trim().ToLower() != "exit");
            // Close the channel before the connection that owns it
            channel.Close();
            con.Close();
        }
    }
}
Consumer implementation
using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace mq
{
    class MainClass
    {
        static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection conn = factory.CreateConnection();
            IModel channel = conn.CreateModel();
            string exchangeName = "exchange1";
            channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
            // Each consumer uses its own queue, named after the current second
            string queueName = DateTime.Now.Second.ToString();
            channel.QueueDeclare(queueName, false, false, false, null);
            // Bind the queue to the exchange; fanout ignores the routing key
            channel.QueueBind(queueName, exchangeName, "", null);
            var consumer = new EventingBasicConsumer(channel);
            Console.WriteLine($"Queue:{queueName}");
            // Event raised for each delivered message
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body;
                Console.WriteLine($"Received:{Encoding.UTF8.GetString(message)}");
                // Confirm
                channel.BasicAck(ea.DeliveryTag, true);
            };
            channel.BasicConsume(queueName, false, consumer);
            Console.ReadKey();
        }
    }
}
When the consumers' queues are bound to the same exchange, each of the two consumers receives every message sent by the producer.
Route Mode (Direct Exchange)
In routing mode, a routeKey is specified when a message is published, and the exchange distributes messages to different queues according to their routeKeys.
Producer implementation
using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Producer");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection con = factory.CreateConnection();
            IModel channel = con.CreateModel();
            // Must match the consumer's exchange name. "exchange1" already
            // exists as a fanout exchange and cannot be redeclared as direct.
            string exchangeName = "exchange11";
            string routeKey = "key1";
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
            string str;
            do
            {
                str = Console.ReadLine();
                byte[] body = Encoding.UTF8.GetBytes(str);
                channel.BasicPublish(exchangeName, routeKey, null, body);
            } while (str.Trim().ToLower() != "exit");
            // Close the channel before the connection that owns it
            channel.Close();
            con.Close();
        }
    }
}
The producer declares a routeKey with the value key1 and passes it to RabbitMQ when publishing. A queue receives the message only if its binding key matches this routeKey. If no binding matches, the message is discarded.
Consumer implementation
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace mq
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine($"input key:");
            string routeKey = Console.ReadLine();
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection conn = factory.CreateConnection();
            IModel channel = conn.CreateModel();
            string exchangeName = "exchange11";
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
            string queueName = DateTime.Now.Second.ToString();
            channel.QueueDeclare(queueName, false, false, false, null);
            // Only messages published with this routeKey reach the queue
            channel.QueueBind(queueName, exchangeName, routeKey, null);
            var consumer = new EventingBasicConsumer(channel);
            Console.WriteLine($"Queue:{queueName}");
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body;
                Console.WriteLine($"Received:{Encoding.UTF8.GetString(message)}");
                // Confirm
                channel.BasicAck(ea.DeliveryTag, true);
            };
            channel.BasicConsume(queueName, false, consumer);
            Console.ReadKey();
        }
    }
}
A message is received only when its routeKey matches the queue's binding key. A consumer's queue can be bound to the exchange with several routeKeys.
If the routeKey does not match, no message is received.
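The matching rule for a direct exchange, including one queue bound with several keys, can be sketched with a small in-memory model. DirectExchangeModel and the queue names are hypothetical and exist only for this illustration. With the real client, the equivalent of each Bind call is one channel.QueueBind call per routeKey, as in the consumer above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of a direct exchange; not part of RabbitMQ.Client.
public class DirectExchangeModel
{
    // Each entry is (queue name, binding key).
    private readonly List<KeyValuePair<string, string>> bindings =
        new List<KeyValuePair<string, string>>();

    public void Bind(string queue, string bindingKey)
    {
        bindings.Add(new KeyValuePair<string, string>(queue, bindingKey));
    }

    // Returns the queues whose binding key equals the routing key exactly.
    public List<string> Route(string routingKey)
    {
        return bindings
            .Where(b => b.Value == routingKey)
            .Select(b => b.Key)
            .Distinct()
            .ToList();
    }
}

public static class DirectExchangeDemo
{
    public static void Main()
    {
        var exchange = new DirectExchangeModel();
        exchange.Bind("queueA", "key1");
        exchange.Bind("queueA", "key2"); // one queue, two binding keys
        exchange.Bind("queueB", "key2");

        foreach (var key in new[] { "key1", "key2", "key3" })
            Console.WriteLine($"{key} -> [{string.Join(", ", exchange.Route(key))}]");
        // key1 -> [queueA]
        // key2 -> [queueA, queueB]
        // key3 -> []
    }
}
```

The empty result for key3 corresponds to a message that no queue receives.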
Wildcard pattern (Topic Exchange)
The wildcard mode is similar to the routing mode. The difference is that the binding key can be declared as a pattern, so matching is fuzzy rather than exact.
The symbol "#" matches zero or more words.
The symbol "*" matches exactly one word.
Words in a RabbitMQ routing key are separated by ".". For example, a.* matches only keys such as a.b and a.c, while a.# also matches a.a.c and a.a.b.
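The two wildcard rules can be checked with a small standalone matcher. This is a sketch written for illustration, not the broker's implementation: TopicMatcher is a hypothetical name, and the code simply splits both strings on "." and compares them word by word.

```csharp
using System;

// Hypothetical illustration only; not part of RabbitMQ.Client.
public static class TopicMatcher
{
    // Returns true if routingKey matches the binding pattern, where
    // "*" stands for exactly one word and "#" for zero or more words.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: succeed only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // "#" may absorb zero or more words: try every split point.
            for (int next = ki; next <= k.Length; next++)
                if (Match(p, pi + 1, k, next)) return true;
            return false;
        }

        // "*" and literal words both need one word from the key.
        if (ki == k.Length) return false;
        if (p[pi] == "*" || p[pi] == k[ki])
            return Match(p, pi + 1, k, ki + 1);
        return false;
    }

    public static void Main()
    {
        Console.WriteLine(Matches("a.*", "a.b"));    // True
        Console.WriteLine(Matches("a.*", "a.a.c"));  // False
        Console.WriteLine(Matches("a.#", "a.a.c"));  // True
        Console.WriteLine(Matches("a.#", "a"));      // True
    }
}
```

The last line shows the zero-word case: a.# also matches the bare key a, which a.* does not.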
Producer implementation
using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class MainClass
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Producer");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection con = factory.CreateConnection();
            IModel channel = con.CreateModel();
            string exchangeName = "exchange114";
            string routeKey = "key.a";
            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
            string str;
            do
            {
                str = Console.ReadLine();
                byte[] body = Encoding.UTF8.GetBytes(str);
                channel.BasicPublish(exchangeName, routeKey, null, body);
            } while (str.Trim().ToLower() != "exit");
            // Close the channel before the connection that owns it
            channel.Close();
            con.Close();
        }
    }
}
Consumer implementation
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace mq
{
    class MainClass
    {
        static void Main(string[] args)
        {
            // The binding key is a fixed pattern here, not read from the console
            string routeKey = "key.*";
            Console.WriteLine($"binding key:{routeKey}");
            IConnectionFactory factory = new ConnectionFactory
            {
                HostName = "106.12.90.208",
                Port = 5672,
                UserName = "admin",
                Password = "admin"
            };
            IConnection conn = factory.CreateConnection();
            IModel channel = conn.CreateModel();
            string exchangeName = "exchange114";
            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
            string queueName = DateTime.Now.Second.ToString();
            channel.QueueDeclare(queueName, false, false, false, null);
            // "key.*" matches the producer's routeKey "key.a"
            channel.QueueBind(queueName, exchangeName, routeKey, null);
            var consumer = new EventingBasicConsumer(channel);
            Console.WriteLine($"Queue:{queueName}");
            // Event raised for each delivered message
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body;
                Console.WriteLine($"Received:{Encoding.UTF8.GetString(message)}");
                channel.BasicAck(ea.DeliveryTag, true);
            };
            channel.BasicConsume(queueName, false, consumer);
            Console.ReadKey();
        }
    }
}
Messages are received only when the routeKey matches the wildcard pattern.
Original link: https://www.cnblogs.com/stutlh/p/12017453.html