【Messaging Tool: RabbitMQ】A Brief Look at Common RabbitMQ Topics

2025-03-21 10:30:51

This post covers RabbitMQ from the basics through to dead letter queues, and closes with a few other commonly used RabbitMQ features.


Using RabbitMQ to implement message queues and dead letter queues: from basics to advanced

In modern distributed systems, message queues such as RabbitMQ are an important tool for decoupling services and communicating asynchronously. This article uses Spring Boot and RabbitMQ to implement the following features step by step, from basic to advanced:

  1. Send a message to a queue
  2. Send a message to an exchange
  3. Message reliability mechanisms
    • Publisher Confirms.
    • Durable queues and persistent messages.
    • Manual acknowledgement by consumers.
  4. Dead Letter Queue (DLQ): handle messages that cannot be consumed normally.

We will use a simple User object as the message content; the User class has name and age fields.


1. Create the User class

First, create the User class in both service-a and service-b.

package ;

import java.io.Serializable;

public class User implements Serializable {
    private String name;
    private int age;

    // A no-argument constructor is required
    public User() {
    }

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    // Getters and setters
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}
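
Why does User implement Serializable? As far as I know, Spring AMQP's default SimpleMessageConverter falls back to Java serialization for Serializable payloads, so the consumer must have the same class (same package and name) to rebuild the object. The following is a minimal, broker-free sketch of that round trip using only the JDK. The names Person, SerializationSketch, toBytes and fromBytes are illustrative assumptions and are not part of the project above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the User class, kept local so the sketch is self-contained.
class Person implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

class SerializationSketch {
    // Producer side: turn the object into the bytes that would travel in the message body.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Consumer side: rebuild the object; this fails if the class is not available here.
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class missing on the consumer side", e);
        }
    }

    public static void main(String[] args) {
        Person copy = (Person) fromBytes(toBytes(new Person("Alice", 25)));
        System.out.println(copy.name + " " + copy.age);
    }
}
```

If the two services cannot share the exact same class, a JSON-based converter is the usual alternative, which is the direction section 4.2 takes when it serializes the body as JSON.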

2. Send a message to the queue

2.1 Configure the queue

In service-a, configure a queue.

package ;

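import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQQueueConfig {

    @Bean
    public Queue userQueue() {
        return new Queue("userQueue", true); // The second argument makes the queue durable
    }
}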

2.2 Send a message

In service-a, send a User object to the queue.

package ;

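import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
// Also import the User class from the package where you defined it.

@Service
public class QueueMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendUserToQueue(User user) {
        // No exchange is given, so the message goes to the default exchange,
        // which routes it to the queue whose name equals the routing key.
        rabbitTemplate.convertAndSend("userQueue", user);
        System.out.println("Sent user to queue: " + user);
    }
}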

2.3 Receive messages

In service-b, listen on the queue and receive the User object.

package ;

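import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
// Also import the User class from the package where you defined it.

@Service
public class QueueMessageReceiver {

    @RabbitListener(queues = "userQueue")
    public void receiveUserFromQueue(User user) {
        System.out.println("Received user from queue: " + user);
    }
}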

3. Send a message to an exchange

3.1 Configure the exchange and queue

In service-a, configure a direct exchange and bind a queue to it.

package ;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQExchangeConfig {

    @Bean
    public DirectExchange userExchange() {
        // Arguments: name, durable, autoDelete
        return new DirectExchange("userExchange", true, false);
    }

    @Bean
    public Queue userExchangeQueue() {
        return new Queue("userExchangeQueue", true); // The second argument makes the queue durable
    }

    @Bean
    public Binding bindingUserExchangeQueue(DirectExchange userExchange, Queue userExchangeQueue) {
        // Bind with an empty routing key; senders must publish with the same key
        return BindingBuilder.bind(userExchangeQueue)
                .to(userExchange)
                .with("");
    }
}

3.2 Send a message

In service-a, send a User object to the exchange.

package ;

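import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
// Also import the User class from the package where you defined it.

@Service
public class ExchangeMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendUserToExchange(User user) {
        // Arguments: exchange name, routing key, payload
        rabbitTemplate.convertAndSend("userExchange", "", user);
        System.out.println("Sent user to exchange: " + user);
    }
}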

3.3 Receive messages

In service-b, listen on the bound queue and receive the User object.

package ;

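import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
// Also import the User class from the package where you defined it.

@Service
public class ExchangeMessageReceiver {

    @RabbitListener(queues = "userExchangeQueue")
    public void receiveUserFromExchange(User user) {
        System.out.println("Received user from exchange: " + user);
    }
}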
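
To make the routing rule concrete: a direct exchange delivers a message to every queue whose binding key is exactly equal to the message's routing key, and a message that matches no binding reaches no queue. Here is a small in-memory sketch of that rule. It does not talk to a broker, and DirectExchangeSketch with its bind and route methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with this routing key.
    List<String> route(String routingKey) {
        List<String> matches = bindings.get(routingKey);
        if (matches == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(new ArrayList<>(matches));
    }

    public static void main(String[] args) {
        DirectExchangeSketch userExchange = new DirectExchangeSketch();
        userExchange.bind("userExchangeQueue", "");

        // Same key as the binding: the bound queue receives the message.
        System.out.println(userExchange.route(""));
        // No binding with this key: no queue receives the message.
        System.out.println(userExchange.route("orders"));
    }
}
```

This is also why the sender in 3.2 publishes with an empty routing key: it has to match the empty key used in the binding.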

4. Message reliability

4.1 Message confirmation mechanism (Publisher Confirms)

Enable Publisher Confirms and Returns in the Spring Boot configuration file (the YAML below would go in application.yml).

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated # Enable Publisher Confirms
    publisher-returns: true # Enable Publisher Returns

In service-a, configure the RabbitTemplate to support Publisher Confirms and Returns.

package ;

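import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // Mandatory flag: unroutable messages are returned to the publisher instead of being dropped
        rabbitTemplate.setMandatory(true);

        // Confirm callback: invoked when the broker acks or nacks a published message
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed with correlation data: " + correlationData);
            } else {
                System.out.println("Message failed with cause: " + cause);
            }
        });

        // Returns callback: invoked when a message could not be routed to any queue
        rabbitTemplate.setReturnsCallback(returned -> {
            System.out.println("Returned message: " + returned.getMessage());
            System.out.println("Reply code: " + returned.getReplyCode());
            System.out.println("Reply text: " + returned.getReplyText());
            System.out.println("Exchange: " + returned.getExchange());
            System.out.println("Routing key: " + returned.getRoutingKey());
        });

        return rabbitTemplate;
    }
}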
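
The confirm callback above only logs the outcome. To react to a nack (for example by resending), the publisher has to remember what it sent under each correlation id. The following is a minimal sketch of that bookkeeping in plain Java. PendingConfirmsSketch, track and onConfirm are hypothetical names, and the sketch assumes every message is sent with a non-null correlation id, which is what section 4.2 does with CorrelationData.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PendingConfirmsSketch {
    // correlation id -> payload that is still waiting for a broker confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call just before publishing, using the same id that is passed as CorrelationData.
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    // Call from the confirm callback.
    // Returns the payload to resend after a nack, or null when nothing needs resending.
    String onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        return ack ? null : payload;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirmsSketch confirms = new PendingConfirmsSketch();
        confirms.track("id-1", "Alice");
        confirms.track("id-2", "Bob");

        System.out.println("Resend after ack: " + confirms.onConfirm("id-1", true));
        System.out.println("Resend after nack: " + confirms.onConfirm("id-2", false));
        System.out.println("Still pending: " + confirms.pendingCount());
    }
}
```

A ConcurrentHashMap is used because confirm callbacks typically run on a different thread from the one that publishes.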

4.2 Message persistence

Enable durability when declaring queues and exchanges.

@Bean
public Queue userQueue() {
    return new Queue("userQueue", true); // The second argument makes the queue durable
}

@Bean
public DirectExchange userExchange() {
    return new DirectExchange("userExchange", true, false); // The second argument makes the exchange durable
}

Mark the message itself as persistent when sending it.

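import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.UUID;

@Service
public class ReliableMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    public void sendUserWithConfirmation(User user) throws IOException {
        // Generate a unique id so the confirm callback can identify this message
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // Set message properties
        MessageProperties properties = new MessageProperties();
        properties.setContentType("application/json"); // Set the content type explicitly
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // Persistent message
        byte[] body = objectMapper.writeValueAsBytes(user);
        Message message = new Message(body, properties);

        // Send the prebuilt message together with its correlation data
        rabbitTemplate.send("userExchange", "", message, correlationData);
        System.out.println("Sent user with confirmation: " + user);
    }
}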

4.3 Manual confirmation by consumers

In service-b's configuration file (the YAML below would go in application.yml), enable manual acknowledgement.

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

In service-b, implement the manual acknowledgement logic.

package ;

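import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
// Also import the User class from the package where you defined it.

import java.io.IOException;

@Service
public class ManualAckReceiver {

    @RabbitListener(queues = "userQueue")
    public void receiveUser(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            System.out.println("Received user from queue: " + user);
            // Acknowledge this single message (multiple = false)
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // Negatively acknowledge and requeue the message (requeue = true)
            channel.basicNack(tag, false, true);
        }
    }
}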
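
One caveat with requeue = true: a message that fails every time is redelivered again and again. A common mitigation is to cap the number of attempts and reject without requeueing once the cap is reached. The following is a rough sketch of that decision logic only. AckDecisionSketch, its Action enum and the in-memory failure counter are all assumptions made for illustration; a real consumer would need a message id to key on and a counter that survives restarts.

```java
import java.util.HashMap;
import java.util.Map;

class AckDecisionSketch {
    enum Action { ACK, REQUEUE, REJECT }

    private final int maxAttempts;
    // message id -> number of failed attempts seen so far (in memory only, for illustration)
    private final Map<String, Integer> failures = new HashMap<>();

    AckDecisionSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    Action decide(String messageId, boolean processedOk) {
        if (processedOk) {
            failures.remove(messageId);
            return Action.ACK;        // would map to basicAck(tag, false)
        }
        int attempts = failures.merge(messageId, 1, Integer::sum);
        if (attempts < maxAttempts) {
            return Action.REQUEUE;    // would map to basicNack(tag, false, true)
        }
        failures.remove(messageId);
        return Action.REJECT;         // would map to basicNack(tag, false, false)
    }

    public static void main(String[] args) {
        AckDecisionSketch policy = new AckDecisionSketch(3);
        // Three consecutive failures of the same message, then a success of another one.
        System.out.println(policy.decide("m1", false));
        System.out.println(policy.decide("m1", false));
        System.out.println(policy.decide("m1", false));
        System.out.println(policy.decide("m2", true));
    }
}
```

Rejecting without requeue is exactly what feeds the dead letter queue described in the next section.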

5. Dead Letter Queue (DLQ)

5.1 Configure the dead letter queue

In service-a, configure the normal queue and the dead letter queue.

package ;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQDLXConfig {

    // Normal exchange
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normalExchange");
    }

    // Normal queue, configured with a dead letter exchange
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normalQueue")
                .deadLetterExchange("dlxExchange") // Dead letter exchange for this queue
                .deadLetterRoutingKey("") // Routing key used when a message is dead-lettered
                .build();
    }

    // Bind the normal queue to the normal exchange
    @Bean
    public Binding bindingNormalQueue(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue)
                .to(normalExchange)
                .with("");
    }

    // Dead letter exchange
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlxExchange");
    }

    // Dead letter queue
    @Bean
    public Queue dlqQueue() {
        return new Queue("dlqQueue");
    }

    // Bind the dead letter queue to the dead letter exchange
    @Bean
    public Binding bindingDlqQueue(DirectExchange dlxExchange, Queue dlqQueue) {
        return BindingBuilder.bind(dlqQueue)
                .to(dlxExchange)
                .with("");
    }
}

5.2 Send messages to the normal queue

In service-a, send messages to the normal queue.

package ;

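import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
// Also import the User class from the package where you defined it.

@Service
public class NormalMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendUserToNormalQueue(User user) {
        // Publish to the normal exchange; the empty routing key matches the binding of normalQueue
        rabbitTemplate.convertAndSend("normalExchange", "", user);
        System.out.println("Sent user to normal queue: " + user);
    }
}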

5.3 Consume messages from the normal queue

In service-b, consume messages from the normal queue and simulate a processing failure.

package ;

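import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
// Also import the User class from the package where you defined it.

import java.io.IOException;

@Service
public class NormalMessageReceiver {

    @RabbitListener(queues = "normalQueue")
    public void receiveUserFromNormalQueue(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            System.out.println("Received user from normal queue: " + user);
            if ("Bob".equals(user.getName())) {
                throw new RuntimeException("Simulated processing failure");
            }
            // Acknowledge this single message
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // Reject without requeueing (requeue = false), so the broker
            // routes the message to the dead letter exchange
            channel.basicNack(tag, false, false);
            System.out.println("Message rejected and sent to DLQ: " + user);
        }
    }
}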

5.4 Consume messages from the dead letter queue

In service-b, consume messages from the dead letter queue.

package ;

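import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
// Also import the User class from the package where you defined it.

@Service
public class DLQMessageReceiver {

    // service-b uses manual acknowledgement globally (see 4.3); this listener never
    // calls basicAck, so it overrides the mode to AUTO to avoid leaving messages unacked.
    @RabbitListener(queues = "dlqQueue", ackMode = "AUTO")
    public void receiveUserFromDLQ(User user) {
        System.out.println("Received user from DLQ: " + user);
    }
}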

6. Test the dead letter queue

6.1 Send a message

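In service-a, send two messages to the normal queue (normalMessageSender is assumed here to be an injected NormalMessageSender):

normalMessageSender.sendUserToNormalQueue(new User("Alice", 25));
normalMessageSender.sendUserToNormalQueue(new User("Bob", 30));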

6.2 Observe log

  • The normal message (Alice) is consumed and acknowledged:
    Received user from normal queue: User{name='Alice', age=25}

  • The failing message (Bob) is rejected and routed to the dead letter queue:
    Received user from normal queue: User{name='Bob', age=30}
    Message rejected and sent to DLQ: User{name='Bob', age=30}
    Received user from DLQ: User{name='Bob', age=30}
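
The flow behind these log lines can be reproduced without a broker. The sketch below models the normal queue and the dead letter queue as two in-memory queues and moves a message across when the handler fails, which stands in for basicNack with requeue = false. DeadLetterFlowSketch and its methods are hypothetical names, and plain strings replace the User objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

class DeadLetterFlowSketch {
    final Queue<String> normalQueue = new ArrayDeque<>();
    final Queue<String> deadLetterQueue = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    void publish(String userName) {
        normalQueue.add(userName);
    }

    // Drains the normal queue. A handler returning false models a rejection
    // without requeue, so the message moves to the dead letter queue.
    void consumeNormal(Predicate<String> handler) {
        String name;
        while ((name = normalQueue.poll()) != null) {
            log.add("Received user from normal queue: " + name);
            if (!handler.test(name)) {
                deadLetterQueue.add(name);
                log.add("Message rejected and sent to DLQ: " + name);
            }
        }
    }

    // Drains the dead letter queue, like the separate DLQ consumer.
    void consumeDeadLetters() {
        String name;
        while ((name = deadLetterQueue.poll()) != null) {
            log.add("Received user from DLQ: " + name);
        }
    }

    public static void main(String[] args) {
        DeadLetterFlowSketch flow = new DeadLetterFlowSketch();
        flow.publish("Alice");
        flow.publish("Bob");
        flow.consumeNormal(name -> !"Bob".equals(name)); // Bob simulates a processing failure
        flow.consumeDeadLetters();
        flow.log.forEach(System.out::println);
    }
}
```

In the real setup the two consumers run concurrently, so the relative order of log lines from different listeners may differ from this sequential sketch.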
    

7. Summary

Through the steps above, we have implemented a dead letter queue with RabbitMQ:

  1. Normal queue: bound to the normal exchange and configured with a dead letter exchange and routing key.
  2. Dead letter queue: bound to the dead letter exchange and used to hold messages that cannot be consumed normally.
  3. Message processing
    • Normal messages are consumed and acknowledged.
    • Failing messages are rejected and routed to the dead letter queue.
  4. Dead letter queue consumption: messages in the dead letter queue are consumed separately.

This mechanism keeps messages that fail processing out of the normal flow without losing them, so they can be inspected or handled separately, which helps the reliability and maintainability of the system.


8. Other common knowledge points

8.1 Message Expiration (TTL)

You can set an expiration time (Time-To-Live, TTL) on a queue or on an individual message. When the queue has a dead letter exchange configured, expired messages are routed to it; otherwise they are discarded.

Set up queue TTL:

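// Note: changing the arguments of a queue that already exists on the broker
// requires deleting and redeclaring it.
@Bean
public Queue normalQueue() {
    return QueueBuilder.durable("normalQueue")
            .deadLetterExchange("dlxExchange")
            .deadLetterRoutingKey("")
            .ttl(60000) // Set the TTL of messages in the queue to 60 seconds (in milliseconds)
            .build();
}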

Set Message TTL:

MessageProperties properties = new MessageProperties();
properties.setExpiration("60000"); // Set the message TTL to 60 seconds (a string, in milliseconds)
Message message = new Message(body, properties);
rabbitTemplate.send("normalExchange", "", message);
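
When both a queue TTL and a message TTL are set, RabbitMQ applies the lower of the two. The sketch below captures that rule and a simple expiry check in plain Java. TtlSketch and its method names are hypothetical, null stands for "no TTL configured", and treating a message as expired once its age reaches the TTL is a simplification made for this sketch.

```java
class TtlSketch {
    // Returns the TTL that applies to a message, or null when neither level sets one.
    static Long effectiveTtlMillis(Long queueTtlMillis, Long messageTtlMillis) {
        if (queueTtlMillis == null) {
            return messageTtlMillis;
        }
        if (messageTtlMillis == null) {
            return queueTtlMillis;
        }
        return Math.min(queueTtlMillis, messageTtlMillis);
    }

    // A message with no TTL never expires; otherwise it expires once its age reaches the TTL.
    static boolean isExpired(long enqueuedAtMillis, long nowMillis, Long ttlMillis) {
        return ttlMillis != null && nowMillis - enqueuedAtMillis >= ttlMillis;
    }

    public static void main(String[] args) {
        // Queue TTL of 60 seconds combined with a message TTL of 10 seconds.
        Long ttl = effectiveTtlMillis(60000L, 10000L);
        System.out.println("Effective TTL (ms): " + ttl);
        System.out.println("Expired after 5 s: " + isExpired(0L, 5000L, ttl));
        System.out.println("Expired after 10 s: " + isExpired(0L, 10000L, ttl));
    }
}
```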

8.2 Priority queue

A queue can be declared with a maximum priority. Among the messages waiting in such a queue, those with a higher priority are delivered to consumers first.

Set the priority queue:

@Bean
public Queue priorityQueue() {
    return QueueBuilder.durable("priorityQueue")
            .maxPriority(10) // Set the maximum priority to 10
            .build();
}

Send a priority message:

MessageProperties properties = new MessageProperties();
properties.setPriority(5); // Set the message priority to 5
Message message = new Message(body, properties);
rabbitTemplate.send("priorityExchange", "", message);
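
The ordering can be illustrated with the JDK's PriorityQueue. In this sketch a priority above the queue's maximum is capped at the maximum, which matches how RabbitMQ treats such messages, and messages with the same priority keep their arrival order, which is an ordering rule chosen for this sketch. PriorityQueueSketch and its methods are hypothetical names.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class PriorityQueueSketch {
    private static final class Entry {
        final int priority;
        final long sequence;
        final String body;

        Entry(int priority, long sequence, String body) {
            this.priority = priority;
            this.sequence = sequence;
            this.body = body;
        }
    }

    private final int maxPriority;
    private long nextSequence = 0;
    // Higher priority first; ties are broken by arrival order.
    private final PriorityQueue<Entry> entries = new PriorityQueue<>(
            Comparator.comparingInt((Entry e) -> -e.priority)
                    .thenComparingLong(e -> e.sequence));

    PriorityQueueSketch(int maxPriority) {
        this.maxPriority = maxPriority;
    }

    void publish(String body, int priority) {
        // Clamp the priority into the range [0, maxPriority].
        int capped = Math.max(0, Math.min(priority, maxPriority));
        entries.add(new Entry(capped, nextSequence++, body));
    }

    // Returns the next message body, or null when the queue is empty.
    String poll() {
        Entry next = entries.poll();
        return next == null ? null : next.body;
    }

    public static void main(String[] args) {
        PriorityQueueSketch queue = new PriorityQueueSketch(10);
        queue.publish("low", 1);
        queue.publish("high", 9);
        queue.publish("over-max", 20); // capped to 10
        String body;
        while ((body = queue.poll()) != null) {
            System.out.println(body);
        }
    }
}
```

Keep in mind that priority only matters while messages are waiting in the queue; if consumers keep up and the queue stays empty, there is nothing to reorder.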

Hope this blog helps you! If you have any questions or suggestions, please leave a message in the comment area.