
Troubleshooting redisson memory leaks

Popularity:386 ℃/2024-09-24 10:19:46

Description of the problem

Recently a production service suddenly began raising frequent alarms, and the P99 response time of its interfaces grew longer. The operations team observed that the CPU of the corresponding pod had spiked and that its memory usage was very high.
Troubleshooting high CPU usage is a well-worn topic. It can generally be done by running top -p pid -H to see which thread is consuming the CPU, then using jstack to find the corresponding Java thread and code.
However, experience tells us that there is another, more common cause of high CPU usage: insufficient memory leading to frequent GC. The garbage collector reclaims memory, memory quickly runs low again, and collection repeats in a loop. Because GC involves stop-the-world (STW) pauses, user threads are suspended and response times naturally increase. The memory shortage can be a normal one, where the service simply does not have enough memory, or an abnormal one, where a bug in the program leads to memory overflow.
As expected, the node's full GC time rose steeply at that point, and jstat -gcutil pid 500 30 also showed that full GC (FGC) was very frequent. As shown in the picture:

This problem had actually first appeared at the beginning of the month, when the development and operations teams worked around it temporarily by restarting the service. Now that it had appeared again, a simple restart clearly could not solve it, so this time we needed to analyze it and fix it properly.
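The GC pressure described above can also be sampled from inside the JVM. The following is a minimal sketch, not part of the original investigation, that uses the standard GarbageCollectorMXBean API; the class name GcTotals is made up for illustration. It sums over all collectors, so unlike the FGC column of jstat it does not separate full GCs unless you filter by collector name.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

class GcTotals {
    /** Returns {total collections, total collection time in ms} summed over all collectors. */
    static long[] sample() {
        long count = 0;
        long timeMs = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the collector does not report the value.
            count += Math.max(0, gc.getCollectionCount());
            timeMs += Math.max(0, gc.getCollectionTime());
        }
        return new long[] {count, timeMs};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] before = sample();
        Thread.sleep(1000);
        long[] after = sample();
        System.out.println("collections in the last second: " + (after[0] - before[0]));
        System.out.println("ms spent in GC in the last second: " + (after[1] - before[1]));
    }
}
```

If the time spent in GC per second stays close to the sampling interval itself, the application threads are getting very little time to run, which matches the symptoms described above.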

Investigation process

This time we exported the heap for analysis with a heap dump, using the command:

jmap -dump:format=b,file=./ pid

Open the heap file with the JDK's bundled VisualVM, or with the IDEA VisualVM Launcher plugin, and you can see:

The problem is clearly related to Redisson; the version we use is 3.17.1. The service touches Redisson in only a few places, and only one of them has a high call volume and looks suspicious. The simplified code is as follows:

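// redissonClient is an assumed name for the application's RedissonClient instance
RLock lock = redissonClient.getLock("mytest");
lock.tryLock(50, 100, TimeUnit.MILLISECONDS);

// Business code...

RLock lock2 = redissonClient.getLock("mytest");
if (lock2.isLocked() && lock2.isHeldByCurrentThread()) {
  lock2.unlock();
}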

Let's start by briefly analyzing the source code of RedissonLock's tryLock and unlock, with comments added at the key places.

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // Lock acquired, return success
        if (ttl == null) {
            return true;
        }

        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            // Still no lock and the wait time is used up, return failure
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        current = System.currentTimeMillis();
        // Subscribe to the lock release message; subscribe is the centerpiece of this article
        CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        try {
            subscribeFuture.get(time, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
            // Timed out, failed to acquire the lock
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.whenComplete((res, ex) -> {
                    if (ex == null) {
                        unsubscribe(res, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // The lock was released and we have not timed out yet: spin and try to acquire
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // Lock acquired, return success
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    // Still no lock and the wait time is used up, return failure
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // Wait for the lock to be released
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    // Still no lock and the wait time is used up, return failure
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            // Unsubscribe
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
    }
    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        CompletionStage<Void> f = future.handle((opStatus, e) -> {
            // Cancel the lock renewal
            cancelExpirationRenewal(threadId);
            //...
        });

        return new CompletableFutureWrapper<>(f);
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

Redisson's locking and unlocking rely mainly on Lua scripts and the Redis publish/subscribe mechanism; the data structure used is a hash.
Lua scripts ensure that multiple commands execute atomically, without concurrency issues.
When acquiring a lock with synchronized/Lock fails in Java code, the thread can be placed in a linked list and wait to be woken up to try for the lock again. In a distributed system built on Redis, the publish/subscribe mechanism plays that role: a waiter subscribes to a channel and tries to acquire the lock again when the lock's release is published. Redis publish/subscribe works on the same principle as Kafka and other MQ middleware, and in practice it can even be used to implement MQ functionality; the channel below is the equivalent of a topic in an MQ. The relevant commands are:

  • PUBLISH channel message, publish a message to the channel.
  • SUBSCRIBE channel [channel ...] , subscribes to channel and the client is notified when there is a message from channel.
  • UNSUBSCRIBE [channel [channel ...]] , unsubscribes from the given channels
  • PSUBSCRIBE pattern [pattern ...] , subscribe to the channel that matches the pattern
  • PUNSUBSCRIBE [pattern [pattern ...]] , unsubscribe from the channel that matches the pattern

What follows is our troubleshooting process:

Suspecting the way the code is written

Returning to our code, the first questionable thing that catches the eye is that locking and unlocking do not use the same object, which would be a problem if Redisson's locking and unlocking depended on object state.
But the source code analysis shows that the unlock logic is very simple and relies mainly on the thread id, which does not change. This style should still be corrected: it is misleading, and there is no need to create a second lock object. In addition, a lock holding time of 100 ms is too short. The business logic is processed very quickly, but if a full GC occurs while the lock is held, the lock will expire, other threads will be able to acquire it, and the protected code will run concurrently.
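The lease expiry risk can be illustrated with a small model. This is a sketch under stated assumptions, not Redisson code: LeaseLock is a hypothetical single-process lock with a TTL, and time is passed in explicitly so that a pause (for example a full GC) can be simulated deterministically.

```java
/** Hypothetical single-process model of a lock with a lease (TTL); time is passed in explicitly. */
class LeaseLock {
    private String owner;
    private long expiresAtMs;

    /** Acquires the lock if it is free or if its lease has expired at time nowMs. */
    synchronized boolean tryLock(String who, long nowMs, long leaseMs) {
        if (owner == null || nowMs >= expiresAtMs) {
            owner = who;
            expiresAtMs = nowMs + leaseMs;
            return true;
        }
        return false;
    }
}

class LeaseExpiryDemo {
    /** Returns true when B can take the lock while A has not yet unlocked. */
    static boolean overlapAfterPause(long leaseMs, long pauseMs) {
        LeaseLock lock = new LeaseLock();
        lock.tryLock("A", 0, leaseMs);             // A enters the critical section at t = 0
        long nowMs = pauseMs;                      // A is stalled for pauseMs, e.g. by a full GC
        return lock.tryLock("B", nowMs, leaseMs);  // B tries while A is still inside
    }

    public static void main(String[] args) {
        System.out.println(overlapAfterPause(100, 20));   // false: lease still valid
        System.out.println(overlapAfterPause(100, 300));  // true: lease expired during the pause
    }
}
```

With a 100 ms lease, any stall longer than 100 ms inside the critical section lets a second caller in, which is exactly the concurrent execution described above.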

Suspecting network problems

Since the problem was infrequent, occurring once or twice a month, I suspected that it was triggered by some special condition such as network jitter or a master-slave switch. From the DBAs I learned that the Redis network had indeed experienced jitter the day before. The production logs also showed Redis exceptions on the day before each of the two incidents in August, and there were some related discussions on Redisson's GitHub. All this strengthened my suspicion that a network exception could trigger some bug and cause the memory overflow. Verifying this ended up costing us a lot of time.

There are two main types of network problem: a dropped connection and a read timeout. A dropped connection is easy to simulate: we connect to the Redis of the development environment, so we can simply disconnect from the intranet. A read timeout can be simulated by logging in to the Redis server with redis-cli and using the client pause command to block clients. The following blocks all client requests for 10 s; I often use this command in my own simulation tests.

client pause 10000

We then ran the code in a loop and observed the objects in memory with jvisualvm. We found no problem: Redisson-related objects made up a very small share of the heap and could be reclaimed by GC.

for (int i = 0; i < 10000000; i++) {
  // Paste in the previous code
}

Source code analysis

The source code analyzed so far is only the outermost, simplest layer, which is not enough to find the problem. Stepping into the subscribe method seen earlier reveals a lot of internal logic for concurrency control and publish/subscribe handling.
Stepping into subscribe leads to the PublishSubscribe#subscribe method, which in turn uses AsyncSemaphore. We are all familiar with the JDK's Semaphore; AsyncSemaphore is an asynchronous form of it. The most important rule when using a semaphore is that a permit that has been acquired must be returned by calling release once you are done with it; otherwise other callers will never be able to acquire a permit.

    public CompletableFuture<E> subscribe(String entryName, String channelName) {
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        CompletableFuture<E> newPromise = new CompletableFuture<>();

        semaphore.acquire(() -> {
            if (newPromise.isDone()) {
                semaphore.release();
                return;
            }

            E entry = entries.get(entryName);
            if (entry != null) {
                entry.acquire();
                // 1. release the permit
                semaphore.release();
                //...
                return;
            }

            // value is the newly created entry (its creation is elided here)
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                // 2. release the permit
                semaphore.release();
                //...
                return;
            }

            RedisPubSubListener<Object> listener = createListener(channelName, value);
            CompletableFuture<PubSubConnectionEntry> s = service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener);
            //...
        });

        return newPromise;
    }

The main AsyncSemaphore code is as follows; permits is 1 and listeners is an unbounded queue. Among the abnormal instances in our dump there was an AsyncSemaphore lambda object and also a CompletableFuture lambda object, which matches this code closely, so this is very probably where the problem lies. The likely scenario is that in some case release is not called after acquire. For every other thread, decrementAndGet then returns a negative value, so listeners.poll() is never executed to remove an element, and the listeners queue keeps growing until memory overflows.

public class AsyncSemaphore {
    private final AtomicInteger counter;
    private final Queue<CompletableFuture<Void>> listeners = new ConcurrentLinkedQueue<>();

    public AsyncSemaphore(int permits) {
        counter = new AtomicInteger(permits);
    }

    public CompletableFuture<Void> acquire() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        listeners.add(future);
        tryRun();
        return future;
    }

    public void acquire(Runnable listener) {
        acquire().thenAccept(r -> listener.run());
    }

    private void tryRun() {
        while (true) {
            if (counter.decrementAndGet() >= 0) {
                CompletableFuture<Void> future = listeners.poll();
                if (future == null) {
                    counter.incrementAndGet();
                    return;
                }

                if (future.complete(null)) {
                    return;
                }
            }

            if (counter.incrementAndGet() <= 0) {
                return;
            }
        }
    }

    public void release() {
        counter.incrementAndGet();
        tryRun();
    }
}
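The suspected leak can be demonstrated in isolation. The sketch below copies the semaphore logic quoted above into a class named MiniAsyncSemaphore (a made-up name, with a queueSize() accessor added purely for the demo) and compares a run where the first holder releases its permit with a run where it never does.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Trimmed copy of the semaphore logic quoted above; queueSize() is added for the demo. */
class MiniAsyncSemaphore {
    private final AtomicInteger counter;
    private final Queue<CompletableFuture<Void>> listeners = new ConcurrentLinkedQueue<>();

    MiniAsyncSemaphore(int permits) {
        counter = new AtomicInteger(permits);
    }

    void acquire(Runnable listener) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        listeners.add(future);
        tryRun();
        // Runs immediately if tryRun() already completed the future, otherwise when a permit frees up.
        future.thenAccept(r -> listener.run());
    }

    private void tryRun() {
        while (true) {
            if (counter.decrementAndGet() >= 0) {
                CompletableFuture<Void> future = listeners.poll();
                if (future == null) {
                    counter.incrementAndGet();
                    return;
                }
                if (future.complete(null)) {
                    return;
                }
            }
            if (counter.incrementAndGet() <= 0) {
                return;
            }
        }
    }

    void release() {
        counter.incrementAndGet();
        tryRun();
    }

    int queueSize() {
        return listeners.size();
    }
}

class LeakDemo {
    /** Takes the single permit, optionally returns it, then queues `waiters` more acquires. */
    static int pendingAfter(int waiters, boolean releaseFirstPermit) {
        MiniAsyncSemaphore semaphore = new MiniAsyncSemaphore(1);
        semaphore.acquire(() -> {
            if (releaseFirstPermit) {
                semaphore.release();
            }
        });
        for (int i = 0; i < waiters; i++) {
            semaphore.acquire(semaphore::release); // well-behaved callers
        }
        return semaphore.queueSize();
    }

    public static void main(String[] args) {
        System.out.println(pendingAfter(1000, true));   // 0
        System.out.println(pendingAfter(1000, false));  // 1000
    }
}
```

A single missed release is enough: every later acquire adds a CompletableFuture (plus its lambda) to the queue and nothing ever removes it, which is consistent with the object types seen in the heap dump.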

One more thing is worth saying about Semaphore: if you acquire once but the program mistakenly calls release several times, the number of permits will exceed the intended limit, and subsequent acquires will obtain permits they should not get. A solution can be found in RocketMQ's SemaphoreReleaseOnlyOnce, which wraps the Semaphore and maintains an AtomicBoolean to guarantee that the permit is released only once.
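The release-only-once idea can be sketched as follows. This is an illustration of the technique with the JDK's Semaphore, not RocketMQ's actual class; ReleaseOnce and ReleaseOnceDemo are made-up names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of a release-at-most-once guard around one java.util.concurrent.Semaphore permit. */
class ReleaseOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    /** Returns the permit on the first call only; later calls are no-ops. */
    void release() {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}

class ReleaseOnceDemo {
    /** Acquires one permit, releases twice, and reports how many permits are available afterwards. */
    static int permitsAfterDoubleRelease(boolean guarded) {
        Semaphore semaphore = new Semaphore(1);
        if (!semaphore.tryAcquire()) {
            throw new IllegalStateException("the single permit should be free");
        }
        if (guarded) {
            ReleaseOnce once = new ReleaseOnce(semaphore);
            once.release();
            once.release(); // ignored
        } else {
            semaphore.release();
            semaphore.release(); // inflates the permit count
        }
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterDoubleRelease(false)); // 2
        System.out.println(permitsAfterDoubleRelease(true));  // 1
    }
}
```

One guard object is created per acquired permit, so a duplicated release from an error path cannot raise the limit.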

Going back to the subscribe method above, release is called normally in two places, and the third path goes into PublishSubscribeService's subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener) method. The point to note here is that the topicType passed along is PubSubType.SUBSCRIBE.

    public CompletableFuture<PubSubConnectionEntry> subscribeNoTimeout(Codec codec, String channelName,
                                                              AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
        CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
        // Key point: the type passed here is PubSubType.SUBSCRIBE
        subscribeNoTimeout(codec, new ChannelName(channelName), getEntry(new ChannelName(channelName)), promise,
                        PubSubType.SUBSCRIBE, semaphore, new AtomicInteger(), listeners);
        return promise;
    }

The logic inside is fairly complex, and interested readers can analyze it themselves; what we care about is that every branch must eventually call release.
Following this line of thought, I eventually found a place where release may not be called: PublishSubscribeService#unsubscribe.
The release is executed in the whenComplete attached to the result of unsubscribe. That result is completed from a BaseRedisPubSubListener callback, and only when its if condition holds. As noted earlier, the topicType passed in is subscribe, whereas the BaseRedisPubSubListener here receives status messages of type unsubscribe or punsubscribe. The two never match, so whenComplete is never executed and neither is release.

    private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
            PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
            RedisPubSubListener<?>... listeners) {

        //...
        subscribeFuture.whenComplete((res, e) -> {
            if (e != null) {
                lock.release();
                return;
            }

            if (!promise.complete(connEntry)) {
                if (!connEntry.hasListeners(channelName)) {
                    unsubscribe(type, channelName)
                        .whenComplete((r, ex) -> {
                            // Never reached, so the AsyncSemaphore release is never executed!
                            lock.release();
                        });
                } else {
                    lock.release();
                }
            } else {
                lock.release();
            }
        });
        return subscribeFuture;
    }

    public CompletableFuture<Void> unsubscribe(PubSubType topicType, ChannelName channelName) {
        //...
        BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {

            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                // This if is never entered...
                if (type == topicType && channel.equals(channelName)) {
                    executed.set(true);

                    if (entry.release() == 1) {
                        MasterSlaveEntry msEntry = getEntry(channelName);
                        msEntry.returnPubSubConnection(entry.getConnection());
                    }

                    // Triggers the caller's whenComplete
                    result.complete(null);
                    return true;
                }
                return false;
            }

        };

        ChannelFuture future;
        // This sends unsubscribe or punsubscribe, but the topicType passed in earlier is subscribe, so the types never match
        if (topicType == PubSubType.UNSUBSCRIBE) {
            future = entry.unsubscribe(channelName, listener);
        } else {
            future = entry.punsubscribe(channelName, listener);
        }
        return result;
    }
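The mismatch can be reduced to a few lines. The following is a simplified model of the flow above, not Redisson code: StatusType is a hypothetical stand-in for the pub/sub status types, and the server reply is passed in as a parameter instead of arriving over a connection.

```java
import java.util.concurrent.CompletableFuture;

class StatusMismatchDemo {
    /** Hypothetical stand-in for the pub/sub status types; not Redisson's enum. */
    enum StatusType { SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE }

    /**
     * Models an unsubscribe whose listener completes the result only when the
     * status reported by the server equals the type the caller expects.
     */
    static CompletableFuture<Void> unsubscribe(StatusType expected, StatusType reportedByServer) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (reportedByServer == expected) { // same shape as `type == topicType` in onStatus
            result.complete(null);
        }
        return result;
    }

    /** Returns true if the cleanup callback (standing in for the semaphore release) ran. */
    static boolean cleanupRan(StatusType expected) {
        boolean[] released = {false};
        // Assumption of this model: the server acknowledges the command with an UNSUBSCRIBE status.
        unsubscribe(expected, StatusType.UNSUBSCRIBE)
                .whenComplete((r, ex) -> released[0] = true);
        return released[0];
    }

    public static void main(String[] args) {
        System.out.println(cleanupRan(StatusType.UNSUBSCRIBE)); // true
        System.out.println(cleanupRan(StatusType.SUBSCRIBE));   // false: the future never completes
    }
}
```

A future that never completes fails silently: no exception is thrown and nothing is logged, the callback holding the release simply never runs.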

Reproducing the problem

The analysis above sounds convincing, but it still has to be proven in practice.
My reproduction code is shown below; it acquires the lock through concurrent calls. After starting it, I set a breakpoint in the onStatus method of the BaseRedisPubSubListener in PublishSubscribeService#unsubscribe and found that, as described above, the topicType was indeed wrong. After letting it run for a while, I hit a breakpoint in the method and observed that the size of the AsyncSemaphore listeners property kept growing, and the objects were not reclaimed even after a GC was triggered via jmap pid. The problem was reproduced.

public void test() {
  for (int i = 0; i < 20000000; i++) {
    // executor is an assumed name for the thread pool
    executor.submit(() -> {
      // Paste in the previous code and submit it to the thread pool
    });
  }
}

Problem solving

At the start of the investigation I opened an issue on GitHub to ask what the problem was and how to fix it. The reply was that it was related to this one, and recommended upgrading to version 3.21.2. However, the description there was not quite the same as my case, so, following my usual rule for choosing versions, I decided to try 3.17.7, the last patch release of 3.17. I re-ran the test code above, and after running it for a while the problem no longer appeared.

Looking at the PublishSubscribeService#unsubscribe source code in that version, I saw that the faulty piece of logic had been fixed.

Lessons learned

Running into difficult problems is almost inevitable for every developer. The problem-solving process, the methods used, and the review afterwards are all valuable experience that helps a great deal with personal learning and growth.
The following points summarize what I learned this time:

  • Stop the loss promptly
    When a problem occurs in production, many developers first think about how to find the cause and fix the underlying problem. In practice the first step should be to assess the impact and contain the problem promptly so that it does not spread.
    If the problem cannot be solved quickly, do not sit down to read logs and analyze code at leisure. Roll back first if you can, restart first if you can, and try to resolve the problem before it causes financial loss, reducing the impact on the business.

  • Report upwards
    When you hit a thorny problem, do not try to work it out silently on your own. The right approach is to report the problem and its risks to your leader first. If the problem is tricky and serious, ask for assistance, so that a small problem is not dragged into a big one because of the limits of one person's ability.

  • Preserve the scene
    Some problems are hard to reproduce. Ours may occur only once a month, so if we simply restart the service, we will wait a long time for the next occurrence. The right approach is to preserve the scene without affecting the business: keep one node, take it out of traffic, dump its thread stacks and heap with jstack/jmap, and restart the other nodes.

  • Stay patient
    Some problems cannot be solved overnight; some take days, and some may take a month. So it is important to be patient: read the official documentation and GitHub issues, analyze the source code, try different approaches, and rule out possibilities one by one. I believe a solution can always be found.

  • Version selection
    The Redisson version we chose was 3.17.1, which is actually not a very good choice. Under the x.y.z versioning convention, x is the major version, which usually brings major changes; y is the minor version, which usually brings feature iterations; and z is the patch version, which is usually for bug fixes. For example, Spring Boot 3.0 was a major update that raised the minimum JDK version to 17.
    I chose 3.17.7 for the test above because it is the last patch release of 3.17; if you read the release notes for this version you will see why, as it is all bug fixes.
    Of course, the fix for this problem is not necessarily in the .7 release itself; it could be anywhere between .1 and .7. Take a closer look if you are interested.
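The version selection rule above (stay on the same x.y line and take its highest z) can be written down as a small helper. This is only a sketch: PatchPicker is a made-up name, and the candidate list in main contains just the three versions mentioned in this article rather than a real release list.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class PatchPicker {
    /** Parses "x.y.z" into {major, minor, patch}. */
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        return new int[] {
            Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Integer.parseInt(parts[2])
        };
    }

    /** Highest patch release among `available` that shares major.minor with `current`. */
    static Optional<String> latestPatch(String current, List<String> available) {
        int[] cur = parse(current);
        return available.stream()
                .filter(v -> parse(v)[0] == cur[0] && parse(v)[1] == cur[1])
                .max(Comparator.comparingInt(v -> parse(v)[2]));
    }

    public static void main(String[] args) {
        List<String> candidates = List.of("3.17.1", "3.17.7", "3.21.2"); // illustrative input only
        System.out.println(latestPatch("3.17.1", candidates)); // Optional[3.17.7]
    }
}
```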

For more sharing, feel free to follow my github:/jmilktea/jtea