RabbitMQ Quick Start: Spring Boot Integration
Overview
- In most applications, message middleware is used to add asynchronous communication, decouple components, and smooth out traffic peaks.
- Two important concepts in messaging services are the `message broker` and the `destination`. After a sender publishes a message, the message broker takes it over and ensures that it is delivered to the specified destination.
- Message queues have two main forms of destination:
  1. `queue`: point-to-point messaging (`point-to-point`)
  2. `topic`: publish/subscribe messaging (`publish/subscribe`)
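The difference between the two destination types can be sketched in plain Java. This is an in-memory illustration only and does not use RabbitMQ; the class `DestinationSketch` and its method names are hypothetical, and the point-to-point case assumes receivers are served in turn.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration only; not RabbitMQ code. All names are hypothetical. */
final class DestinationSketch {

    /** Queue semantics: each message is delivered to exactly one receiver (here, in turn). */
    static List<List<String>> pointToPoint(List<String> messages, int receivers) {
        List<List<String>> inboxes = emptyInboxes(receivers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % receivers).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic semantics: every subscriber receives its own copy of every message. */
    static List<List<String>> publishSubscribe(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3");
        System.out.println("point-to-point:    " + pointToPoint(messages, 2));
        System.out.println("publish/subscribe: " + publishSubscribe(messages, 2));
    }
}
```

With a queue the receivers share the work; with a topic each subscriber sees the full stream.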
RabbitMQ Architecture
Concepts
Producer
The producer is the sender of a message; it publishes the message to a RabbitMQ exchange.
✨ Message
- Message = message header + message body; it is sent to the specified exchange and routed by its routing key.
- Message header: contains attributes such as routing-key, priority, and delivery-mode (which indicates whether the message requires persistent storage).
✨ Message Broker
- A messaging middleware server responsible for receiving, storing, and forwarding messages; it acts like a post office 🏣
- Message storage + message routing
- Broker = VHost1 + VHost2 + VHost3 + ...
- Virtual Host = Exchange + Queue + Binding
Virtual Host
- A logical grouping mechanism that isolates users, queues, exchanges, and other resources from one another
- Abbreviated as VHost
- The default virtual host is /
✨ Exchange
- Receives messages and forwards each one to the queues whose binding matches the message's routing key.
- Three types are commonly used:
  - ✨ direct: Direct Exchange [unicast], delivers to the queues whose binding key exactly matches the routing key
  - ✨ fanout: Fanout Exchange [broadcast], delivers to every bound queue and ignores the routing key
  - ✨ topic: Topic Exchange [pattern matching]
    - `#`: matches zero or more words
    - `*`: matches exactly one word
  - headers: rarely used
  - system: rarely used
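The topic wildcards can be illustrated with a small matcher in plain Java. This is a sketch of the matching rule only, assuming routing keys are dot-separated words; `TopicMatcher` is a hypothetical name and this is not the broker's actual implementation.

```java
/** Sketch of topic-exchange pattern matching; hypothetical helper, not RabbitMQ code. */
final class TopicMatcher {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible number of words
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches exactly one word; anything else must match literally
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("order.*", "order.created"));
        System.out.println(matches("order.*", "order.created.eu"));
        System.out.println(matches("order.#", "order.created.eu"));
    }
}
```

The routing keys `order.created` and `order.created.eu` are made-up examples; the first matches both `order.*` and `order.#`, the second only `order.#`.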
✨ Queue
- Container that stores messages, first in, first out (FIFO)
- Buffers messages and can persist them
Binding
- Associates a message queue with an exchange.
- A binding is a routing rule that connects an exchange to a queue by a routing key, so an exchange can be thought of as a routing table made up of bindings.
- Exchanges and queues can be bound in a many-to-many relationship.
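The "routing table made up of bindings" idea can be modelled in a few lines of plain Java. This is an in-memory sketch covering only the direct and fanout types; `ExchangeSketch`, its methods, and the queue name `audit-queue` are hypothetical and are not part of the RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of an exchange as a table of bindings. Hypothetical names; not the RabbitMQ API. */
final class ExchangeSketch {

    enum Type { DIRECT, FANOUT }

    /** One row of the routing table: a queue and the key it was bound with. */
    private static final class BindingRow {
        final String queue;
        final String bindingKey;

        BindingRow(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    private final Type type;
    private final List<BindingRow> bindings = new ArrayList<>();

    ExchangeSketch(Type type) {
        this.type = type;
    }

    /** Adds one row to the routing table. */
    ExchangeSketch bind(String queue, String bindingKey) {
        bindings.add(new BindingRow(queue, bindingKey));
        return this;
    }

    /** Returns the queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (BindingRow row : bindings) {
            // fanout ignores the key; direct requires an exact match
            boolean hit = type == Type.FANOUT || row.bindingKey.equals(routingKey);
            if (hit && !targets.contains(row.queue)) {
                targets.add(row.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeSketch direct = new ExchangeSketch(Type.DIRECT)
                .bind("hello-java-queue", "hello-java")
                .bind("audit-queue", "audit"); // hypothetical second queue
        System.out.println(direct.route("hello-java"));
    }
}
```

Because one exchange can hold many rows and one queue can appear in the tables of many exchanges, the many-to-many relationship falls out of this model naturally.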
Connection
- A network connection, e.g. a TCP connection
Channel
- One independent, bidirectional data stream within a multiplexed connection. A channel is a virtual connection built inside a real TCP connection, and AMQP commands are sent over it: publishing a message, subscribing to a queue, and receiving a message all happen over a channel. Because establishing and tearing down TCP connections is expensive for the operating system, channels were introduced to multiplex a single TCP connection.
Consumer
The consumer is the receiver of a message; it takes messages from a RabbitMQ queue and processes them.
Installing RabbitMQ with Docker
docker run -d --restart=always --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
Sending and receiving messages in the management UI
Open localhost:15672
Login page
Username/password: guest/guest
Management UI home page after login
Exchanges page
Five exchange types: direct, fanout, headers, topic, x-local-random
Queues page
Binding: an exchange is bound to a queue by a routing key
Virtual Host [Exchange --> Binding (routing key)] --> Queue (routing key)
The default virtual host is "/", i.e. the root
Binding an exchange to a queue
Queue and exchange binding relationships
Spring Boot Integration
Configuring the pom file
<!-- AMQP dependency: brings in RabbitMQ support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configuration
server:
  port: 8081
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # publisher-confirm-type: CORRELATED
    # publisher-returns: true
    # listener:
    #   simple:
    #     acknowledge-mode: manual # Consumers acknowledge messages automatically by default; set this to manual to acknowledge them yourself.
    #     prefetch: 1 # Number of messages a consumer fetches from the queue at a time. Unset: round-robin distribution; 1: fair distribution.
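The difference between round-robin and fair distribution mentioned in the `prefetch` comment can be simulated in plain Java. This is an idealized model, not broker code: `DispatchSketch`, the per-message costs, and the message count are all hypothetical, and the fair case assumes a consumer only receives its next message once it has finished the current one.

```java
import java.util.Arrays;

/** Simulation only; models the dispatch policy, not the broker. Names and numbers are hypothetical. */
final class DispatchSketch {

    /** Round-robin: message i goes to consumer i % n, regardless of how busy that consumer is. */
    static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    /** Fair (prefetch = 1): each message goes to whichever consumer becomes idle first. */
    static int[] fair(int messages, int[] costPerMessage) {
        int[] handled = new int[costPerMessage.length];
        long[] freeAt = new long[costPerMessage.length]; // time at which each consumer is idle again
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // ties go to the lower index
                }
            }
            freeAt[next] += costPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] cost = {1, 3}; // consumer 0 is fast, consumer 1 is three times slower
        System.out.println("round-robin: " + Arrays.toString(roundRobin(8, cost.length)));
        System.out.println("fair:        " + Arrays.toString(fair(8, cost)));
    }
}
```

In this model the slow consumer receives half of the messages under round-robin but only a quarter of them under fair dispatch, which is the motivation for setting `prefetch` to 1 when consumers differ in speed.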
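Creating the exchange in the test class
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@Slf4j
@SpringBootTest
class RabbitmqDemoApplicationTests {
    @Autowired
    AmqpAdmin amqpAdmin;
    @Test
    public void createExchange() {
        // Arguments: name, durable = true, autoDelete = false
        DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange[hello-java-exchange] Creation complete");
    }
}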
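The exchange is created successfully
Creating a queue
@Test
public void createQueue() {
    // Arguments: name, durable = true, exclusive = false, autoDelete = false
    Queue queue = new Queue("hello-java-queue", true, false, false);
    amqpAdmin.declareQueue(queue);
    log.info("Queue[hello-java-queue] creation completed");
}
The queue is created successfully after the test runs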
Creating the binding
@Test
public void createBinding() {
    // Arguments: destination, destination type, exchange, routing key, extra arguments
    Binding binding = new Binding("hello-java-queue",
            Binding.DestinationType.QUEUE,
            "hello-java-exchange",
            "hello-java", null);
    amqpAdmin.declareBinding(binding);
    log.info("Binding [hello-java-binding] creation complete");
}
The direct exchange [hello-java-exchange] and the queue [hello-java-queue] are bound with the routing key [hello-java]
Queue bindings
Exchange bindings
Sending messages [JSON message converter]
Configure RabbitConfig to serialize messages as JSON
In the source code, RabbitAutoConfiguration creates the RabbitTemplate @Bean. The message converter field in RabbitTemplate is: MessageConverter messageConverter = new SimpleMessageConverter();
Explanation
RabbitMQ auto-configuration creates the utility class [RabbitTemplate], whose default message converter is [SimpleMessageConverter]. Let's look at how the [SimpleMessageConverter] source code sends and receives messages.
SimpleMessageConverter: creating a message
// Creates a Message. Payloads other than byte[] and String are sent using Java serialization by default, so the entity being sent must implement Serializable
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    byte[] bytes = null;
    if (object instanceof byte[]) {
        bytes = (byte[]) object;
        messageProperties.setContentType("application/octet-stream");
    } else if (object instanceof String) {
        try {
            bytes = ((String) object).getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException("failed to convert to Message content", e);
        }
        messageProperties.setContentType("text/plain");
        messageProperties.setContentEncoding("UTF-8");
    } else if (object instanceof Serializable) {
        try {
            bytes = SerializationUtils.serialize(object);
        } catch (IllegalArgumentException e) {
            throw new MessageConversionException("failed to convert to serialized Message content", e);
        }
        messageProperties.setContentType("application/x-java-serialized-object");
    }
    if (bytes != null) {
        messageProperties.setContentLength((long) bytes.length);
        return new Message(bytes, messageProperties);
    } else {
        String var10002 = this.getClass().getSimpleName();
        throw new IllegalArgumentException(var10002 + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
    }
}
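SimpleMessageConverter: consuming a message
public Object fromMessage(Message message) throws MessageConversionException {
    Object content = null;
    MessageProperties properties = message.getMessageProperties();
    if (properties != null) {
        String contentType = properties.getContentType();
        if (contentType != null && contentType.startsWith("text")) {
            // Text payload: decode with the declared encoding, or the converter's default charset
            String encoding = properties.getContentEncoding();
            if (encoding == null) {
                encoding = this.defaultCharset;
            }
            try {
                content = new String(message.getBody(), encoding);
            } catch (UnsupportedEncodingException e) {
                throw new MessageConversionException("failed to convert text-based Message content", e);
            }
        } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
            // Java-serialized payload: deserialize back into an object
            try {
                content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody())));
            } catch (IllegalArgumentException | IllegalStateException | IOException e) {
                throw new MessageConversionException("failed to convert serialized Message content", e);
            }
        }
    }
    // Any other content type: return the raw bytes
    if (content == null) {
        content = message.getBody();
    }
    return content;
}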
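The content-type dispatch in the two methods above can be reproduced with the JDK alone, which makes it easy to see why Java-serialized payloads are not human-readable in the management UI. This is a stand-alone sketch, not Spring AMQP code; `ConverterSketch` and its method names are hypothetical, and it assumes UTF-8 for text.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Stand-alone illustration of content-type dispatch; not Spring AMQP code. Names are hypothetical. */
final class ConverterSketch {

    /** Same branch order as createMessage: byte[], then String, then Serializable. */
    static String contentTypeFor(Object payload) {
        if (payload instanceof byte[]) return "application/octet-stream";
        if (payload instanceof String) return "text/plain";
        if (payload instanceof Serializable) return "application/x-java-serialized-object";
        throw new IllegalArgumentException("unsupported payload: " + payload.getClass().getName());
    }

    /** Turns the payload into the bytes that would travel in the message body. */
    static byte[] encode(Object payload) {
        if (payload instanceof byte[]) return (byte[]) payload;
        if (payload instanceof String) return ((String) payload).getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload); // JDK serialization: a binary, Java-only format
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reverses encode() based on the content type, falling back to the raw bytes. */
    static Object decode(byte[] body, String contentType) {
        if (contentType != null && contentType.startsWith("text")) {
            return new String(body, StandardCharsets.UTF_8);
        }
        if ("application/x-java-serialized-object".equals(contentType)) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
                return in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("failed to deserialize payload", e);
            }
        }
        return body; // unknown content type: hand back the bytes untouched
    }

    public static void main(String[] args) {
        Object payload = Integer.valueOf(42);
        String type = contentTypeFor(payload);
        byte[] body = encode(payload);
        System.out.println(type + ", " + body.length + " bytes, round trip: " + decode(body, type));
    }
}
```

A consumer that is not written in Java cannot decode the third branch, which is one reason to switch to a JSON converter.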
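Custom message converter: MessageConverter
MessageConverter class hierarchy
Defining a custom message converter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    // Replaces the default SimpleMessageConverter so that objects are sent as JSON
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}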
Create the data: an order return reason entity. Note that it must implement Serializable
@ToString
@Data
@Accessors(chain = true)
//@TableName("oms_order_return_reason")
public class OrderReturnReasonEntity implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    /**
     * Return reason name
     */
    private String name;
    /**
     * Sort order
     */
    private Integer sort;
    /**
     * Enabled status
     */
    private Integer status;
    /**
     * create_time
     */
    private Date createTime;
}
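Sending a message from the test class
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessage() {
    OrderReturnReasonEntity data = new OrderReturnReasonEntity();
    // Chained setters are available because of @Accessors(chain = true)
    data.setId(1L)
            .setCreateTime(new Date())
            .setName("test");
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
    log.info("Sent message: {}", data);
}
The queue receives the message
The received message object
{"id":1,"name":"test","sort":null,"status":null,"createTime":1733484472414}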
Receiving messages
Add @EnableRabbit to the application class to enable RabbitMQ
@EnableRabbit
@SpringBootApplication
public class RabbitmqDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}
Add @RabbitListener and @RabbitHandler where messages need to be received
- @RabbitListener: can be placed on classes and methods; binds them to the given queue(s)
- @RabbitHandler: placed on methods; lets one listener receive different types of data
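import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"hello-java-queue"})
@Component
@Slf4j
public class OrderMQHandler {
    // Invoked when the payload converts to OrderReturnReasonEntity
    @RabbitHandler
    public void receiveOrderReturnReason(Message message, OrderReturnReasonEntity content, Channel channel) {
        // Message body (raw bytes)
        byte[] body = message.getBody();
        // Message header properties
        MessageProperties messageProperties = message.getMessageProperties();
        log.info("Message body content:{}", content);
    }
    // Invoked when the payload converts to OrderEntity
    @RabbitHandler
    public void receiverOrder(OrderEntity content) {
        log.info("receive a message=>Order:{}", content);
    }
}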
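The way one class-level listener selects a handler by payload type can be pictured with a small dispatcher in plain Java. This is only an analogy for the behaviour of `@RabbitHandler`, not a description of Spring's internals; `HandlerDispatchSketch` and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Analogy for type-based handler selection; not how Spring implements it. Hypothetical names. */
final class HandlerDispatchSketch {

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    /** Registers a handler for one payload type (comparable to one @RabbitHandler method). */
    <T> HandlerDispatchSketch on(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    /** Picks the first registered handler whose type accepts the payload. */
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalArgumentException("no handler for " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        HandlerDispatchSketch listener = new HandlerDispatchSketch()
                .on(String.class, text -> "text handler got " + text)
                .on(Integer.class, number -> "number handler got " + number);
        System.out.println(listener.dispatch("hello"));
        System.out.println(listener.dispatch(7));
    }
}
```

In the real listener the payload types are `OrderReturnReasonEntity` and `OrderEntity`, and the message converter decides which type a given message becomes.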
The OrderReturnReasonEntity object is received successfully
2024-12-06T22:46:37.495+08:00 INFO 15808 --- [ntContainer#0-3] : Message body content:OrderReturnReasonEntity(id=1, name=test-0, sort=null, status=null, createTime=Fri Dec 06 22:46:37 CST 2024)
The OrderEntity object is received successfully
2024-12-06T22:46:37.522+08:00 INFO 15808 --- [ntContainer#0-3] : receive a message=>Order:OrderEntity(id=1, memberId=null, orderSn=null, couponId=null, createTime=Fri Dec 06 22:46:37 CST 2024, memberUsername=test-1, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)