First, the introduction

RabbitMQ is a complete, reusable enterprise messaging system based on AMQP, following the Mozilla Public License open source protocol.
MQ stands for Message Queue, which is an application-to-application communication method.
Applications communicate by reading messages written to the queue (data for the application) without a dedicated connection to link them.
Messaging refers to the communication between programs by sending data communications in messages rather than directly calling each other, directly calling techniques that are typically used for procedures such as remote procedures.
Queuing refers to the application communicating through a queue.
The use of queues removes the requirement to receive and send applications simultaneously.

Application scenario:

1, system integration, design of distributed systems.
Various subsystems are connected through messages, and this solution has gradually evolved into an architectural style, namely ‘architecture through messaging’

2. When the synchronization processing method in the system seriously affects the throughput, such as logging.
If you need to record all the user behavior logs in the system, if you log the logs in a synchronous manner, it will affect the response speed of the system. When we send the log messages to the message queue, the subsystem that records the logs will get the logs asynchronously. Message.

3, the system’s high availability, such as the e-commerce spike scenario, when a certain time application server or database server receives a large number of requests, there will be system downtime.
If the request can be forwarded to the message queue, then the server will get the message, which will make the request smooth and improve the system availability.

Second, download and install

1, install erlang, download on the official website, and then click the next step to install

2, install RabbitMQ, also official website download, direct installation

3, configuration

Use cmd to enter the RabbitMQ Server\rabbitmg_server-3.6.5\sbin directory and type: rabbitmg-plugins enable rabbitmg-management, so that it is configured.
At this point we can open cmd as an administrator, enter: net start rabbitmq to start the service; enter: net stop rabbitmq to close the service

Third, RabbitMQ simple mode

In the process of use, there are always three parts, one is the producer, the other is the consumer, the third is the RabbitMQ Server (which runs on a server), the producer puts the data into the message queue, and consumes The data is taken from the message queue.
We implemented it in Python, so we have to install a pika module to help us connect to the queue.

1, the basic code

Producer, producer.py

Import the PIKA
 # connection RabbitMQ 
Connection = pika.BlockingConnection (pika.ConnectionParameters (Host = ' localhost ' ))
Channel = connection.channel ()
 # create a queue, the queue named 'hello', the name at random 
channel.queue_declare (Queue = ' the Hello ' )
 # to queue to add value, routing_key is we want to go 'hello' to put data queue , body means that we put the data into 'the Hello world' 
channel.basic_publish (the Exchange = '' ,
                      routing_key = ' Hello ' ,
                      Body = ' Hello World! ' ) 
#This is closing the connection
Connection.close()

Consumer, consumer.py

Import the PIKA
 # connection RabbitMQ 
Connection = pika.BlockingConnection (pika.ConnectionParameters (Host = ' localhost ' ))
Channel = connection.channel()
 #Create a queue, here is also the meaning of creating a queue, consumers and producers may decide which one to start first, so whoever starts first will be created, and when another comes in. If the queue exists, it does not create a 
channel.queue_declare (Queue = ' the Hello ' )
 # callback 
DEF callback (CH, Method,, the Properties, body):
     Print (body)
 # determine listen queue event, when the queue has a value, Will take the value and return it to the callback function 
channel.basic_consume( callback,
                       Queue = ' hello ' ,
                       No_ack = True)

Channel.start_consuming()

2, no_ack parameter

2.1 When no_ack=True is the no-answer mode, the response here means that the consumer does not respond to the queue.
In this case, the consumer takes a piece of data from the queue, and the queue immediately deletes the data. When the consumer makes a mistake in processing the data, causing the consumer to disconnect without completing the task, the consumer is It is impossible to get the same data from the queue again, which means that the data has not been processed but disappeared, so that this data will never be processed.

2.2 no-ack=false, in the answer mode, each time the consumer takes a piece of data, when the process is successful, it will give the queue a response. At this time, the queue will receive the response to delete the data; when the consumer fails to process the data, it fails to give Queue response, the queue will not delete this data, waiting for the next consumer to retrieve this data again, will delete this data when the response is received

2.3 code, this process is only a change in the relationship between the consumer and the queue, so only the code that changes the consumer can be used.

Consumer, consumer_ack.py

Import pika
Connection = pika.BlockingConnection(pika.ConnectionParameters(
        Host = ' 10.211.55.4 ' ))
Channel = connection.channel()
Channel.queue_declare(queue = ' hello ' )
 def callback(ch, method, properties, body):
     print (body) 
# Add a response to 
    ch.basic_ack(delivery_tag = method.delivery_tag)
Channel.basic_consume(callback,
                      Queue = ' hello ' ,
                      No_ack = False) #Set no-ack to False
Channel.start_consuming()

When the consumer disconnects for some reason (such as a bug) during the process, the message will not be lost. This data will be given to the next consumer who will get the data.

3, the durable parameter, that is, data persistence storage

The producer puts the data in the queue. When the consumer has not taken the data, the server where the queue is located collapses. At this time, the data in the queue disappears.
If we want to blow this situation, then only the data in the queue is stored persistently. This requires us to declare the queue.

Producer, producer_durable.py

Import pika
Connection = pika.BlockingConnection(pika.ConnectionParameters(host= ' 10.211.55.4 ' ))
Channel = connection.channel() 
# Assign durable to True, that is, let it persist to store 
channel.queue_declare(queue = ' hello ' , durable= True)
Channel.basic_publish(exchange = '' ,
                      routing_key = ' Hello ' ,
                      Body = ' Hello World! ' , 
              # here to set the mode to 2 
                      properties = pika.BasicProperties(
                          Delivery_mode = 2 ,
                      ))
Connection.close()

Consumer, consumer_durable.py

Import pika
Connection = pika.BlockingConnection(pika.ConnectionParameters(host= ' 10.211.55.4 ' ))
Channel = connection.channel() 
# durable set to True 
channel.queue_declare(queue = ' hello ' , durable= True)
 def callback(ch, method, properties, body):
     print (body)
    Ch.basic_ack(delivery_tag = method.delivery_tag)
Channel.basic_consume(callback,
                      Queue = ' hello ' ,
                      No_ack = False)
Channel.start_consuming()

rabbitMQ server down, data is not lost

4, the order of message acquisition

The data of the queue defaults to the value in order, that is, there are three consumers. If the first wave is in the order of abc, then the order is abc, no matter how fast the data is processed, for example, a is still processing data. However, b has been processed, but b still can’t take the value, you must first take the value, then b can get the value.
This form is too inefficient.

Channel.basic_qos(prefetch_count=1) After setting this parameter, it is not the value in order, but whoever comes first.
This is just a consumer related setting.

Consumer, consumer_prefetch.py

Import pika
Connection = pika.BlockingConnection(pika.ConnectionParameters(host= ' 10.211.55.4 ' ))
Channel = connection.channel()
Channel.queue_declare(queue = ' hello ' )
 def callback(ch, method, properties, body):
     print (body)
    Ch.basic_ack(delivery_tag = method.delivery_tag) #Add
 this sentence to 
channel.basic_qos(prefetch_count =1 )

Channel.basic_consume(callback,
                      Queue = ' hello ' ,
                      No_ack = False)
Channel.start_consuming()

Fourth, RabbitMQ exchange mode

1, publish subscription mode

In simple mode, one piece of data is only given to one consumer; in the publish-subscribe mode, a message is sent to all subscribed consumers.

The producer puts the message in a specified exchange, and then each consumer creates a queue that is bound to the exchange, so that the consumer can get the subscribed data.

 

announcer,

Import pika

Connection = pika.BlockingConnection(pika.ConnectionParameters(
        Host = ' 192.168.137.208 ' ))
Channel = connection.channel()
#Mode to change
Channel.exchange_declare(exchange = ' logs ' ,
                         Exchange_type = ' fanout ' )

Message = " Hello World! " 
#exchange
Channel.basic_publish(exchange = ' logs ' ,
                      Routing_key = '' ,
                      Body = message)
Connection.close()

 

subscriber

Import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        Host = ' 192.168.137.208 ' ))
Channel = connection.channel()
#Mode also needs to be changed
Channel.exchange_declare(exchange = ' logs ' ,
                         Exchange_type = ' fanout ' )
 # Randomly create queue 
result = channel.queue_declare(exclusive= True) 
#Get the queue name
Queue_name = result.method.queue
 # bind the queue to exchange 
channel.queue_bind(exchange= ' logs ' ,
                   Queue = queue_name)
 def callback(ch, method, properties, body):
     print ( body)
Channel.basic_consume(callback,
                      Queue = queue_name,
                      No_ack = True)
Channel.start_consuming()

 

2, keyword mode

When the publisher publishes the message, it will contain the keyword; and the subscriber will not only bind the queue to the exchange, but also bind the keyword. When the publisher’s keyword and the bound keyword are the same, the subscription is subscribed. In order to get the message, a queue can bind multiple keywords to an exchange.

RabbitMQ-Message-Queue18

announcer

Import pika

Connection = pika.BlockingConnection(pika.ConnectionParameters(
        Host = ' localhost ' ))
Channel = connection.channel()

# Declare a switch 
channel.exchange_declare (Exchange = ' direct_logs ' , exchange_type = " Direct " )

Message = " warning: Hello World! " 
channel.basic_publish(exchange = ' direct_logs ' ,
                      Routing_key = ' warning ' , #This is the keyword of the sender to send the message 
                      body = message) 
connection.close()

 

subscriber

 

Import pika

Connection = pika.BlockingConnection(pika.ConnectionParameters(
        Host = ' localhost ' ))
Channel = connection.channel()
Channel.exchange_declare 
(exchange = ' direct_logs ' ,
                         Exchange_type = ' direct ' )

Result = channel.queue_declare(exclusive= True)
Queue_name = result.method.queue
Channel.queue_bind 
(exchange = ' direct_logs ' ,
                   Queue = queue_name,
                   Routing_key = " error " )# This is the keyword def callback(ch, method, properties, body):
     print ( body))

Channel.basic_consume(callback,
                      Queue = queue_name,
                      No_ack = True)

Channel.start_consuming()

3, fuzzy matching

This is based on keywords, but this time it is not the same, but with fuzzy matching, ‘#’ means matching 0 or more characters, ‘*’ means matching an arbitrary character.

announcer

Import pika

Connection = pika.BlockingConnection(pika.ConnectionParameters(
        Host = ' localhost ' ))
Channel = connection.channel()

# Declare a switch 
channel.exchange_declare (Exchange = ' topic_logs ' , exchange_type = " Topic " )

Message = " Hello World! " 
channel.basic_publish(exchange = ' topic_logs ' ,
                      Routing_key = ' banana.apple.xigua.juzi ' , #This is the keyword that was shipped with
                      Body = message) 
connection.close()

subscriber

Import pika
 import sys

Connection = pika.BlockingConnection(pika.ConnectionParameters(
        Host = ' localhost ' ))
Channel = connection.channel()
Channel.exchange_declare 
(exchange = ' topic_logs ' ,
                         Exchange_type = ' topic ' )

Result = channel.queue_declare(exclusive= True)
Queue_name = result.method.queue
Channel.queue_bind 
(exchange = ' topic_logs ' ,
                   Queue = queue_name,
                   Routing_key = " *.apple.# " ) 
#This is the keyword that binds the queue to the exchange, but here is the fuzzy match. If it matches, you can get the value def callback(ch, method, properties, body):
     print ( body))

Channel.basic_consume(callback,
                      Queue = queue_name,
                      No_ack = True)

Channel.start_consuming()