Location>code7788 >text

Java Message Queue Getting Started

Popularity:876 ℃/2024-10-21 18:34:36

What is a message queue?

The generation of message queues is mainly to solve the asynchronous decoupling between systems and ensure the ultimate consistency. In practical application scenarios, there are often a number of mainstream and auxiliary process operations, where the main process needs to respond quickly to user requests, while the auxiliary process may involve complex processing logic or rely on external services. By putting the messages of these auxiliary processes into the message queue, they can be executed in parallel and will not block the operation of the main process, thus improving the overall performance of the system and user experience. At the same time, the message queue supports at least one message delivery mechanism, which ensures that the auxiliary processes will be able to be executed even under unstable network or other abnormal conditions, thus enhancing the reliability and stability of the system.

What scenarios to use message queues

In practical application scenarios, RocketMQ, as an efficient and reliable message queuing service, is able to meet different business requirements. The following scenarios explain the keywords decoupling, asynchronous processing, peak shaving, reliability and scalability.

decoupled

Suppose there is an e-commerce system that needs to trigger a series of operations such as inventory update, order record generation, etc. whenever a user places an order. If you don't use message queues, these logics need to be executed serially or called directly, which not only increases the complexity of the system, but also makes the dependencies between various components become tight. With RocketMQ, the order service can act as a producer to send order information to a specific topic, and the subsequent inventory and order record services act as consumers to subscribe to and process the relevant information from that topic. In this way, even if a downstream service fails, it will not directly affect the normal operation of the entire shopping process.

asynchronous processing

Consider an online payment platform, after the user completes the payment action, in addition to immediately returning the payment result to the front-end, the back-end also needs to perform a series of operations such as updating points, issuing coupons, etc. If all the steps are synchronized, it will greatly increase the response time. If all steps are synchronized, it will greatly increase the response time. At this point, the use of RocketMQ allows the payment service to quickly confirm the success of the transaction and feedback to the client, while asynchronously releasing the relevant events for other modules to subscribe to the processing, thus effectively improving the user experience and service efficiency.

lit. strip peaks and fill valleys (idiom); fig. to overwhelm a person's capacity for work

For applications with obvious peak access periods (such as e-commerce platforms during holiday promotions), a large number of requests coming in within a short period of time may cause the server to experience a dramatic increase in pressure or even crash. With a messaging middleware like RocketMQ, requests that exceed the processing capacity can be stored temporarily during peak traffic hours and gradually distributed to back-end services at a controlled rate, thus smoothing the traffic and protecting the core business from impact.

dependability

The security and integrity of the data transmission process is of paramount importance. For example, in financial transaction scenarios, each transaction needs to be recorded accurately. RocketMQ supports transactional messaging, which ensures the consistency of message sending and local transactions; it also provides a message persistence storage mechanism and a retry policy to ensure that message delivery can be finalized even in the event of network instability or system anomalies, which greatly enhances the robustness of the system.

scalability

As business grows, it is often necessary to horizontally scale existing architectures to handle higher volumes of concurrent requests. By introducing RocketMQ, we can easily add more producer instances to share the message production task, or deploy additional consumer instances to speed up message consumption. Thanks to consumer groups, additional nodes can automatically participate in load balancing without having to change their existing configuration, simplifying the scaling process and improving overall throughput.

The main product available for message queues:

Online business scenarios are suitable:
Designed for high concurrency, low latency and high availability, RocketMQ is especially suited for financial-grade, real-time transaction processing. It supports transactional messages to ensure data consistency in distributed systems. In addition, RocketMQ provides flexible message types, including sequential messages, timed/delayed messages, etc., which are ideal for complex online business needs.

Big data transfer fits:
Kafka excels in large-scale data processing scenarios, especially log collection and streaming data processing. Its core strength lies in its single-file storage structure, which enables Kafka to achieve extremely high efficiency when processing large amounts of continuously written data. Therefore, Kafka is ideal for applications that need to efficiently process large amounts of data.

JMS standard implementation is required to fit:
ActiveMQ as a fully compatible with the JMS1.1 specification of the message middleware , ideal for those projects that rely on Java enterprise application environment . In addition to providing rich support for messaging protocols , ActiveMQ also has a powerful feature set , such as persistent message storage , distributed deployment model , etc. , can meet a variety of complex message routing needs .

The amqp and other multi-fine small scenes are suitable:
RabbitMQ is well known for its support for AMQP (Advanced Message Queuing Protocol) and is compatible with a variety of protocols such as STOMP, MQTT, etc., making it ideal for communication between IoT devices and the decoupling needs of a microservices architecture. Although RocketMQ is also adding support for these protocols, RabbitMQ is still one of the preferred solutions for this type of application scenario.

Detailed example of sending and receiving messages using MQ (using rocketmq as an example)

Detailed example of using MQ to send and receive messages (using RocketMQ as an example)

1. Addition of dependencies

First, in the project'sfile to add a dependency on the RocketMQ client library. This step is necessary to utilize RocketMQ functionality in your Java application.

1 <dependency>
2     <groupId></groupId>
3 
4     <artifactId>rocketmq-client</artifactId>
5 
6     <version>4.9.1</version>
7 
8 </dependency>

For Gradle projects, the corresponding dependency declarations are as follows:

  1 implementation ':rocketmq-client:4.9.1' 

Make sure to use a version that is compatible with your locally or remotely running RocketMQ server.

2. Messaging

Below is a simple example of a producer that sends a synchronization message to a specified topic. Also shown here is how to configure and start aDefaultMQProducerExample.

 1 import ;
 2 import ;
 3 
 4 public class Producer {
 5     public static void main(String[] args) throws Exception {
 6         // Create a producer instance and set the producer group name
 7         DefaultMQProducer producer = new DefaultMQProducer("my-group");
 8         // Setting the NameServer Address
 9         ("localhost:9876");
10         // kick-start producer
11         ();
12         
13         for (int i = 0; i < 10; i++) {
14             // Create a message, specifying the subject, label and message content
15             Message msg = new Message("test-topic-1", "TagA", ("Hello RocketMQ " + i).getBytes());
16             
17             // Send message to Broker
18             (msg);
19         }
20         
21         // Close Producer
22         ();
23     }
24 }

3. Consumer news

Next, a consumer example is shown that illustrates how to subscribe to messages on a specific topic and process them. This example uses the push mode of theDefaultMQPushConsumer

 1 import ;
 2 import ;
 3 import ;
 4 import ;
 5 import ;
 6 
 7 import ;
 8 
 9 public class Consumer {
10     public static void main(String[] args) throws Exception {
11         // Create a consumer instance and set the consumer group name
12         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer_group");
13         // Setting the NameServer Address
14         ("localhost:9876");
15         // Subscribe to one or more Topics and filter the desired messages by tag
16         ("test-topic-1", "*");
17         // Register a message listener that triggers the onMessage method when a new message arrives.
18         (new MessageListenerConcurrently() {
19             @Override
20             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
21                 ("%s Receive New Messages: %s %n", ().getName(), msgs);
22                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
23             }
24         });
25         // Consumer activation
26         ();
27         ("Consumer Started.%n");
28     }
29 }

Why does this enable asynchronous decoupling?

Through the above steps, we can utilize RocketMQ to achieve asynchronous communication of messages, thus decoupling the system components. The producer publishes messages to specific topics without caring about which consumers will receive them; similarly, consumers can access information by subscribing to topics of interest without directly interacting with the producer. This approach not only simplifies the complexity of the system, but also improves its scalability and flexibility. For example, in an ordering system, when a user places an order, he or she can immediately send a message containing the order details to the inventory management service, which can process the request when it is idle without blocking the user's flow. In addition, RocketMQ supports multiple message types (e.g., sequential messages, delayed messages, etc.), allowing developers to choose the most appropriate message model to further optimize application design based on specific needs.