Integrating RocketMQ with Spring Boot
Quickly Setting Up a Standalone RocketMQ
Refer to this article to quickly set up a standalone RocketMQ instance: /article/3079
Adding the Dependency and Configuration to the Project
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.1</version>
</dependency>
rocketmq:
  consumer:
    group: oneCoupon_merchant_admin_consumer_group
    # Maximum number of messages to pull per request. This limits how many are pulled, not how many are consumed at once.
    pull-batch-size: 10
  name-server: xxx:9876
  producer:
    # Use the same group for messages of the same type; the group name must be unique.
    group: oneCoupon_merchant_admin_producer_group
    # Send timeout in milliseconds, default 3000
    sendMessageTimeout: 10000
    # Retry count when a synchronous send fails, default 2
    retryTimesWhenSendFailed: 2
    # Retry count when an asynchronous send fails, default 2
    retryTimesWhenSendAsyncFailed: 2
    # Maximum message size in bytes, default 1024 * 1024 * 4 (4 MB); the value set here is 4 KB
    maxMessageSize: 4096
    # Message bodies above this size in bytes are compressed, default 1024 * 4 (4 KB)
    compressMessageBodyThreshold: 4096
    # Whether to retry on another broker when a send fails internally, default false
    retryNextServer: false
Adding consumers to the console
Example
Producer
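import java.util.Map;
import jakarta.annotation.Resource; // javax.annotation.Resource on Spring Boot 2.x
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class ShortLinkStatsSaveProducer {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void send(String topic, Map<String, String> producerMap) {
        log.info("send message to rocketMQ, topic: {}, producerMap: {}", topic, producerMap);
        // The method name was lost in the source; syncSend is an assumed choice.
        rocketMQTemplate.syncSend(topic, producerMap);
    }
}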
Consumer
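// Note: the receivers of several calls were lost in the source. The names
// messageQueueIdempotentHandler (a Redis-backed helper) and JSON (a fastjson-style
// parser), along with the handler's method names, are assumptions.
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(consumerGroup = "saaslink_consumer_group", topic = RedisKeyConstant.SHORT_LINK_STATS_STREAM_TOPIC_KEY)
@Slf4j
public class ShortLinkStatsSaveConsumer implements RocketMQListener<MessageExt> {

    private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;

    @Override
    public void onMessage(MessageExt msgExt) {
        String msgId = msgExt.getMsgId();
        // Use Redis to make consumption idempotent
        if (messageQueueIdempotentHandler.isMessageBeingConsumed(msgId)) {
            // Check whether the earlier handling of this message has completed
            if (messageQueueIdempotentHandler.isAccomplish(msgId)) {
                return;
            }
            throw new ServiceException("Message processing is incomplete; the message queue needs to retry");
        }
        try {
            byte[] msgExtBody = msgExt.getBody();
            // Convert the body to a map
            Map<String, String> producerMap = JSON.parseObject(msgExtBody, Map.class);
            ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
            // The actual save logic (method defined elsewhere in this class, omitted here)
            actualSaveShortLinkStats(producerMap.get("fullShortUrl"), producerMap.get("gid"), statsRecord);
        } catch (Throwable ex) {
            // Consumption failed (for example, a dependency went down): clear the marker so a retry can run
            messageQueueIdempotentHandler.delMessageProcessed(msgId);
            log.error("Exception while consuming short link stats record", ex);
            throw ex;
        }
        messageQueueIdempotentHandler.setAccomplish(msgId);
    }
}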
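The idempotency check in the consumer above can be sketched without Redis. The following is a minimal in-memory stand-in, assuming the Redis helper behaves like a "set if absent" marker with two states (in progress, done). The class name `IdempotentGuard` and all of its methods are hypothetical and are not part of the original project; a real Redis version would normally also set an expiry on the marker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for a Redis-backed idempotency guard (hypothetical names). */
class IdempotentGuard {
    private static final String PROCESSING = "0";
    private static final String DONE = "1";
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Returns true only for the first caller that claims this message ID (like SET NX). */
    boolean tryClaim(String msgId) {
        return store.putIfAbsent(msgId, PROCESSING) == null;
    }

    /** True once the message has been fully handled. */
    boolean isAccomplished(String msgId) {
        return DONE.equals(store.get(msgId));
    }

    void markAccomplished(String msgId) {
        store.put(msgId, DONE);
    }

    /** Releases the claim after a failure so that a retry can claim it again. */
    void release(String msgId) {
        store.remove(msgId);
    }

    /** Mirrors the control flow of the listener: claim, run, then mark done or release. */
    String consume(String msgId, Runnable businessLogic) {
        if (!tryClaim(msgId)) {
            if (isAccomplished(msgId)) {
                return "SKIPPED_DUPLICATE";
            }
            throw new IllegalStateException("Message still in progress, needs a retry");
        }
        try {
            businessLogic.run();
        } catch (RuntimeException ex) {
            release(msgId);
            throw ex;
        }
        markAccomplished(msgId);
        return "CONSUMED";
    }

    public static void main(String[] args) {
        IdempotentGuard guard = new IdempotentGuard();
        System.out.println(guard.consume("msg-1", () -> { }));
        System.out.println(guard.consume("msg-1", () -> { }));
    }
}
```

Releasing the claim on failure matters: without it, the redelivered message would be treated as "in progress" forever and never reprocessed.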
How RocketMQ Works
RocketMQ Architecture Components
- Producer: The message publisher. It supports distributed cluster deployment. The Producer uses the MQ load-balancing module to select a queue on an appropriate Broker cluster for delivery; delivery supports fast failure and low latency.
- Consumer: The message consumer. It supports distributed cluster deployment, both push and pull consumption, and both cluster and broadcast consumption modes. It provides a real-time message subscription mechanism that meets the needs of most users.
- NameServer: A very simple Topic route registry whose role is similar to ZooKeeper in Dubbo. It supports dynamic registration and discovery of Brokers and has two main functions. Broker management: the NameServer accepts registration information from the Broker cluster, saves it as the basic routing data, and provides a heartbeat mechanism to check whether each Broker is still alive. Route information management: each NameServer holds the full routing information of the Broker cluster and the queue information used for client queries, so Producers and Consumers can learn the routes of the whole Broker cluster from a NameServer and then deliver and consume messages. NameServers are usually deployed as a cluster, and the instances do not communicate with each other. Each Broker registers its routing information with every NameServer, so each NameServer instance keeps a complete copy. When one NameServer goes offline, Brokers can still synchronize routing information with the other NameServers, and Producers and Consumers can still discover Broker routes dynamically.
- BrokerServer: The message relay, responsible for storing and forwarding messages. The Broker receives messages from producers, stores them, and prepares them for pull requests from consumers. It also stores message-related metadata, including consumer groups, consumption progress offsets, topics, and queue information.
Cluster Workflow
- Start the NameServer. Once up, it listens on its port and waits for Brokers, Producers, and Consumers to connect, acting as a routing control center.
- Start the Broker. It keeps long-lived connections to all NameServers and sends heartbeat packets at regular intervals. Each heartbeat contains the Broker's own information (IP, port, and so on) and all the Topic information it stores. After successful registration, the NameServer cluster holds the mapping between Topics and Brokers.
- Create the Topic before sending or receiving messages. When creating a Topic, you specify the Brokers on which it is stored; alternatively, the Topic can be created automatically when a message is sent.
- The Producer sends a message. On startup, it establishes a long-lived connection to one NameServer in the cluster and obtains from it the Brokers that host the target Topic. It then selects a queue from the queue list in round-robin fashion and establishes a long-lived connection to the Broker hosting that queue in order to send the message.
- The Consumer works like the Producer: it establishes a long-lived connection to one NameServer, learns which Brokers host the subscribed Topic, then connects directly to those Brokers and starts consuming messages.
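The registration and lookup steps above can be illustrated with a toy model. This is a sketch under stated assumptions, not RocketMQ's client code: `RouteSketch`, its nested `NameServer` and `Producer` classes, and the `broker#queueId` string format are all hypothetical. It only shows Brokers registering topic queues, a Producer fetching the route once, and round-robin queue selection.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

class RouteSketch {
    /** Toy NameServer: keeps, per topic, the queues that Brokers have registered. */
    static class NameServer {
        private final Map<String, List<String>> routes = new ConcurrentHashMap<>();

        /** Called by a Broker (for example on heartbeat) to announce its queues for a topic. */
        void register(String broker, String topic, int queueCount) {
            List<String> queues = routes.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>());
            for (int queueId = 0; queueId < queueCount; queueId++) {
                queues.add(broker + "#" + queueId);
            }
        }

        /** Called by clients to learn which Broker queues host a topic. */
        List<String> lookup(String topic) {
            return List.copyOf(routes.getOrDefault(topic, List.of()));
        }
    }

    /** Toy Producer: fetches the route once, then picks queues in round-robin order. */
    static class Producer {
        private final List<String> queues;
        private final AtomicInteger counter = new AtomicInteger();

        Producer(NameServer nameServer, String topic) {
            this.queues = nameServer.lookup(topic);
            if (queues.isEmpty()) {
                throw new IllegalStateException("No route for topic " + topic);
            }
        }

        String selectQueue() {
            int index = Math.floorMod(counter.getAndIncrement(), queues.size());
            return queues.get(index);
        }
    }

    public static void main(String[] args) {
        NameServer nameServer = new NameServer();
        nameServer.register("broker-a", "TopicTest", 2);
        nameServer.register("broker-b", "TopicTest", 2);
        Producer producer = new Producer(nameServer, "TopicTest");
        for (int i = 0; i < 5; i++) {
            System.out.println(producer.selectQueue());
        }
    }
}
```

A real client also refreshes the route periodically and skips Brokers that recently failed; both are left out here to keep the selection logic visible.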
Message Storage
- Producer and CommitLog:
  - The Producer sends a message to RocketMQ with a Topic, QueueId, and Message. After the message reaches the server, it is written to the CommitLog, the core location where RocketMQ stores messages. In the figure, the CommitLog section shows the state of message storage.
  - Red indicates a message that has been written, and a hollow box indicates a message waiting to be written. Messages are stored in the CommitLog sequentially, each with its commitLogOffset (the offset in the CommitLog), msgSize (the message size), and tagsCode (the tag code used for message filtering).
- Asynchronously building the logical consumption queue (doDispatch):
  - RocketMQ asynchronously builds logical consumption queues from the messages written to the CommitLog. This step is done by doDispatch, which adds each message's commitLogOffset, msgSize, tagsCode, and other information to the appropriate ConsumerQueue (named ConsumeQueue in the RocketMQ source) so that consumers can consume it.
  - This process links the logical queue of messages to the physical storage (CommitLog).
- ConsumerQueue and the consumption process:
  - The figure shows three ConsumerQueues (ConsumerQueue0, ConsumerQueue1, ConsumerQueue2), each corresponding to a different queue of the Topic.
  - Each ConsumerQueue holds offsets (commitLogOffset) that point to messages in the CommitLog. Using these offsets, consumers locate and consume the corresponding messages in the CommitLog.
  - In each ConsumerQueue, minOffset is the minimum offset in the consumption queue and maxOffset is the maximum offset. consumerOffset is the current consumption progress.
  - Message status is shown in three ways: unconsumed (red block), consumed (solid box), and waiting to be dispatched (empty box).
- Consumer and message consumption:
  - Consumers consume messages by reading from the consumerOffset in the corresponding ConsumerQueue. Consumption progress is recorded by updating the consumerOffset.
  - When a message is consumed, the consumerOffset is advanced to the next unconsumed offset, and the message is shown as consumed in the ConsumerQueue.
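The relationship between the CommitLog and the consumption queues can be sketched in a few lines. This is a simplified in-memory model, assuming one shared append-only log and one index list per topic queue; `StoreSketch`, `IndexEntry`, and the method names are hypothetical, and the dispatch step runs synchronously here although the text describes it as asynchronous.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StoreSketch {
    /** Index entry: where the message sits in the commit log, its size, and its tag hash. */
    static final class IndexEntry {
        final long commitLogOffset;
        final int msgSize;
        final long tagsCode;

        IndexEntry(long commitLogOffset, int msgSize, long tagsCode) {
            this.commitLogOffset = commitLogOffset;
            this.msgSize = msgSize;
            this.tagsCode = tagsCode;
        }
    }

    private final ByteArrayOutputStream commitLog = new ByteArrayOutputStream();
    private final Map<String, List<IndexEntry>> consumeQueues = new HashMap<>();
    private final Map<String, Integer> consumerOffsets = new HashMap<>();

    private static String key(String topic, int queueId) {
        return topic + "-" + queueId;
    }

    /** Appends the body to the shared log, then adds an index entry to the topic queue. */
    void put(String topic, int queueId, String tag, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long offset = commitLog.size();
        commitLog.write(bytes, 0, bytes.length);
        consumeQueues.computeIfAbsent(key(topic, queueId), k -> new ArrayList<>())
                .add(new IndexEntry(offset, bytes.length, tag.hashCode()));
    }

    /** Reads the next unconsumed message of a queue and advances consumerOffset; null if none. */
    String pollNext(String topic, int queueId) {
        String key = key(topic, queueId);
        List<IndexEntry> queue = consumeQueues.getOrDefault(key, List.of());
        int consumerOffset = consumerOffsets.getOrDefault(key, 0);
        if (consumerOffset >= queue.size()) {
            return null;
        }
        IndexEntry entry = queue.get(consumerOffset);
        consumerOffsets.put(key, consumerOffset + 1);
        return new String(commitLog.toByteArray(), (int) entry.commitLogOffset,
                entry.msgSize, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        StoreSketch store = new StoreSketch();
        store.put("TopicTest", 0, "TagA", "first");
        store.put("TopicTest", 1, "TagB", "second");
        store.put("TopicTest", 0, "TagA", "third");
        System.out.println(store.pollNext("TopicTest", 0));
        System.out.println(store.pollNext("TopicTest", 0));
        System.out.println(store.pollNext("TopicTest", 1));
    }
}
```

Messages from different queues are interleaved in the log, while each queue's index keeps only small fixed-size entries, which is why consumers can scan a queue cheaply and still read bodies from one sequential file.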
Message Flushing to Disk
In RocketMQ, messages can be flushed to disk in two ways: synchronous flushing and asynchronous flushing.
The mode is set in the Broker configuration file; the default is asynchronous flushing:
flushDiskType = SYNC_FLUSH
# or
flushDiskType = ASYNC_FLUSH
- Synchronous flushing
  - Message sending: The Producer sends a message to the Broker.
  - Storage process:
    - The Broker receives the message and first writes it to Java heap memory.
    - The message is then transferred to virtual memory.
    - Finally, the messages in virtual memory are flushed to disk.
  - Acknowledgement (ACK) mechanism:
    - In synchronous flushing mode, the Broker returns an ACK to the Producer only after the message has been written to disk, which indicates that the message has been persisted.
    - The advantage is higher data safety: the data is on disk before success is confirmed.
    - The downside is lower performance, because every send waits for the flush to complete.
- Asynchronous flushing
  - Message sending: The Producer sends a message to the Broker.
  - Storage process:
    - The Broker receives the message and, as before, first writes it to Java heap memory.
    - The message is then stored in virtual memory.
    - In this mode, the Broker returns the ACK to the Producer as soon as the message is written to virtual memory.
    - The actual flush to disk is done asynchronously by background threads.
  - Acknowledgement (ACK) mechanism:
    - Asynchronous flushing returns the ACK right after the message is written to virtual memory, without waiting for the disk write.
    - The advantage is higher performance: the Producer can send faster because it does not wait for the flush to complete.
    - The disadvantage is lower data safety: the message is not yet on disk when the ACK is returned, so it can be lost if the Broker fails before the flush.
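The difference between the two modes comes down to whether the ACK waits for the flush. The sketch below illustrates that with plain `FileChannel` calls. It is an assumption-laden illustration rather than the Broker's implementation: `FlushSketch` and its `append` method are hypothetical, `channel.write` stands in for "written to virtual memory", and `channel.force` stands in for the flush to disk.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FlushSketch implements AutoCloseable {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    private final FileChannel channel;
    private final FlushDiskType type;
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();

    FlushSketch(Path file, FlushDiskType type) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        this.type = type;
    }

    /**
     * Returns at the point where the ACK would be sent.
     * The returned Future completes once the data has been forced to disk.
     */
    Future<?> append(String message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        while (buffer.hasRemaining()) {
            channel.write(buffer); // data reaches the OS cache, not necessarily the disk
        }
        if (type == FlushDiskType.SYNC_FLUSH) {
            channel.force(false); // block until persisted, only then ACK
            return CompletableFuture.completedFuture(null);
        }
        // ASYNC_FLUSH: ACK immediately, let a background thread flush later
        return flusher.submit(() -> {
            channel.force(false);
            return null;
        });
    }

    @Override
    public void close() throws IOException, InterruptedException {
        flusher.shutdown();
        flusher.awaitTermination(5, TimeUnit.SECONDS);
        channel.close();
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("commitlog", ".bin");
        try (FlushSketch sync = new FlushSketch(file, FlushDiskType.SYNC_FLUSH)) {
            System.out.println("sync flushed before ACK: " + sync.append("m1").isDone());
        }
        try (FlushSketch async = new FlushSketch(file, FlushDiskType.ASYNC_FLUSH)) {
            Future<?> flushed = async.append("m2"); // the ACK would be sent here
            flushed.get(); // waiting only to make the demo deterministic
        }
        System.out.println("bytes on disk: " + Files.size(file));
        Files.delete(file);
    }
}
```

In the asynchronous branch, anything written between the ACK and the completion of the background `force` call is exactly the window in which a crash can lose data.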
RocketMQ Feature Implementation Analysis
RocketMQ Delayed Messages
RocketMQ supports 18 delay levels, with default values of "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".
- Topic replacement (SCHEDULE_TOPIC_XXXX)
  - When the Producer sends a delayed message, RocketMQ does not deliver it directly to the Topic the user specified. Instead, the Broker replaces the message's Topic with the internal, predefined SCHEDULE_TOPIC_XXXX.
  - The purpose is to separate delayed messages from normal ones and manage them through an internal scheduling mechanism. SCHEDULE_TOPIC_XXXX is a special topic dedicated to storing delayed messages.
- Placement into a queue according to the delay level
  - RocketMQ's delayed messages have 18 delay levels, each with a different delay (1s, 5s, 10s, and so on).
  - To manage messages with different delay levels, SCHEDULE_TOPIC_XXXX contains 18 queues, one per delay level. For example:
    - Queue0 under SCHEDULE_TOPIC_XXXX corresponds to the 1-second delay level.
    - Queue1 corresponds to the 5-second delay level, and so on.
  - When a message arrives, the Broker places it into the queue that matches its delayLevel attribute.
- A timed task per queue for scheduling
  - RocketMQ internally creates a timed task for each delay queue. Each task keeps checking its queue to determine whether any message has reached its delivery time.
  - The main role of the timed task is to scan the stored delayed messages at regular intervals and decide which ones need to be restored for processing.
  - For example, for Queue0 (the 1-second delay level), the timed task scans once per second for messages whose delay has elapsed. If it finds one, it performs the next step.
- Expired messages are restored and redelivered to the real Topic
  - When the timed task finds that a message's delay has elapsed, RocketMQ restores the message to the real Topic specified by the user.
  - Restoring a message includes the following steps:
    - Take the message out of its delay queue under SCHEDULE_TOPIC_XXXX.
    - Set the message's Topic back to the Topic the user originally specified, and redeliver it according to the target topic and queue information.
    - Write the restored message to the CommitLog and, based on its QueueId and original Topic, into the corresponding consumption queue.
  - In this way, once a delayed message reaches its scheduled time, it re-enters the normal consumption path, and consumers handle it like any other message.
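The level-to-queue mapping described above can be worked through in code. This is a small sketch, assuming the default level string quoted earlier and the mapping "queue ID = delay level minus one" implied by Queue0 holding the 1-second level; the class `DelayLevelSketch` and its methods are hypothetical helpers, not RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.List;

class DelayLevelSketch {
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses the level string into delays in milliseconds; index 0 holds level 1. */
    static List<Long> parse(String levels) {
        List<Long> delays = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            long factor;
            switch (unit) {
                case 's': factor = 1_000L; break;
                case 'm': factor = 60_000L; break;
                case 'h': factor = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in " + token);
            }
            delays.add(value * factor);
        }
        return delays;
    }

    /** Delay levels start at 1, queue IDs start at 0. */
    static int queueIdFor(int delayLevel) {
        return delayLevel - 1;
    }

    /** Time at which a message stored at storeTimestamp becomes deliverable. */
    static long deliverAt(long storeTimestamp, int delayLevel, List<Long> delays) {
        return storeTimestamp + delays.get(delayLevel - 1);
    }

    public static void main(String[] args) {
        List<Long> delays = parse(DEFAULT_LEVELS);
        int level = 3;
        System.out.println("level " + level + " -> queue " + queueIdFor(level)
                + ", delay " + delays.get(level - 1) + " ms");
    }
}
```

With the defaults, a message sent with delay level 3 lands in queue 2 of SCHEDULE_TOPIC_XXXX and becomes deliverable 10 seconds after it is stored.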
RocketMQ Message Retry Mechanism
When a Consumer fails to consume a message, RocketMQ provides a retry mechanism so that the message can be consumed again. The retry mechanism is implemented by redelivering failed messages to a dedicated retry queue. The following is a detailed analysis of RocketMQ message retries.
How the retry mechanism is implemented
- Retry queue (%RETRY%+consumerGroup)
  - When a consumer fails to consume a message, RocketMQ puts the message into a queue dedicated to retries. The name of this queue is "%RETRY%+consumerGroup", where consumerGroup is the name of the consumer group.
  - Note that the retry queue is set up per consumer group, not per Topic.
  - The advantage is that each consumer group gets an independent retry mechanism and its own consumption tracking, so the retry messages of different consumer groups do not interfere with each other.
- Delayed retry levels
  - RocketMQ gives the retry queue multiple retry levels, each with its own redelivery delay.
  - A message that fails to be consumed is not redelivered immediately. It is first stored in the internal delay queue (SCHEDULE_TOPIC_XXXX); a background timed task then moves it to the corresponding retry queue (%RETRY%+consumerGroup) once the delay for its level has elapsed.
  - The delay grows with each retry, giving the system time to recover from whatever caused the failure. For example, the first retry might come after 10 seconds, the second after 30 seconds, and so on.
  - This mechanism avoids the excessive load that frequent retries would cause.
- Consuming retry messages
  - Once a message has been stored in the "%RETRY%+consumerGroup" queue, RocketMQ follows the normal consumption process and delivers it to the consumer again.
  - If the consumer succeeds, the retry process ends; if it fails again, the next retry is scheduled, up to the configured number of retries.
Strategies for Consumer Failure
RocketMQ has designed a series of retry policies for consumption failures to ensure that messages are consumed to the maximum extent possible in the event of a failure. The strategies are as follows:
- Retries:
  - By default, RocketMQ retries each message that fails consumption up to 16 times.
  - If the message is still not consumed after 16 attempts, RocketMQ assumes the consumer cannot consume it successfully, and the message requires special handling.
- Incremental retry interval:
  - The growing retry interval is implemented through the internal delay queue (SCHEDULE_TOPIC_XXXX).
  - The growing interval gives the system enough time to fix the problem behind the failure and avoids wasting resources on repeated retries within a short period.
  - For example, the first retry might come after 10 seconds, the second after 30 seconds, the third after 1 minute, and so on. The interval increases gradually to maximize the chance that the message is eventually consumed.
- Dead letter queue (DLQ):
  - If the message still cannot be consumed after all retries (16 by default), RocketMQ puts it into a special queue called the dead letter queue.
  - The dead letter queue keeps messages that could not be consumed after repeated attempts so that they can be analyzed and handled later.
  - The dead letter queue's Topic is named "%DLQ%+consumerGroup"; each consumer group has its own dead letter queue.
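The retry schedule and the hand-off to the dead letter queue can be summarized in a short sketch. It assumes that retries reuse the default delay levels starting at level 3, which matches the 10s, 30s, 1m progression given as an example above and yields exactly 16 retries ending at 2h; `RetrySketch` and its methods are hypothetical and only model the naming and scheduling rules.

```java
class RetrySketch {
    static final String[] LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");
    static final int MAX_RECONSUME_TIMES = 16;
    static final int FIRST_RETRY_LEVEL = 3; // assumption: the first retry uses the 10s level

    static String retryTopic(String consumerGroup) {
        return "%RETRY%" + consumerGroup;
    }

    static String dlqTopic(String consumerGroup) {
        return "%DLQ%" + consumerGroup;
    }

    /**
     * Decides where a failed message goes next.
     * reconsumeTimes is the number of retries that have already been attempted.
     */
    static String nextDestination(String consumerGroup, int reconsumeTimes) {
        if (reconsumeTimes >= MAX_RECONSUME_TIMES) {
            return dlqTopic(consumerGroup);
        }
        String delay = LEVELS[FIRST_RETRY_LEVEL - 1 + reconsumeTimes];
        return retryTopic(consumerGroup) + " after " + delay;
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= MAX_RECONSUME_TIMES; attempt++) {
            System.out.println("failure #" + (attempt + 1) + " -> "
                    + nextDestination("saaslink_consumer_group", attempt));
        }
    }
}
```

Note that the "+" in "%RETRY%+consumerGroup" denotes concatenation: the sketch produces topic names such as `%RETRY%saaslink_consumer_group`.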
Transaction Messages
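import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // TransactionListenerImpl is the application's own implementation (not shown here)
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // Thread pool used to answer transaction status checks from the Broker
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        // Transaction listener
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // Keep the process alive so that status checks can still be answered
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}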
public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
RocketMQ provides a transaction messaging mechanism. Transaction messages are used to implement distributed transactions, ensuring that different components of a distributed system perform a set of operations consistently. They let an application confirm a message only after its local transaction succeeds, so that the messaging system can ensure the message is eventually processed consistently by other systems. The following is a detailed analysis of the RocketMQ transaction messaging mechanism.
Transaction Messaging Implementation Principles
RocketMQ's transaction messages rest on three main parts: the prepare message (also called the half or preprocessing message), local transaction execution, and the transaction status check.
- Prepare message
  - When a Producer needs to send a transaction message, it first sends a prepare message to RocketMQ.
  - This message is stored in the Broker, but its state is "unconfirmed", meaning the transaction is still in progress and has not yet been formally committed.
  - When the Broker receives a prepare message, it returns an acknowledgement to the Producer that the prepare message has been stored.
- Local transaction execution
  - After the Producer receives the acknowledgement for the prepare message, it starts executing the corresponding local transaction. A local transaction is usually the application's business logic, such as a database write.
  - The local transaction ends in one of two ways:
    - Success: the local transaction executed successfully, and the next step is to confirm that the message is formally committed.
    - Failure: the local transaction failed, and the transaction message needs to be rolled back.
- Commit or roll back the transaction message
  - After the local transaction finishes, the Producer reports the result to the Broker as a commit or a rollback:
    - Commit: the local transaction succeeded. The Broker converts the prepare message into a normal message, and it becomes consumable.
    - Rollback: the local transaction failed. The Broker deletes the prepare message, and consumers never receive it.
  - The commit or rollback decides whether the message can be consumed, which is what ensures eventual consistency.
- Transaction status check
  - If the Producer cannot send the commit or rollback because of a network or system failure, the Broker uses a transaction status check to settle the final state of the message.
  - The Broker sends a check request to the Producer, asking for the status of the local transaction that corresponds to the prepare message. The Producer answers based on how the local transaction went:
    - COMMIT: confirms the commit; the Broker converts the message into a normal message.
    - ROLLBACK: rolls back; the Broker deletes the prepare message.
    - UNKNOWN: the Producer cannot determine the transaction status yet, and the Broker checks again later until it gets a definite result.
  - The check mechanism keeps the system reliable: even after a failure, the final state of the message can still be determined.
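The Broker-side lifecycle described above can be modeled as a small state machine. This is a toy simulation under stated assumptions, not Broker code: `TxBrokerSketch`, `TxState`, and the method names are hypothetical, and the producer's answer to a status check is passed in as a plain function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class TxBrokerSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // Prepare (half) messages, keyed by transaction ID; invisible to consumers
    private final Map<String, String> halfMessages = new LinkedHashMap<>();
    // Messages that consumers are allowed to see
    private final List<String> visible = new ArrayList<>();

    /** Step 1: store the prepare message without exposing it to consumers. */
    void storeHalf(String txId, String body) {
        halfMessages.put(txId, body);
    }

    /** Step 3: apply the producer's decision for one transaction. */
    void endTransaction(String txId, TxState state) {
        if (state == TxState.UNKNOWN) {
            return; // keep the prepare message and ask again later
        }
        String body = halfMessages.remove(txId);
        if (body != null && state == TxState.COMMIT) {
            visible.add(body);
        }
    }

    /** Step 4: ask the producer about every prepare message that is still pending. */
    void checkPending(Function<String, TxState> producerCheck) {
        for (String txId : new ArrayList<>(halfMessages.keySet())) {
            endTransaction(txId, producerCheck.apply(txId));
        }
    }

    List<String> visibleMessages() {
        return List.copyOf(visible);
    }

    int pendingCount() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        TxBrokerSketch broker = new TxBrokerSketch();
        broker.storeHalf("tx-1", "order created");
        broker.storeHalf("tx-2", "order failed");
        broker.storeHalf("tx-3", "order with lost confirmation");
        broker.endTransaction("tx-1", TxState.COMMIT);
        broker.endTransaction("tx-2", TxState.ROLLBACK);
        // tx-3 never received a decision, so the broker checks back with the producer
        broker.checkPending(txId -> TxState.COMMIT);
        System.out.println(broker.visibleMessages());
    }
}
```

The key property is that a prepare message only ever moves to the visible list through an explicit COMMIT, whether that arrives directly or through a later status check.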
Simple example - e-commerce order system
When a user places an order for an item, the following three actions need to be performed simultaneously:
- Generate Order Record: Inserts a new order record into the order database.
- Deduct Inventory: Deduct the quantity of the corresponding item in stock in the inventory system.
- Notify the logistics system: Generate a new distribution request in the logistics system
These three operations involve the Order Service, Inventory Service, and Logistics Service and need to ensure data consistency between them. If an order is generated successfully, but the inventory deduction fails, or the notification to the logistics system fails, this can lead to data inconsistency problems.
To ensure that all three operations succeed consistently in a distributed environment, you can use RocketMQ's transactional messaging mechanism, as follows:
- Send the prepare message:
  - When a user places an order, the Order Service first sends a prepare message to RocketMQ, signaling the start of the transaction that creates the order.
  - This message is stored in RocketMQ's Broker, but its status is "unconfirmed" while it waits for the Order Service to complete the local transaction.
- Execute the local transaction:
  - The Order Service starts executing the local transaction once the prepare message has been sent successfully:
    - Insert the order record into the order database.
    - If the order insertion succeeds, call the Inventory Service and deduct the stock quantity.
    - After the inventory deduction succeeds, notify the logistics system and generate a delivery request.
  - If all three operations succeed, the local transaction has executed successfully.
- Commit or roll back the transaction message:
  - If the local transaction succeeds, the Order Service sends a commit to RocketMQ, confirming that the prepare message may be consumed. RocketMQ then delivers the message to the consumer groups subscribed to the Topic, such as the Logistics Service.
  - If anything goes wrong during the local transaction, such as a failed inventory deduction, the Order Service sends a rollback to RocketMQ, which deletes the prepare message. The order transaction has failed, and the message is not delivered any further.
- Transaction status check:
  - If RocketMQ waits a long time for a commit or rollback from the Order Service and receives neither, for example because of a network fault, it initiates a transaction status check with the Order Service.
  - RocketMQ asks the Order Service whether the local transaction that corresponds to the earlier prepare message succeeded or failed.
  - The Order Service returns the status: the transaction message is committed if the local transaction is confirmed successful, or rolled back if it failed.
User places order --> Order Service --> sends prepare message to RocketMQ --> RocketMQ stores the message (Prepare)
--> Order Service performs the local transaction (generate order, deduct inventory, notify logistics).
--> If successful --> Order Service commits the transaction message --> RocketMQ formally delivers the message to the Logistics Service.
--> If failed --> Order Service rolls back the message --> RocketMQ deletes the prepare message.
--> If unacknowledged --> RocketMQ initiates a transaction status check --> Order Service returns the final state
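On the Order Service side, answering a status check requires remembering how each local transaction ended. One common way to do that is to record the outcome keyed by order ID, as in the sketch below. Everything here is hypothetical: `OrderTxSketch`, the in-memory `localTxLog` (a database table in practice), and the stock check that stands in for the real order, inventory, and logistics calls. The two methods play the roles of `executeLocalTransaction` and `checkLocalTransaction` from the listener interface shown earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OrderTxSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // Outcome of each local transaction: orderId -> succeeded?
    private final Map<String, Boolean> localTxLog = new ConcurrentHashMap<>();

    /** Plays the role of executeLocalTransaction: run the business logic, record the outcome. */
    TxState executeLocal(String orderId, int stock, int quantity) {
        try {
            if (quantity > stock) {
                throw new IllegalStateException("Insufficient stock for " + orderId);
            }
            // Placeholder for: insert order, deduct inventory, create delivery request
            localTxLog.put(orderId, true);
            return TxState.COMMIT;
        } catch (RuntimeException ex) {
            localTxLog.put(orderId, false);
            return TxState.ROLLBACK;
        }
    }

    /** Plays the role of checkLocalTransaction: answer from the recorded outcome. */
    TxState check(String orderId) {
        Boolean succeeded = localTxLog.get(orderId);
        if (succeeded == null) {
            return TxState.UNKNOWN; // no record yet, let the Broker ask again later
        }
        return succeeded ? TxState.COMMIT : TxState.ROLLBACK;
    }

    public static void main(String[] args) {
        OrderTxSketch orderService = new OrderTxSketch();
        System.out.println(orderService.executeLocal("order-1", 10, 2));
        System.out.println(orderService.executeLocal("order-2", 1, 5));
        System.out.println(orderService.check("order-1"));
        System.out.println(orderService.check("order-3"));
    }
}
```

Returning UNKNOWN when no record exists is deliberate: the local transaction may still be running, so committing or rolling back at that point would be a guess.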