The following is a blog about RabbitMQ, covering the implementation of the basics to dead letter queues, as well as supplements to other common knowledge points of RabbitMQ. The content is clear and the code is complete, and it is suitable for direct release.
Using RabbitMQ to implement message queues and dead letter queues: from basic to advanced
In modern distributed systems, message queues (such as RabbitMQ) are an important tool for decoupling and asynchronous communication. This article will be based on Spring Boot and RabbitMQ, and gradually implement the following functions from basic to advanced:
- Send a message to the queue。
- Send a message to the switch。
-
Message reliability mechanism:
- Message confirmation mechanism (Publisher Confirms).
- Durable Queues and Messages.
- Manual Acknowledgement.
- Dead Letter Queue (DLQ): Process messages that cannot be consumed normally.
We will use a simpleUser
The object is the message content,User
Class containsname
andage
Field.
1. CreateUser
kind
First, inservice-a
andservice-b
Created inUser
kind.
package ;
import ;
public class User implements Serializable {
private String name;
private int age;
// There must be a parameter constructor
public User() {
}
public User(String name, int age) {
= name;
= age;
}
// Getter and Setter
public String getName() {
return name;
}
public void setName(String name) {
= name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
= age;
}
@Override
public String toString() {
return "User{name='" + name + "', age=" + age + "}";
}
}
2. Send a message to the queue
2.1 Configure the queue
existservice-a
Configure a queue in.
package ;
import ;
import ;
import ;
@Configuration
public class RabbitMQQueueConfig {
@Bean
public Queue userQueue() {
return new Queue("userQueue", true); // The second parameter indicates persistence
}
}
2.2 Send a message
existservice-a
Send inUser
Object to queue.
package ;
import ;
import ;
import ;
import ;
@Service
public class QueueMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendUserToQueue(User user) {
("userQueue", user);
("Sent user to queue: " + user);
}
}
2.3 Receive messages
existservice-b
Listen to the queue and receiveUser
Object.
package ;
import ;
import ;
import ;
@Service
public class QueueMessageReceiver {
@RabbitListener(queues = "userQueue")
public void receiveUserFromQueue(User user) {
("Received user from queue: " + user);
}
}
3. Send a message to the switch
3.1 Configuring switches and queues
existservice-a
Configure a Direct Exchange in the queue and bind the queue.
package ;
import .*;
import ;
import ;
@Configuration
public class RabbitMQExchangeConfig {
@Bean
public DirectExchange userExchange() {
return new DirectExchange("userExchange", true, false); // The second parameter indicates persistence
}
@Bean
public Queue userExchangeQueue() {
return new Queue("userExchangeQueue", true); // The second parameter indicates persistence
}
@Bean
public Binding bindingUserExchangeQueue(DirectExchange userExchange, Queue userExchangeQueue) {
return (userExchangeQueue)
.to(userExchange)
.with("");
}
}
3.2 Send a message
existservice-a
Send inUser
Object to switch.
package ;
import ;
import ;
import ;
import ;
@Service
public class ExchangeMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendUserToExchange(User user) {
("userExchange", "", user);
("Sent user to exchange: " + user);
}
}
3.3 Receive messages
existservice-b
Listen to the queue and receiveUser
Object.
package ;
import ;
import ;
import ;
@Service
public class ExchangeMessageReceiver {
@RabbitListener(queues = "userExchangeQueue")
public void receiveUserFromExchange(User user) {
("Received user from exchange: " + user);
}
}
4. Message reliability
4.1 Message confirmation mechanism (Publisher Confirms)
existEnable Publisher Confirms and Returns in .
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
Publisher-confirm-type: correlated # Enable Publisher Confirms
Publisher-returns: true # Enable Publisher Returns
existservice-a
Medium configurationRabbitTemplate
to support Publisher Confirms and Returns.
package ;
import ;
import ;
import ;
import ;
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// Enable Publisher Confirms and Returns
(true);
// Set confirmation callback
((correlationData, ack, cause) -> {
if (ack) {
("Message confirmed with correlation data: " + correlationData);
} else {
("Message failed with cause: " + cause);
}
});
// Set the return callback
(returned -> {
("Returned message: " + ());
("Reply code: " + ());
("Reply text: " + ());
("Exchange: " + ());
("Routing key: " + ());
});
return rabbitTemplate;
}
}
4.2 Message persistence
Enable persistence when configuring queues and switches.
@Bean
public Queue userQueue() {
return new Queue("userQueue", true); // The second parameter indicates persistence
}
@Bean
public DirectExchange userExchange() {
return new DirectExchange("userExchange", true, false); // The second parameter indicates persistence
}
Set the message to persist when sending a message.
import ;
import ;
import ;
import ;
@Service
public class ReliableMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObjectMapper objectMapper;
public void sendUserWithConfirmation(User user) throws IOException {
// Generate unique CorrelationData
CorrelationData correlationData = new CorrelationData(().toString());
// Set message properties
MessageProperties properties = new MessageProperties();
("application/json"); // Clearly set content-type
(); // Persistence message
byte[] body = (user);
Message message = new Message(body, properties);
// Send a message
("userExchange", "", message, correlationData);
("Sent user with confirmation: " + user);
}
}
4.3 Manual confirmation by consumers
existservice-b
ofEnable manual confirmation in.
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
existservice-b
Manual confirmation logic is implemented in the
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
@Service
public class ManualAckReceiver {
@RabbitListener(queues = "userQueue")
public void receiveUser(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
("Received user from queue: " + user);
// Manual confirmation of the message
(tag, false);
} catch (Exception e) {
// Reject message and re-enter
(tag, false, true);
}
}
}
5. Dead Letter Queue (DLQ)
5.1 Configure the dead letter queue
existservice-a
The dead letter queue and the normal queue are configured.
package ;
import .*;
import ;
import ;
@Configuration
public class RabbitMQDLXConfig {
// Ordinary switch
@Bean
public DirectExchange normalExchange() {
return new DirectExchange("normalExchange");
}
// Normal queue, configure dead letter switch
@Bean
public Queue normalQueue() {
return ("normalQueue")
.deadLetterExchange("dlxExchange") // Specify the dead letter switch
.deadLetterRoutingKey("") // Specify the dead letter routing key
.build();
}
// Bind the normal queue to the normal switch
@Bean
public Binding bindingNormalQueue(DirectExchange normalExchange, Queue normalQueue) {
return (normalQueue)
.to(normalExchange)
.with("");
}
// Dead letter switch
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlxExchange");
}
// Dead letter queue
@Bean
public Queue dlqQueue() {
return new Queue("dlqQueue");
}
// Bind the dead letter queue to the dead letter switch
@Bean
public Binding bindingDlqQueue(DirectExchange dlxExchange, Queue dlqQueue) {
return (dlqQueue)
.to(dlxExchange)
.with("");
}
}
5.2 Send messages to normal queue
existservice-a
Send messages to the normal queue.
package ;
import ;
import ;
import ;
import ;
@Service
public class NormalMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendUserToNormalQueue(User user) {
("normalExchange", "", user);
("Sent user to normal queue: " + user);
}
}
5.3 Messages that consume ordinary queues
existservice-b
Consumes messages from ordinary queues and simulates message processing failed.
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
@Service
public class NormalMessageReceiver {
@RabbitListener(queues = "normalQueue")
public void receiveUserFromNormalQueue(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
("Received user from normal queue: " + user);
if (().equals("Bob")) {
throw new RuntimeException("Simulated processing failure");
}
// Manual confirmation of the message
(tag, false);
} catch (Exception e) {
// Reject message and re-enter
(tag, false, false); // If you do not re-enter, the message will be routed to the dead letter queue
("Message rejected and sent to DLQ: " + user);
}
}
}
5.4 Messages of consumption dead letter queues
existservice-b
Messages of the consumption dead letter queue.
package ;
import ;
import ;
import ;
@Service
public class DLQMessageReceiver {
@RabbitListener(queues = "dlqQueue")
public void receiveUserFromDLQ(User user) {
("Received user from DLQ: " + user);
}
}
6. Test the dead letter queue
6.1 Send a message
existservice-a
Send messages to the normal queue:
(new User("Alice", 25));
(new User("Bob", 30));
6.2 Observe log
- Normal message (
Alice
) Will be consumed and confirmed:Received user from normal queue: User{name='Alice', age=25}
- Failure message (
Bob
) Will be denied and routed to the dead letter queue:Received user from normal queue: User{name='Bob', age=30} Message rejected and sent to DLQ: User{name='Bob', age=30} Received user from DLQ: User{name='Bob', age=30}
7. Summary
Through the above steps, we have implemented the dead letter queue function of RabbitMQ:
- Normal queue: Bind to a normal switch, and configure dead letter switch and routing keys.
- Dead letter queue: Bind to a dead letter switch, used to store messages that cannot be consumed normally.
-
Message processing:
- Normal messages are consumed and confirmed.
- The failure message is denied and routed to the dead letter queue.
- Dead letter queue consumption: Consume messages in the dead letter queue separately.
This mechanism is very suitable for handling messages in exceptional situations, ensuring system reliability and maintainability.
8. Other common knowledge points
8.1 Message Expiration (TTL)
You can set an expiration time (Time-To-Live, TTL) for a queue or message. Expired messages will be routed to the dead letter queue.
Set up queue TTL:
@Bean
public Queue normalQueue() {
return ("normalQueue")
.deadLetterExchange("dlxExchange")
.deadLetterRoutingKey("")
.ttl(60000) // Set the TTL of the message in the queue to 60 seconds
.build();
}
Set Message TTL:
MessageProperties properties = new MessageProperties();
("60000"); // Set the message TTL to 60 seconds
Message message = new Message(body, properties);
("normalExchange", "", message);
8.2 Priority queue
You can set priority for queues, and messages with high priority will be consumed first.
Set the priority queue:
@Bean
public Queue priorityQueue() {
return ("priorityQueue")
.maxPriority(10) // Set the maximum priority to 10
.build();
}
Send a priority message:
MessageProperties properties = new MessageProperties();
(5); // Set message priority to 5
Message message = new Message(body, properties);
("priorityExchange", "", message);
Hope this blog helps you! If you have any questions or suggestions, please leave a message in the comment area.