
Troubleshooting a RabbitMQ Consumer Disappearing Issue


Happy Moment

Today a good friend of mine came to borrow money.
Friend: Bro, I'm a bit tight on money lately, can you lend me some...
Me: I don't have much on hand either, why don't you try a bank loan or an online credit line?
Friend: No way, those you actually have to pay back.
Me: ...


Review of the issue

One afternoon, production monitoring alerted: message backlog, queue xxx has more than 100 messages. The first thing I thought of was that the application service had stopped, but the service liveness monitor had not fired, so I asked the on-duty ops colleague to help confirm; the result was that all 6 nodes of the service were alive. I then asked ops to check the queue's consumers and found that the consumer list only contained consumers from 2 nodes; the consumers of the other 4 nodes were missing, so messages were not being consumed fast enough, resulting in the backlog!

So here's the question.

Why did the consumers registered in those 4 nodes disappear?

But the priority was to resolve the message backlog, so I had ops restart the services on those 4 nodes; once their consumers re-registered, the messages were quickly consumed and the backlog alarm cleared!

Although the production problem was temporarily resolved, the root cause had not been found and there was still a risk of recurrence; follow along below to see how I troubleshot it.

Troubleshooting the problem

Going straight to the ERROR-level logs, the key entry was easy to find:

Consumer thread error, thread abort.

and the exception stack

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
	at ...

Consumer thread error, thread abort: you can read that literally, right?

Consumer thread error, thread aborted

The consumer thread is the queue consumer we mentioned earlier: one consumer thread is one queue consumer, so an aborted consumer thread means a vanished queue consumer, which matches the disappearance of consumers in the title. Are we getting closer to the truth?

OutOfMemoryError is an old acquaintance: a memory overflow.

OutOfMemoryError indicates that the Java Virtual Machine does not have enough heap memory to allocate the object.

Let me ask you a question: does an OOM always cause the JVM to exit? Think about it for a moment; the answer will be revealed later.

Back to the point: from the key log and the exception stack, can't we form the following speculation?

OOM causes the consumer thread to abort

With the speculation in hand, let's verify it; I'll model the case for you based on SpringBoot. Key dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

configuration file

server:
  port: 8088
spring:
  rabbitmq:
    host: 192.168.2.118
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # manual acknowledgement
        concurrency: 3 # number of consumers (thread count)
        prefetch: 1

RabbitMQ configuration

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: greenstone road
 */
@Configuration
public class TaskRabbitConfig {

    @Value("${task.concurrency:3}")   // placeholder property key; only the default of 3 is from the original
    private int concurrency;
    @Value("${task.prefetch:1}")      // placeholder property key; only the default of 1 is from the original
    private int prefetch;

    @Bean
    public DirectExchange taskExchange() {
        return new DirectExchange(Constant.TASK_EXCHANGE, true, false);
    }

    @Bean
    public Queue taskQueue() {
        return new Queue(Constant.TASK_QUEUE, true, false, false);
    }

    @Bean
    public Binding bindingTaskQueue() {
        return BindingBuilder.bind(taskQueue()).to(taskExchange()).with(Constant.TASK_QUEUE);
    }

    @Bean
    public SimpleMessageListenerContainer taskMessageListenerContainer(ConnectionFactory connectionFactory) {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // manual acknowledgement mode
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueueNames(Constant.TASK_QUEUE);
        // number of consumers (thread count)
        container.setConcurrentConsumers(concurrency);
        // prefetch count per consumer
        container.setPrefetchCount(prefetch);

        container.setMessageListener(new TaskMessageListener());
        return container;
    }
}

message listener

/**
 * @author: greenstone road
 */
@Slf4j
public class TaskMessageListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) {
        String content = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Consumer received message: {}", content);
        handleTask(content);
        try {
            // manual ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("Message acknowledgement failed, exception:", e);
        }
    }

    private void handleTask(String message) {
        try {
            // business processing
            log.info("Processing task: {}", message);
            log.info("Task processing completed");
        } catch (Exception e) {
            log.error("Failed to process task, exception:", e);
        }
    }
}

The business processing catches Exception and the message is manually acknowledged; nothing wrong there, and I'm sure that's how most of you use it. Now let me tweak the handleTask method:

/**
 * Business Processing
 * @param message Message content
 * @author: greenstone road
 */
private void handleTask(String message) {
    try {
        // business processing
        log.info("Processing task: {}", message);
        int i = 3 / (message.length() % 10);
        if (i == 1) {
            throw new OutOfMemoryError("Simulating a memory overflow");
        }
        log.info("Task processing result: {}", i);
    } catch (Exception e) {
        log.error("Failed to process task, exception:", e);
    }
}

After starting the service, the queue consumers are as follows

(screenshot: queue consumers after the service starts)
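As a side note, the article never shows the producer side. Below is a minimal sketch of how the test messages used next could be published; the controller, the /send endpoint and the content parameter are my own inventions for illustration, and it assumes spring-boot-starter-web is also on the classpath. Only RabbitTemplate and the Constant keys come from the setup above.

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * Hypothetical producer used to push test messages such as "a", "abcdefghij", "ab".
 */
@RestController
public class TaskProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // e.g. GET /send?content=a
    @GetMapping("/send")
    public String send(@RequestParam String content) {
        // direct exchange + queue name as routing key, matching the binding in TaskRabbitConfig
        rabbitTemplate.convertAndSend(Constant.TASK_EXCHANGE, Constant.TASK_QUEUE, content);
        return "sent: " + content;
    }
}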

Send the message a; the log output is as follows

2024-09-22 20:15:55|taskMessageListenerContainer-2||INFO|20|Consumer received message: a
2024-09-22 20:15:55|taskMessageListenerContainer-2||INFO|37|Processing task: a
2024-09-22 20:15:55|taskMessageListenerContainer-2||INFO|42|Task processing result: 3

Business as usual, in other words. Next we send the message abcdefghij; the log output is as follows

2024-09-22 20:17:45|taskMessageListenerContainer-3||INFO|20|Consumer received message: abcdefghij
2024-09-22 20:17:45|taskMessageListenerContainer-3||INFO|37|Processing task: abcdefghij
2024-09-22 20:17:45|taskMessageListenerContainer-3||ERROR|44|Processing task failed, exception:
java.lang.ArithmeticException: / by zero
	at ...

The divisor is 0, so an ArithmeticException is thrown.

ArithmeticException

In other words, the business processing threw an Exception and we caught it, so the log output matches the logic of our code and the consumer threads are unaffected; the queue consumers are still the initial 3.

(screenshot: still the same 3 consumers)

Now we send the message ab; the log output is as follows

2024-09-22 20:36:31|taskMessageListenerContainer-1||INFO|20|Consumer received message: ab
2024-09-22 20:36:31|taskMessageListenerContainer-1||INFO|37|Processing task: ab
2024-09-22 20:36:31|taskMessageListenerContainer-1||ERROR|1268|Consumer thread error, thread abort.
java.lang.OutOfMemoryError: Simulating a memory overflow
	at ...
2024-09-22 20:36:31|taskMessageListenerContainer-2||INFO|20|Consumer received message: ab
2024-09-22 20:36:31|taskMessageListenerContainer-2||INFO|37|Processing task: ab
2024-09-22 20:36:31|taskMessageListenerContainer-2||ERROR|1268|Consumer thread error, thread abort.
java.lang.OutOfMemoryError: Simulating a memory overflow
	at ...
2024-09-22 20:36:31|taskMessageListenerContainer-3||INFO|20|Consumer received message: ab
2024-09-22 20:36:31|taskMessageListenerContainer-3||INFO|37|Processing task: ab
2024-09-22 20:36:31|taskMessageListenerContainer-3||ERROR|1268|Consumer thread error, thread abort.
java.lang.OutOfMemoryError: Simulating a memory overflow
	at ...
2024-09-22 20:36:31|taskMessageListenerContainer-1||ERROR|1415|Stopping container from aborted consumer
2024-09-22 20:36:31|taskMessageListenerContainer-1||INFO|646|Waiting for workers to finish.
2024-09-22 20:36:31|taskMessageListenerContainer-1||INFO|649|Successfully waited for workers to finish.

As you can see, in addition to our business logs, Spring printed its own error logs; the message was consumed 3 times, and every time the consumption failed.

Consumer thread error, thread abort.

Stopping container from aborted consumer

Let's go back to the queue consumer situation

(screenshot: the consumers are gone)

Let's run through the process

Consumer thread taskMessageListenerContainer-1 receives the message, business processing throws an OOM, Spring aborts the thread, the message is not manually acknowledged and goes back to the queue waiting to be consumed.
Consumer thread taskMessageListenerContainer-2 receives the message, business processing throws an OOM, Spring aborts the thread, the message is not manually acknowledged and goes back to the queue waiting to be consumed.
Consumer thread taskMessageListenerContainer-3 receives the message, business processing throws an OOM, Spring aborts the thread, the message is not manually acknowledged and goes back to the queue waiting to be consumed.

All 3 consumer threads are aborted by Spring, the corresponding 3 queue consumers are gone, and the message ends up back in the queue, waiting for the next available consumer to pick it up.
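To see the "unacknowledged message goes back to the queue" behavior in isolation, here is a small sketch using the raw RabbitMQ Java client (my own demo code; demo.queue is a made-up queue name, and the host and credentials are simply the values from the YAML above): a message fetched with autoAck = false is requeued by the broker as soon as the channel that received it closes, which is exactly what happens when Spring aborts a consumer thread.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import java.nio.charset.StandardCharsets;

public class UnackedRedeliveryDemo {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.2.118"); // broker values taken from the YAML above
        factory.setUsername("admin");
        factory.setPassword("admin");

        try (Connection connection = factory.newConnection()) {
            Channel first = connection.createChannel();
            first.queueDeclare("demo.queue", true, false, false, null);
            first.basicPublish("", "demo.queue", null, "ab".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(200); // give the broker a moment to route the message

            // fetch with autoAck = false and "crash" before acknowledging
            GetResponse unacked = first.basicGet("demo.queue", false);
            System.out.println("got: " + new String(unacked.getBody(), StandardCharsets.UTF_8));
            first.close(); // channel closes, so the broker requeues the unacked message
            Thread.sleep(200);

            Channel second = connection.createChannel();
            GetResponse redelivered = second.basicGet("demo.queue", false);
            System.out.println("redelivered flag: " + redelivered.getEnvelope().isRedeliver());
            second.basicAck(redelivered.getEnvelope().getDeliveryTag(), false);
        }
    }
}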

We did catch Exception, so why was the OutOfMemoryError still thrown up to Spring?

OutOfMemoryError

OutOfMemoryError is an Error, not an Exception, so our catch (Exception e) block does not catch it; it keeps propagating up into Spring, into the run method of SimpleMessageListenerContainer's internal consumer thread, which contains the following piece of code:

(screenshot: Spring's handling of Error)

publishConsumerFailedEvent publishes a consumer-failed event; when the event handler receives it, it aborts the thread. I won't expand on that here; I'll write a separate source-code post to walk you through Spring's abort logic.
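To make the class-hierarchy point above concrete, here is a tiny standalone sketch (my own demo, not the article's code): OutOfMemoryError extends Error, and Error and Exception are separate branches under Throwable, so a catch (Exception e) block never sees it, while catch (Error e), catch (Exception | Error e) or catch (Throwable t) would.

public class ErrorVsExceptionDemo {

    public static void main(String[] args) {
        try {
            try {
                throw new OutOfMemoryError("Simulating a memory overflow");
            } catch (Exception e) {
                // never reached: OutOfMemoryError extends Error, not Exception
                System.out.println("caught as Exception");
            }
        } catch (Error e) {
            // reached: Error sits on a separate branch of the Throwable hierarchy
            System.out.println("caught as Error: " + e.getMessage());
        }
    }
}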

At this point we are clear that an OutOfMemoryError causes the consumer thread to abort; careful readers may still have a question:

Shouldn't the consumer threads on all 6 production nodes have been aborted? Why were 2 nodes' consumers left?

Those 2 nodes have more memory, so their JVM heap is configured larger and their consumer threads did not hit OOM while processing messages. On that day the business team was processing a large amount of historical data, and after a few rounds the consumers on the 4 nodes with smaller memory were all wiped out, leaving only the consumers on the 2 nodes with larger memory!

The root cause is actually the OutOfMemoryError; for now, all we know is that it was triggered by

#writeValueAsString

but the specific reason this method leads to the OOM has yet to be investigated further.

Handling the problem

Because the cause of the OutOfMemoryError has not been found, because the scenario that triggers it (operating on a large amount of historical data) rarely occurs, and because it did not cause the service to restart, the tentative approach is to also catch the Error:

/**
 * Business Processing
 * @param message Message content
 * @author: greenstone road
 */
private void handleTask(String message) {
    try {
        // business processing
        log.info("Processing task: {}", message);
        int i = 3 / (message.length() % 10);
        if (i == 1) {
            throw new OutOfMemoryError("Simulating a memory overflow");
        }
        log.info("Task processing result: {}", i);
    } catch (Exception | Error e) {
        log.error("Failed to process task, exception:", e);
    }
}

Restart the service; it continues consuming the unconsumed message ab still sitting in the queue, and the log output is as follows

2024-09-22 21:38:57|taskMessageListenerContainer-2||INFO|20|Consumer received message: ab
2024-09-22 21:38:57|taskMessageListenerContainer-2||INFO|37|Processing task: ab
2024-09-22 21:38:57|taskMessageListenerContainer-2||ERROR|44|Failed to process task, exception:
java.lang.OutOfMemoryError: Simulating a memory overflow
	at ...
2024-09-22 21:38:57|main||INFO|61|Started RabbitmqApplication in 1.045 seconds (JVM running for 1.515)

Although the business processing still fails, only the error log from our own code is printed and there is no Spring error log; at this point the queue consumers look like this

(screenshot: queue consumer situation after the fix)

Of course, this is only a delaying tactic; the ultimate solution is to analyze the cause of the OOM and then prescribe the right remedy!

Summary

  1. Sample code: spring-boot-rabbitmq

  2. OOM does not necessarily cause the JVM to exit, but SimpleMessageListenerContainer will catch it and abort the current consumer thread, and the corresponding queue consumer is lost (see the sketch at the end of this post).

    Snap, and just like that, the happiness is gone.
  3. Having the business code catch Error is only a stopgap measure, but it is worth doing for robustness.

    However, the cause of the OOM must still be investigated so that the right remedy can be applied; that is the ultimate solution.
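Finally, to back up point 2, here is a minimal runnable sketch (my own demo code, not from the article): by default an uncaught OutOfMemoryError terminates only the thread that threw it, and the JVM keeps running. A real heap exhaustion can still destabilize the whole process, and flags such as -XX:+ExitOnOutOfMemoryError change this behavior.

public class OomInThreadDemo {

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            // simulate the error instead of actually exhausting the heap
            throw new OutOfMemoryError("Simulating a memory overflow");
        }, "worker");
        worker.start();
        worker.join();

        // still reached: the uncaught Error terminated only the worker thread
        System.out.println("JVM still alive, worker state = " + worker.getState());
    }
}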