RocketMQ in Practice (6): Production Optimization and Operations Solutions

2025-02-07 23:35:39

Outline

1. How to control the permission mechanism of the cluster

2. How to trace messages on a RocketMQ cluster

3. How to deal with the million-message backlog of RocketMQ

4. Financial-grade high-availability solutions for RocketMQ cluster crashes

5. Add message rate limiting to RocketMQ to ensure its high availability

6. Double-write and double-read solution for migration from Kafka to RocketMQ

 

1. How to control the permission mechanism of the cluster

(1) Necessity for permission control of RocketMQ

(2) Steps to implement permission control in RocketMQ

 

(1) Necessity for permission control of RocketMQ

If a company has many technical teams, each team will use some of the Topics in the RocketMQ cluster. This can lead to a problem: if the product team accidentally writes dirty data into the Topic used by the order team, the data in the order team's Topic may be corrupted.

 

To prevent this, introduce permission control in RocketMQ: users of the order team can only use "OrderTopic", users of the product team can only use "ProductTopic", and no team can use another team's Topic.

 

(2) Steps to implement permission control in RocketMQ

Step 1: First, place an additional ACL permission control configuration file on the Broker side. This file specifies the permissions, that is, which users may perform which operations on which Topics, so that each Broker knows the permissions of every user.

Step 2: Next, set aclEnable=true in the configuration file of each Broker to enable permission control.

 

Step 3: Then put a plain_acl.yml configuration file on each Broker machine. By default the Broker loads this file from the ${ROCKETMQ_HOME}/conf directory. The contents of the file look like this:

# Global whitelist:
# clients from the IP addresses listed here can access all Topics
globalWhiteRemoteAddresses:
  - 13.21.33.*
  - 192.168.0.*

# Any number of accounts can be defined here;
# each account configures its operation permissions on Topics
accounts:

  # accessKey is effectively the user name; here it is the order technology team
  - accessKey: OrderTeam

    # secretKey is effectively the password for this user name
    secretKey: 123456

    # Machines under this account that should be added to the whitelist
    whiteRemoteAddress:

    # Whether this account is an administrator account
    admin: false

    # Default Topic and ConsumerGroup permissions of this account
    defaultTopicPerm: DENY
    defaultGroupPerm: SUB

    # Specific permissions of this account on individual Topics.
    # Here the account has PUB|SUB permission on both Topics.
    # PUB is the permission to publish messages, SUB is the permission to subscribe to messages,
    # and DENY denies the account access to the Topic.
    topicPerms:
      - CreateOrderInformTopic=PUB|SUB
      - PaySuccessInformTopic=PUB|SUB

    # Permissions on ConsumerGroups, with the same meaning
    groupPerms:
      - groupA=DENY
      - groupB=PUB|SUB
      - groupC=SUB

  # Another account, for example the product technology team
  - accessKey: ProductTeam
    secretKey: 12345678
    whiteRemoteAddress: 192.168.1.*

    # An account with admin set to true has all permissions
    admin: true

Note that in the configuration above, if an account has no explicit permission configured for a Topic, the account's default Topic permission (defaultTopicPerm) applies.
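The fallback rule can be illustrated with a small sketch. It is not RocketMQ's actual ACL implementation; the class AclFallbackSketch and its methods are hypothetical names used only to show how an explicit topicPerms entry takes precedence over defaultTopicPerm.

```java
import java.util.Map;

// Hypothetical sketch of the fallback rule; not RocketMQ's actual ACL code.
class AclFallbackSketch {

    // Returns the permission string that applies to the topic:
    // the explicit topicPerms entry if present, otherwise defaultTopicPerm.
    static String resolveTopicPerm(Map<String, String> topicPerms,
                                   String defaultTopicPerm,
                                   String topic) {
        return topicPerms.getOrDefault(topic, defaultTopicPerm);
    }

    // True if the permission string (for example "PUB|SUB") grants the operation.
    static boolean isAllowed(String perm, String operation) {
        if ("DENY".equals(perm)) {
            return false;
        }
        for (String granted : perm.split("\\|")) {
            if (granted.equals(operation)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mirrors the OrderTeam account from the configuration above.
        Map<String, String> orderTeamPerms = Map.of(
                "CreateOrderInformTopic", "PUB|SUB",
                "PaySuccessInformTopic", "PUB|SUB");

        String explicit = resolveTopicPerm(orderTeamPerms, "DENY", "CreateOrderInformTopic");
        String fallback = resolveTopicPerm(orderTeamPerms, "DENY", "ProductTopic");

        System.out.println(isAllowed(explicit, "PUB")); // prints true
        System.out.println(isAllowed(fallback, "PUB")); // prints false
    }
}
```

With defaultTopicPerm set to DENY, any Topic that is not listed for the account is rejected, which is the behavior the order team example relies on.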

 

Step 4: Finally, specify the assigned RocketMQ account in the producer and the consumer. A producer or consumer that uses an account can then only access the Topics that account is authorized for.

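import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

DefaultMQProducer producer = new DefaultMQProducer(
    "OrderProducerGroup",
    new AclClientRPCHook(new SessionCredentials("OrderTeam", "123456"))
);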

The code above passes an AclClientRPCHook instance when creating the Producer. The account and password of the Producer are set in the AclClientRPCHook, and the same applies when creating a Consumer. In this way, you configure each account's access rights to Topics on the Broker side, and different technical teams then use different accounts.

 

2. How to trace messages on a RocketMQ cluster

(1) Steps to enable the message trace function of RocketMQ

(2) Processing flow of message trace after the function is configured

 

(1) Steps to enable the message trace function of RocketMQ

Sometimes you need to see the trace of a message to troubleshoot an online problem: for example, which Producer sent it and when, which Topic on which Broker it entered and when, and which Consumer consumed it and when. For this you can use RocketMQ's message trace function. The configuration steps are as follows:

Step 1: First, enable message tracing in the Broker configuration file by setting traceTopicEnable=true. With this enabled, the Broker automatically creates an internal Topic named RMQ_SYS_TRACE_TOPIC at startup, which stores the trace data of all messages.

Step 2: Then enable message tracing when sending and consuming messages. That is, when creating the Producer and the Consumer, set the second constructor parameter, enableMsgTrace, to true.

DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup", true);

(2) Processing flow of message trace after the function is configured

After the Broker, Producer, and Consumer are all configured for message tracing:

First, when the Producer sends a message, it reports the trace data of the message to RMQ_SYS_TRACE_TOPIC. The data reported at this point includes: Producer information, the time the message was sent, whether the send succeeded, and how long the send took.

Then, after the message reaches the Broker, the Broker also records trace data. The data recorded at this point includes: the Topic the message is stored in, the storage location of the message, and the key and tags of the message.

Next, when the message is consumed by the Consumer, the Consumer also reports trace data to RMQ_SYS_TRACE_TOPIC. The data reported at this point includes: Consumer information, the time the message was delivered, which delivery attempt this is, whether consumption succeeded, and how long consumption took.

Finally, to query a message trace, use the RocketMQ console. Its navigation bar has a message trace entry where you can create query tasks, searching by messageId, message key, or Topic. After the query task runs, the message trace page shows the trace data reported by the Producer, Broker, and Consumer.

 

3. How to deal with the million-message backlog of RocketMQ

(1) Case background of the problem of message backlog

(2) Directly discard messages to solve the message backlog problem

(3) Expand consumers on old Topic to solve the problem of message backlog

(4) Solve the message backlog problem through the expansion of new Topic

(5) Summary of handling the problem of message backlog

 

(1) Case background of the problem of message backlog

There was once a system that consisted of two parts: a producer and a consumer. The producer constantly wrote messages into RocketMQ, and the consumer consumed messages from RocketMQ. The system had peak and off-peak periods. During the peak period of a few hours at night, about 1 million messages entered RocketMQ. In addition, after obtaining messages from RocketMQ, the consumers relied on a NoSQL database to process some business logic.

One night, the NoSQL database that the consumers relied on went down, so the consumers could no longer process the data they consumed from RocketMQ. The producers still wrote more than 1 million messages to RocketMQ during the nightly peak, and all of them were backlogged.

 

There are generally the following solutions to deal with such urgent online accidents.

 

(2) Directly discard messages to solve the message backlog problem

If these messages are allowed to be lost, the consumer's code can be urgently modified so that every retrieved message is discarded directly, without any processing. This quickly drains the million-plus messages accumulated in RocketMQ, but only by throwing all of them away.

 

(3) Expand consumers on old Topic to solve the problem of message backlog

If these messages are not allowed to be lost, you can wait for the NoSQL database that the consumer depends on to recover. After the recovery, you can decide how to deal with it based on the number of MessageQueues of the online Topic.

 

Assume the Topic has 20 MessageQueues online and only 4 consumers, so each consumer reads from 5 MessageQueues. If you rely on these 4 consumers alone, the backlog will keep growing, since RocketMQ already holds more than a million messages.

Therefore, you can temporarily apply for 16 machines to deploy 16 more consumer instances, so that 20 consumers consume at the same time. Each consumer then consumes from exactly one MessageQueue, the consumption speed increases to about 5 times the original, and the million-message backlog is processed quickly.

However, the NoSQL database that the consumers rely on must be able to withstand the temporary 5-fold increase in read and write pressure, because only 4 consumers used to read and write NoSQL and now there are temporarily 20. After the million-message backlog has been processed, you can remove the extra 16 machines.

This is the most common way to deal with a backlog of millions of messages.
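The arithmetic above (20 MessageQueues shared by 4 consumers versus 20 consumers) can be checked with a small sketch. It assumes queues are split as evenly as possible among consumers, which is a simplification for illustration and not RocketMQ's actual allocation code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified even allocation of queue indexes to consumers; an illustration only.
class QueueAllocationSketch {

    // Returns the queue indexes owned by the consumer at consumerIndex when
    // queueCount queues are divided as evenly as possible among consumerCount consumers.
    // Consumers beyond the number of queues receive an empty list.
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;
        int remainder = queueCount % consumerCount;
        // The first `remainder` consumers each take one extra queue.
        int size = base + (consumerIndex < remainder ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);

        List<Integer> owned = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            owned.add(start + i);
        }
        return owned;
    }

    public static void main(String[] args) {
        System.out.println(allocate(20, 4, 0).size());  // prints 5
        System.out.println(allocate(20, 20, 0).size()); // prints 1
        System.out.println(allocate(4, 20, 10).size()); // prints 0
    }
}
```

The last call shows why adding consumers only helps up to the number of MessageQueues: once every queue has its own consumer, additional instances have nothing to read.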

 

(4) Solve the message backlog problem through the expansion of new Topic

What if the Topic has only 4 MessageQueues in total and only 4 consumers? In that case adding consumers does not help, because no matter how many consumers are added there are still only 4 MessageQueues, so the load on the original consumers is not reduced.

Therefore, you need to temporarily modify the code of the 4 consumers so that, after obtaining a message, they no longer rely on NoSQL and instead write the message directly to a new Topic. This step is very fast because it only reads from and writes to RocketMQ. The new Topic is created with 20 MessageQueues, and 20 temporary consumers are deployed to consume it; only these consumers rely on NoSQL when processing messages. Transferring the backlog to a new Topic in this way works around the fact that the consumers on the original Topic cannot be scaled out.
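The relay step can be sketched as follows. This is a simulation with in-memory lists instead of real Topics, assuming the relay spreads messages round-robin over the queues of the new Topic; the class BacklogRelaySketch is a hypothetical name, and a real relay would call a RocketMQ producer at the point where the sketch appends to a list.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory simulation of relaying a backlog to a new Topic with more queues.
class BacklogRelaySketch {

    // Forwards every backlogged message, without any business processing,
    // to one of newQueueCount queues chosen in round-robin order.
    static List<List<String>> relay(List<String> backlog, int newQueueCount) {
        List<List<String>> newTopicQueues = new ArrayList<>();
        for (int i = 0; i < newQueueCount; i++) {
            newTopicQueues.add(new ArrayList<>());
        }
        int next = 0;
        for (String message : backlog) {
            newTopicQueues.get(next).add(message); // stand-in for a send to the new Topic
            next = (next + 1) % newQueueCount;
        }
        return newTopicQueues;
    }

    public static void main(String[] args) {
        List<String> backlog = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            backlog.add("m" + i);
        }
        List<List<String>> queues = relay(backlog, 20);
        System.out.println(queues.size());        // prints 20
        System.out.println(queues.get(0).size()); // prints 5
    }
}
```

Because the backlog ends up spread evenly over 20 queues, 20 temporary consumers can each work on their own queue in parallel.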

 

(5) Summary of handling the problem of message backlog

If there are many MessageQueues and can directly expand consumers, then temporarily increase consumer instances to expand consumers.

 

If there are fewer MessageQueues and cannot directly expand consumers, then the messages backlogged in the original Topic will be written to the new Topic. When consuming new Topic messages, temporarily deploy enough consumer instances to achieve indirect expansion of consumers.

 

4. Financial-grade high-availability solutions for RocketMQ cluster crashes

If a financial-level system relies on a RocketMQ cluster, how should we design a high-availability solution when the RocketMQ cluster crashes?

 

A highly available downgrade solution is usually designed into the system that sends messages to RocketMQ. The idea of this downgrade solution is as follows:

In the code that sends a message to RocketMQ, catch exceptions with try/catch and retry when one occurs. If the send still fails after 3 consecutive retries, the RocketMQ cluster may have crashed completely. At this point the important message must be persisted, for example to a database, a local disk file, or NoSQL storage. After that, keep trying to send messages to RocketMQ. Once the RocketMQ cluster has recovered, a background thread queries the previously persisted messages and sends them to RocketMQ in order, so that no messages are lost because the RocketMQ cluster crashed completely.

 

Note: When persisting messages, ensure their order.

 

As long as you use this solution, even if the RocketMQ cluster suddenly crashes, the system will not lose messages. This highly available solution design is very necessary for some money-related financial systems and advertising systems.
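A minimal sketch of this downgrade flow is shown below. Several things in it are assumptions for illustration: the MessageSender interface stands in for the real RocketMQ send call, an in-memory queue stands in for durable storage (database, local file, or NoSQL), and a send is treated as failed after three attempts. None of these names come from RocketMQ.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of "retry, then persist, then replay in order"; names are hypothetical.
class FallbackSenderSketch {

    // Stand-in for the real send call, for example a RocketMQ producer.
    interface MessageSender {
        void send(String message) throws Exception;
    }

    private static final int MAX_ATTEMPTS = 3;

    private final MessageSender sender;
    // Stand-in for durable storage; FIFO order preserves the original send order.
    private final Deque<String> pending = new ArrayDeque<>();

    FallbackSenderSketch(MessageSender sender) {
        this.sender = sender;
    }

    // Sends the message, or stores it if all attempts fail.
    // While older messages are still stored, new ones queue behind them to keep order.
    synchronized boolean sendOrStore(String message) {
        if (pending.isEmpty() && trySend(message)) {
            return true;
        }
        pending.addLast(message);
        return false;
    }

    // Intended to be called periodically by a background thread.
    // Replays stored messages in order and stops at the first failure.
    synchronized int replay() {
        int sent = 0;
        while (!pending.isEmpty() && trySend(pending.peekFirst())) {
            pending.removeFirst();
            sent++;
        }
        return sent;
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    private boolean trySend(String message) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                sender.send(message);
                return true;
            } catch (Exception e) {
                // Swallow and retry; a real system would log and back off here.
            }
        }
        return false;
    }
}
```

A message is removed from storage only after it has been sent successfully, so a crash in the middle of a replay may resend a message but does not lose one. In a real system the in-memory queue must be replaced by storage that survives a restart of the sending application.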

 

5. Add message rate limiting to RocketMQ to ensure its high availability

Why add rate limiting to RocketMQ to ensure its high availability? Because rate limiting protects RocketMQ from failures caused by large amounts of data being written to it in a short time, for example because of a bug.

For example, suppose the following code ends up sending messages to RocketMQ in an endless while loop. If the system is deployed on more than 10 machines, all of them may write messages to RocketMQ non-stop, which makes the TPS of the RocketMQ cluster soar instantly and overwhelms the cluster.

try {
    // Business code
    producer.send(message);
} catch (Exception e) {
    // Bug: retries forever with no limit, delay, or exit condition
    while (true) {
        producer.send(message);
    }
}

Therefore, the usual remedy is to modify the RocketMQ source code and introduce a rate limiting mechanism at the point where the Broker receives messages: only a fixed number of messages may be written per second, which prevents an anomaly like this from bringing down the RocketMQ cluster.
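The "fixed number of messages per second" rule can be sketched as a simple fixed-window counter. This is only an illustration of the idea, not code from the RocketMQ Broker; the class name is hypothetical, and where such a check would be hooked into the Broker's message handling is not shown.

```java
// Fixed-window rate limiter: at most maxPerSecond permits per one-second window.
class FixedWindowRateLimiterSketch {

    private final int maxPerSecond;
    private long windowStartMillis = 0L;
    private int countInWindow = 0;

    FixedWindowRateLimiterSketch(int maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
    }

    // Returns true if a message arriving at nowMillis may be written,
    // false if the limit for the current window is already used up.
    synchronized boolean tryAcquire(long nowMillis) {
        if (nowMillis - windowStartMillis >= 1000L) {
            // A new one-second window begins; reset the counter.
            windowStartMillis = nowMillis;
            countInWindow = 0;
        }
        if (countInWindow < maxPerSecond) {
            countInWindow++;
            return true;
        }
        return false;
    }

    // Convenience overload that uses the system clock.
    boolean tryAcquire() {
        return tryAcquire(System.currentTimeMillis());
    }

    public static void main(String[] args) {
        FixedWindowRateLimiterSketch limiter = new FixedWindowRateLimiterSketch(2);
        System.out.println(limiter.tryAcquire(5000L)); // prints true
        System.out.println(limiter.tryAcquire(5001L)); // prints true
        System.out.println(limiter.tryAcquire(5002L)); // prints false
        System.out.println(limiter.tryAcquire(6000L)); // prints true
    }
}
```

A fixed window is the simplest choice and can let through a burst of up to twice the limit across a window boundary; a token bucket or sliding window smooths this at the cost of more bookkeeping.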

 

6. Double-write and double-read solution for migration from Kafka to RocketMQ

Assuming that the MQ used by the system was Kafka, and now it is necessary to migrate from Kafka to RocketMQ, then how should this migration process be done?

 

First, double writing is required, that is, in all Producer systems, messages are written to Kafka and RocketMQ at the same time. Usually, double writing will last for about 1 week, because the data in MQ will be kept for up to one week. After the double write lasts for a week, the data in Kafka and RocketMQ are basically the same.

 

But double writing alone is not enough; double reading is required as well. While double writing is in progress, every Consumer must obtain messages from both Kafka and RocketMQ and process them with the same logic. The messages obtained from Kafka still go through the core logic and their results are written to the database or other storage. The messages obtained from RocketMQ are processed with the same logic, but their results are not written to the database or other storage.

 

When Consumer consumes messages, it is necessary to count the number of messages read and processed from Kafka and RocketMQ every day, and record the corresponding message processing results into a temporary storage. After a period of time, it can be compared whether the number of messages read and processed from Kafka and RocketMQ is consistent and whether the processed message results are consistent. If so, then you can make a formal switch.
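The comparison step can be sketched as below. The sketch assumes every message carries a unique business key and that a processing result can be represented as a string; it uses in-memory maps as the "temporary storage". The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Records per-source processing results and compares them before the switch.
class DualReadComparisonSketch {

    // message key -> processing result, one map per source
    private final Map<String, String> kafkaResults = new HashMap<>();
    private final Map<String, String> rocketMqResults = new HashMap<>();

    void recordKafka(String messageKey, String result) {
        kafkaResults.put(messageKey, result);
    }

    void recordRocketMq(String messageKey, String result) {
        rocketMqResults.put(messageKey, result);
    }

    // True if both sources processed the same number of distinct messages.
    boolean countsMatch() {
        return kafkaResults.size() == rocketMqResults.size();
    }

    // True if both sources saw the same keys and produced the same result for each.
    boolean resultsMatch() {
        return kafkaResults.equals(rocketMqResults);
    }

    // The formal switch is considered only when counts and results both agree.
    boolean readyToSwitch() {
        return countsMatch() && resultsMatch();
    }
}
```

In practice the comparison would run per day or per time window over the recorded results, and a mismatch would be investigated before any switch is attempted.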

 

In general, this double-write and double-read solution is used for middleware migrations of this kind. After double writing and double reading for a period of time, check whether the results are consistent. If they are, make the switch.