
MQ Series | RabbitMQ Integration SpringBoot

2024-12-16 17:59:00

RabbitMQ Quick Start: Integrating with Spring Boot

Overview

  1. In most applications, message middleware can be used to enable asynchronous communication, decouple services, and smooth out traffic peaks.

  2. Two important concepts in messaging services:
    `message broker` and `destination`.
    After a sender sends a message, the message broker takes it over and ensures that it is delivered to the specified destination.

  3. Message queues have two main forms of destination:
    1. `queue`: point-to-point messaging (`point-to-point`)
    2. `topic`: publish/subscribe messaging (`publish/subscribe`)
    

RabbitMQ Architecture

[Image: RabbitMQ architecture]

Concepts

Producer

The producer is the sender of the message; it sends the message to a RabbitMQ exchange.

✨ Message

  • Message = message header + message body; it is sent to the specified exchange together with a routing key
  • Message header: contains various attributes such as routing-key (routing key), priority, and delivery-mode (indicating that the message may require persistent storage)

✨ Message Broker

  • A messaging middleware server, responsible for receiving, storing, and forwarding messages, acts like a post office 🏣
  • Message Storage + Message Routing
  • Broker = VHost1+Vhost2+Vhost3+.....
  • Virtual Host = Exchange + Queue +Binding

Virtual Host

  • Logical grouping mechanism that isolates users, queues, exchanges, and other resources from one another

  • Usually abbreviated as VHost

  • The default virtual host is /

✨ Exchange

  • Receives messages and routes them to the queues whose binding matches the message's routing key.
  • Three types are commonly used
    • direct (Direct Exchange, unicast): delivers to queues whose binding key exactly matches the routing key
      • image-20241207003055398
    • fanout (Fanout Exchange, broadcast): delivers the message to every bound queue; the routing key is ignored
      • image-20241207003049286
    • topic (Topic Exchange, pattern matching)
      • #: matches zero or more words
      • *: matches exactly one word
      • image-20241207003038106
    • headers: rarely used
    • system: rarely used
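The `#` and `*` rules for topic exchanges can be made concrete with a small matcher. This is only a sketch in plain Java, not RabbitMQ's actual implementation: the class name `TopicMatcher` and its methods are hypothetical, and it assumes words are separated by dots, as in AMQP routing keys.

```java
final class TopicMatcher {

    // Returns true if the routing key matches the binding pattern.
    // '*' stands for exactly one word, '#' for zero or more words.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing, or absorbs one word and stays active
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("order.*", "order.created"));
        System.out.println(matches("order.*", "order.created.eu"));
        System.out.println(matches("order.#", "order.created.eu"));
    }
}
```

With these rules, a binding of `order.*` accepts `order.created` but not `order.created.eu`, while `order.#` accepts both, as well as the bare key `order`.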

✨ Queue

  • Container for storing messages, FIFO
  • Buffered Messages + Persistence

Binding

  • Associates a message queue with an exchange.
  • A binding is a routing rule that connects an exchange to a message queue based on a routing key, so an exchange can be understood as a routing table made up of bindings.
  • The binding of Exchange and Queue can be a many-to-many relationship.
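The "routing table made up of bindings" idea can be sketched in a few lines of plain Java. This is an illustration of the concept for a direct exchange only, not RabbitMQ code; the class `DirectExchangeSketch` and its methods are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class DirectExchangeSketch {

    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    // Adds one binding; the same key may map to several queues.
    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new LinkedHashSet<>()).add(queue);
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("hello-java-queue", "hello-java");
        System.out.println(exchange.route("hello-java"));
        System.out.println(exchange.route("unknown-key"));
    }
}
```

Because the table maps one key to a set of queues, and the same queue can be bound under several keys or to several exchanges, the relationship is many-to-many.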

Connection

  • Network connection, e.g. a TCP connection

Channel

  • A channel is an independent bi-directional data stream inside a multiplexed connection. It is a virtual connection built on top of a real TCP connection, and AMQP commands are sent over it: publishing a message, subscribing to a queue, and receiving a message are all done over a channel. Because establishing and tearing down TCP connections is expensive for the operating system, channels were introduced so that a single TCP connection can be reused.

Consumer

The consumer is the receiver of the message, which gets the message from RabbitMQ's queue and processes it.

Installing RabbitMQ with Docker

docker run -d --restart=always --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

Sending and receiving messages in the management UI

Open localhost:15672

Login page

Username/password for login: guest/guest

[Image: login page]

Home page after logging in

[Image: home page]

Exchange page

[Image: exchange page]

Five exchange types: direct, fanout, headers, topic, x-local-random

[Image: adding an exchange]

Queue page

[Image: queue page]

Binding: the exchange is bound to the corresponding queue by a routing key

Virtual Host [Exchange --> Binding (routing key)] --> Queue (routing key)

The default virtual host is "/", i.e. the root path

Binding an exchange to a queue

[Image: exchange bound to a queue]

Queue-to-exchange binding relationships

[Image: queue and exchange binding relationships]

Spring Boot Integration

Configuring the pom file

<!-- AMQP dependency, brings in RabbitMQ support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Configuration

server:
  port: 8081
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
#    publisher-confirm-type: CORRELATED
#    publisher-returns: true
#    listener:
#      simple:
#        acknowledge-mode: manual # By default, consumers acknowledge messages automatically; to acknowledge manually, set the acknowledge mode to manual.
#        prefetch: 1 # The number of messages a consumer fetches from the queue at a time. Not set: round-robin dispatch; set to 1: fair dispatch.
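The difference between the two dispatch modes named in the prefetch comment can be illustrated with a toy simulation. This is a simplified model in plain Java, not how the broker is implemented: the class `DispatchSketch`, its methods, and the per-message processing costs are all assumptions made for the example.

```java
import java.util.Arrays;

final class DispatchSketch {

    // Round-robin: message i goes to consumer i % n, no matter how busy that consumer is.
    static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    // Fair dispatch (modelled on prefetch = 1): a consumer holds one message at a time,
    // so the next message goes to whichever consumer becomes free first.
    static int[] fair(int messages, int[] costMillis) {
        int[] handled = new int[costMillis.length];
        long[] freeAt = new long[costMillis.length];
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Hypothetical workload: consumer 0 needs 10 ms per message, consumer 1 needs 40 ms.
        System.out.println(Arrays.toString(roundRobin(10, 2)));
        System.out.println(Arrays.toString(fair(10, new int[] {10, 40})));
    }
}
```

In this model round-robin splits ten messages evenly between a fast and a slow consumer, while fair dispatch hands most of them to the fast one, which is the motivation for setting prefetch to 1 when consumers differ in speed.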

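Creating the exchange in the test class

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange() {
        // Arguments: name, durable = true, autoDelete = false
        DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange[hello-java-exchange] Creation complete");
    }
}

Created successfully:

image-20241206170654862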

Creating a Queue

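@Test
public void createQueue() {
    // Arguments: name, durable = true, exclusive = false, autoDelete = false
    Queue queue = new Queue("hello-java-queue", true, false, false);
    amqpAdmin.declareQueue(queue);
    log.info("Queue[hello-java-queue] creation completed");
}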

Successfully created after execution

image-20241206170811552

Creating Bindings

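@Test
public void createBinding() {
    // Arguments: destination, destination type, exchange, routing key, extra arguments
    Binding binding = new Binding("hello-java-queue",
                                  Binding.DestinationType.QUEUE,
                                  "hello-java-exchange",
                                  "hello-java", null);
    amqpAdmin.declareBinding(binding);
    log.info("Binding [hello-java-binding] creation complete");
}

The direct exchange [hello-java-exchange] is bound to the queue [hello-java-queue] using the routing key [hello-java].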

Queue Binding

image-20241206170917982

Exchange Binding

image-20241206171001858

Sending Messages (JSON Message Converter)

Configure RabbitConfig to serialize messages as JSON

In the source code, RabbitAutoConfiguration creates the @Bean RabbitTemplate, whose message converter property is initialized as MessageConverter messageConverter = new SimpleMessageConverter();

This shows that RabbitMQ autoconfiguration creates the utility class [RabbitTemplate] with [SimpleMessageConverter] as its default message converter. Let's look at how the [SimpleMessageConverter] source code sends and receives messages.

SimpleMessageConverter: creating a message

// Creates a Message. Objects are sent using Java serialization by default, so message entities must implement Serializable
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    byte[] bytes = null;
    if (object instanceof byte[]) {
        bytes = (byte[]) object;
        messageProperties.setContentType("application/octet-stream");
    } else if (object instanceof String) {
        try {
            bytes = ((String) object).getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException("failed to convert to Message content", e);
        }
        messageProperties.setContentType("text/plain");
        messageProperties.setContentEncoding("UTF-8");
    } else if (object instanceof Serializable) {
        try {
            bytes = SerializationUtils.serialize(object);
        } catch (IllegalArgumentException e) {
            throw new MessageConversionException("failed to convert to serialized Message content", e);
        }
        messageProperties.setContentType("application/x-java-serialized-object");
    }
    if (bytes != null) {
        messageProperties.setContentLength((long) bytes.length);
        return new Message(bytes, messageProperties);
    } else {
        String var10002 = this.getClass().getSimpleName();
        throw new IllegalArgumentException(var10002 + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
    }
}

SimpleMessageConverter: consuming a message

public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.startsWith("text")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = this.defaultCharset;
                }

                try {
                    content = new String(message.getBody(), encoding);
                } catch (UnsupportedEncodingException e) {
                    throw new MessageConversionException("failed to convert text-based Message content", e);
                }
            } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
                try {
                    content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody())));
                } catch (IllegalArgumentException | IllegalStateException | IOException e) {
                    throw new MessageConversionException("failed to convert serialized Message content", e);
                }
            }
        }

        // Fall back to the raw byte[] body when the content type is not recognized
        if (content == null) {
            content = message.getBody();
        }

        return content;
    }
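The Serializable branch of SimpleMessageConverter relies on standard Java serialization. The round trip can be reproduced with the JDK alone, which also shows why the payload is an opaque binary blob rather than readable text. This is a minimal sketch, not Spring code: the class `SerializationSketch` and the `Reason` payload type are hypothetical stand-ins.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializationSketch {

    // Hypothetical payload; it must implement Serializable to be written at all.
    static final class Reason implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        final String name;

        Reason(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Object -> bytes, comparable to what ends up in the message body.
    static byte[] serialize(Object payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream stream = new ObjectOutputStream(out)) {
            stream.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Bytes -> object, comparable to what the consumer side does.
    static Object deserialize(byte[] body) {
        try (ObjectInputStream stream = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return stream.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("payload class not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        byte[] body = serialize(new Reason(1L, "demo"));
        Reason copy = (Reason) deserialize(body);
        System.out.println(body.length + " bytes, name = " + copy.name);
    }
}
```

Both sides need the same class on the classpath and the bytes are unreadable in the management UI, which is one reason to switch to a JSON converter.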

Custom message converter: MessageConverter

MessageConverter class hierarchy

image-20241207013346381

Defining a custom message converter

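import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;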

@Configuration
public class RabbitMQConfig {

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

Create the data: an order return reason entity. Note that it needs to implement Serializable.

@ToString
@Data
@Accessors(chain = true)
//@TableName("oms_order_return_reason")
public class OrderReturnReasonEntity implements Serializable {
	private static final long serialVersionUID = 1L;

	private Long id;
	/**
	 * Return reason name
	 */
	private String name;
	/**
	 * Sort order
	 */
	private Integer sort;
	/**
	 * Enabled status
	 */
	private Integer status;
	/**
	 * create_time
	 */
	private Date createTime;

}

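Sending a message in the test class

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessage() {
    OrderReturnReasonEntity data = new OrderReturnReasonEntity();
    // Chained setters are available because of @Accessors(chain = true)
    data.setId(1L)
            .setCreateTime(new Date())
            .setName("测试");
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
    log.info("Message sent: {}", data);
}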

The queue receives the message

image-20241206192848893

The received message object

image-20241206193039689

{"id":1,"name":"测试","sort":null,"status":null,"createTime":1733484472414}

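Receiving Messages

Add @EnableRabbit to the application class to enable RabbitMQ

@EnableRabbit
@SpringBootApplication
public class RabbitmqDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}

Where messages need to be received, add methods annotated with @RabbitListener and @RabbitHandler

@RabbitListener: can be used on classes and methods; binds the listener to the corresponding queue(s)

@RabbitHandler: used on methods; lets a single listener class receive different types of data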

@RabbitListener(queues = {"hello-java-queue"})
@Component
@Slf4j
public class OrderMQHandler {
    // Invoked when the converted payload is an OrderReturnReasonEntity
    @RabbitHandler
    public void receiveOrderReturnReason(Message message, OrderReturnReasonEntity content, Channel channel) {
        // Raw message body
        byte[] body = message.getBody();
        // Message header properties
        MessageProperties messageProperties = message.getMessageProperties();
        log.info("Message body content:{}", content);
    }

    // Invoked when the converted payload is an OrderEntity
    @RabbitHandler
    public void receiverOrder(OrderEntity content) {
        log.info("receive a message=>Order:{}", content);
    }
}

Successfully received the OrderReturnReasonEntity object data

2024-12-06T22:46:37.495+08:00 INFO 15808 --- [ntContainer#0-3] : Message body content:OrderReturnReasonEntity(id=1, name=测试-0, sort=null, status=null, createTime=Fri Dec 06 22:46:37 CST 2024)

Successfully received the OrderEntity object data

2024-12-06T22:46:37.522+08:00 INFO 15808 --- [ntContainer#0-3] : receive a message=>Order:OrderEntity(id=1, memberId=null, orderSn=null, couponId=null, createTime=Fri Dec 06 22:46:37 CST 2024, memberUsername=测试-1, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)