Outline
Fair Lock RedissonFairLock Overview
2. Locking and queuing of fair lock source code
3. The source code of fair lock can be re-entered and locked
4. Comparison of the old and new versions of fair lock source code
5. Queue reordering of fair lock source code
6. Fair lock source code release lock
7. Add locks in order of fair lock source code
Fair Lock RedissonFairLock Overview
(1) Unfair and fair reentrant lock
(2) Simple use of Redisson fair lock
(3) Initialization of Redisson fair lock
(1) Unfair and fair reentrant lock
1. Unfair reentrant lock
After the lock is released, the thread queued to acquire the lock will re-acquire the lock unorderedly without any order.
2. Fair reentrant lock
After the lock is released, the thread queued to acquire the lock will acquire the lock in the order when the lock is requested. A fair lock can ensure that the order in which threads acquire locks is the same as the order in which they request to acquire locks. That is, whoever applies to obtain this lock first can obtain this lock first. A fair reentrant lock will queue up the lock requests of each thread, ensuring that the threads that first apply for the lock acquisition can be acquired first, thereby achieving the so-called fairness.
3. The differences between reentrant unfair locks and fair locks
Reentrant unfair locks and fair locks are the same in the overall technical implementation framework. The only difference is that the logic of locking and unlocking is different. The locking logic of unfair lock is relatively simple. The locking logic of fair locks must be added to the queueing mechanism to ensure that each thread can obtain the locks in order.
(2) Simple use of Redisson fair lock
Redisson's reentrant lock RedissonLock refers to a non-fair reentrant lock, and Redisson's fair lock Redisson Fair Lock refers to a fair reentrant lock.
Redisson's fair reentrant lock implements an interface, ensuring that when multiple threads request locking at the same time, it is preferred to the threads that issue the request first. All request threads will be queued in a queue. When a thread goes down, Redisson will wait for 5 seconds before continuing to allocate the next thread.
RedissonFairLock is a subclass of RedissonLock. The lock implementation framework of RedissonFairLock is basically the same as RedissonLock. In the lua script that acquires the lock and releases the lock, the logic of RedissonFairLock is different.
//1. The most common way to use
RedissonClient redisson = (config);
RLock fairLock = ("myLock");
();
//It will be unlocked automatically after 2.10 seconds, and there is no need to call the unlock method to unlock manually.
(10, );
//3. Try to add lock, wait for up to 100 seconds, and automatically unlock 10 seconds after locking.
boolean res = (100, 10, );
();
// Provides a related method for asynchronous execution of fair reentrant locks
RLock fairLock = ("myLock");
();
(10, );
Future<Boolean> res = (100, 10, );
(3) Initialization of Redisson fair lock
public class RedissonDemo {
public static void main(String[] args) throws Exception {
...
//Create a RedissonClient instance
RedissonClient redisson = (config);
//Get fair reentrant lock
RLock fairLock = ("myLock");
();// Add lock
();//Release the lock
}
}
public class Redisson implements RedissonClient {
//Redis's connection manager encapsulates a Config instance
protected final ConnectionManager connectionManager;
//Redis's command executor encapsulates a ConnectionManager instance
protected final CommandAsyncExecutor commandExecutor;
...
protected Redisson(Config config) {
= config;
Config configCopy = new Config(config);
//Initialize Redis's connection manager
connectionManager = (configCopy);
...
//Initialize Redis command executor
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
...
}
public RLock getFairLock(String name) {
return new RedissonFairLock(commandExecutor, name);
}
...
}
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
...
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
= commandExecutor;
= threadWaitTime;
...
}
...
}
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
final CommandAsyncExecutor commandExecutor;
...
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
= commandExecutor;
//InternalLockLeaseTime related to WatchDog
//The connection manager ConnectionManager can be obtained through the command executor CommandExecutor
//The configuration information of Redis can be obtained through the ConnectionManager ConnectionManager.
//The lockWatchdogTimeout timeout can be obtained through Redis's configuration information class Config
= ().getCfg().getLockWatchdogTimeout();
...
}
...
}
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
protected long internalLockLeaseTime;
final String id;
final String entryName;
final CommandAsyncExecutor commandExecutor;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
= commandExecutor;
= ().getId();//Get UUID
= ().getCfg().getLockWatchdogTimeout();
= id + ":" + name;
}
...
}
abstract class RedissonExpirable extends RedissonObject implements RExpirable {
RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
...
}
public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec;
public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
this(().getCodec(), commandExecutor, name);
}
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
= codec;
= commandExecutor;
if (name == null) {
throw new NullPointerException("name can't be null");
}
setName(name);
}
...
}
public class ConfigSupport {
...
//Create Redis's connection manager
public static ConnectionManager createConnectionManager(Config configCopy) {
//Generate UUID
UUID id = ();
...
if (() != null) {
validate(());
//Return ClusterConnectionManager instance
return new ClusterConnectionManager((), configCopy, id);
}
...
}
...
}
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
...
}
...
}
public class MasterSlaveConnectionManager implements ConnectionManager {
protected final String id;//UUID when initialized
private final Config cfg;
protected Codec codec;
...
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
= ();//The UUID is passed in
...
= cfg;
= ();
...
}
public String getId() {
return id;
}
public Codec getCodec() {
return codec;
}
...
}
2. Locking and queuing of fair lock source code
(1) Execution process when locking
(2) Description of the parameters related to the lua script that obtains fair lock
(3) Lua script step 1: Enter the while loop to remove the queue and ordered collection and waiting for the timeout threads to wait for the timeout
(4) Lua script step 2: determine whether the current thread can acquire the lock
(5) Lua script step three: execute the operation of acquiring the lock
(6) Lua script step 4: determine whether the lock has been held by the current thread (reenter the lock)
(7) Lua script step 5: Determine whether the thread that currently fails to acquire the lock is queued in the queue
(8) Lua script step 6: Queue the threads that failed to acquire the lock
(9) The process of executing the lua script by the first thread that fails to acquire the lock
(10) The process of executing the lua script by the second thread that failed to acquire the lock
(1) Execution process when locking
When using Redisson's fair lock RedissonFairLock for locking: First, the lock() method of RedissonLock will be called, then the tryAcquire() method of RedissonLock will be called, and then the tryAcquireAsync() method of RedissonLock will be called.
In the RedissonLock's tryAcquireAsync() method, a tryLockInnerAsync() method that can be overloaded by the RedissonLock subclass will be called. For unfair locks, executing this will call RedissonLock's tryLockInnerAsync() method. For fair locks, executing this will call RedissonFairLock's tryLockInnerAsync() method.
In the tryLockInnerAsync() method of RedissonFairLock, the specific lua script is executed.
public class RedissonDemo {
public static void main(String[] args) throws Exception {
...
//Create a RedissonClient instance
RedissonClient redisson = (config);
//Get fair reentrant lock
RLock fairLock = ("myLock");
();// Add lock
();//Release the lock
}
}
public class RedissonLock extends RedissonBaseLock {
...
//Lock without parameters
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
//Lock with parameters
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = ().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
//Locking successfully
if (ttl == null) {
return;
}
//Locking failed
...
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//Unfair lock, next call is the () method
//Fair lock, next call is the () method
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, , threadId, RedisCommands.EVAL_LONG);
}
//Add callback listening to ttlRemainingFuture of type RFuture<Long>
CompletionStage<Long> f = (ttlRemaining -> {
//The lock lua script in tryLockInnerAsync() has been executed asynchronously, and the following method logic will be called back:
//Locking successfully
if (ttlRemaining == null) {
if (leaseTime != -1) {
//If the incoming leaseTime is not -1, that is, the expiration time of the specified lock, then no timed scheduled task will be created
internalLockLeaseTime = (leaseTime);
} else {
//Create a scheduled schedule task
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompleteFutureWrapper<>(f);
}
...
}
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;//The thread can wait for the lock time
private final CommandAsyncExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName;
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);//Pause in 60 seconds*5=5 minutes
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
= commandExecutor;
= threadWaitTime;
threadsQueueName = prefixName("redisson_lock_queue", name);
timeoutSetName = prefixName("redisson_lock_timeout", name);
}
...
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
long wait = threadWaitTime;
if (waitTime != -1) {
// Assign the specified acquisition lock waiting time passed to the wait variable
wait = (waitTime);
}
...
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), command,
//Step 1: remove stale threads, remove threads waiting for timeout
"while true do " +
//Get the first element in the queue
//KEYS[2] is the name of a queue used to queue threads
"local firstThreadId2 = ('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//Get the score corresponding to the first element in the queue, that is, the expiration time of the first thread ranked first
//KEYS[3] is the name of an ordered collection used to sort threads
"local timeout = tonumber(('zscore', KEYS[3], firstThreadId2));" +
//If the expiration time of the first thread is less than the current time, it means that the thread has not yet obtained the lock after waiting for timeout, so it must be removed
//ARGV[4] is the current time
"if timeout <= tonumber(ARGV[4]) then " +
//remove the item from the queue and timeout set NOTE we do not alter any other timeout
//Remove this thread from the ordered collection + queue
"('zrem', KEYS[3], firstThreadId2);" +
"('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
//check if the lock can be acquired now
//Step 2: Determine whether the current thread can try to acquire the lock. The following two situations can be judged to try to acquire the lock
//Scenario 1: The lock does not exist + the queue does not exist; KEYS[1] is the name of the lock; KEYS[2] is the queue queued for threads;
//Scenario 2: The lock does not exist + the queue exists + The first element of the queue is the current thread; ARGV[2] is the UUID + ThreadID of the current thread;
"if (('exists', KEYS[1]) == 0) " +
"and ((('exists', KEYS[2]) == 0) " +
"or (('lindex', KEYS[2], 0) == ARGV[2])) then " +
//Step 3: The current thread performs the operation of acquiring the lock
//remove this thread from the queue and timeout set
//The first element of the pop-up queue + Delete the element corresponding to UUID:ThreadID from the ordered collection
"('lpop', KEYS[2]);" +
"('zrem', KEYS[3], ARGV[2]);" +
//decrease timeouts for all waiting in the queue
//Decrement the fraction of each thread in the ordered set, that is, decrement the waiting time when each thread acquires the lock
//zrange returns the members (0,-1) in the specified interval in the ordered set KEYS[3], that is, all members
"local keys = ('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//Score minus the keys[i] of the ordered set KEYS[3]: tonumber(ARGV[3])
//ARGV[3] is the time that thread can wait when it acquires locks. The default is 5 minutes
"('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
//acquire the lock and set the TTL for the lease
//hset sets the Hash value for locking operation + pexpire sets the expiration time of the lock key + Finally, return nil to indicate that the locking is successful
"('hset', KEYS[1], ARGV[2], 1);" +
"('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//check if the lock is already held, and this is a re-entry(reentry)
//Step 4: Determine whether the lock has been held by the current thread. KEYS[1] is the name of the lock, and ARGV[2] is the UUID + ThreadID of the current thread;
"if ('hexists', KEYS[1], ARGV[2]) == 1 then " +
"('hincrby', KEYS[1], ARGV[2],1);" +
"('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//the lock cannot be acquired, check if the thread is already in the queue
//Step 5: Determine whether the thread that currently fails to acquire the lock is queued in the queue
//KEYS[3] is an ordered set of thread sorting, and ARGV[2] is the UUID + ThreadID of the current thread;
"local timeout = ('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//the real timeout is the timeout of the prior thread in the queue,
//but this is approximately correct, and avoids having to traverse the queue
//If the thread that currently fails to acquire the lock is already queued in the queue
// Then return to the thread waiting for the lock to be acquired, and the timeout will be over as much time left. The external code will block and wait for this time after this time after the external code gets this time.
//ARGV[3] is the time that the current thread can wait when acquiring the lock, ARGV[4] is the time
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
//add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
//the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the threadWaitTime
//Step 6: Queue the threads that failed to acquire the lock
"local lastThreadId = ('lindex', KEYS[2], -1);" +
"local ttl;" +
//If the last element queued in the queue is not the current thread
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId is the last thread in the queue, ARGV[2] is the UUID+thread ID of the current thread, ARGV[4] is the current time
//Because the thread with the maximum expiration time is the last in the queue
//So the expiration time of the current thread can be calculated by the expiration time of the last element in the queue
//This ensures that the expiration time of the threads newly added to the queue and ordered collection is the largest
//The following line will calculate: How much time is left? The last thread in the current queue will expire. The external code will block and wait for this time after getting this time.
//In this way, the thread that joins the queue will block the expiration time of the thread that waits for the previous thread that joins the queue.
"ttl = tonumber(('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//The following line will calculate: How much time is left, the lock will expire, and the external code will block and wait for this time after getting this time.
"ttl = ('pttl', KEYS[1]);" +
"end;" +
// Calculate the expiration time of the current thread while waiting for the lock queue.
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//Insert the current thread as an element into the ordered set, and set the element score to the expiration time when the thread is queued for locks.
//Then insert the current thread as an element into the tail of the queue
"if ('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
(getRawName(), threadsQueueName, timeoutSetName),
(leaseTime),
getLockName(threadId),
wait,
currentTime
);
}
...
}
...
}
(2) Description of the parameters related to the lua script that obtains fair lock
KEYS[1] is getRawName(), which is the key of a Hash data structure, that is, the name of the lock, such as "myLock".
KEYS[2] is threadsQueueName, which is the name of a queue used to queue threads. When multiple client threads apply to acquire locks, they will queue in this queue. For example, "redisson_lock_queue:{myLock}".
KEYS[3] is timeoutSetName, which is the name of an ordered set used to sort threads. This ordered set can be automatically sorted by the score specified by each data. For example, "redisson_lock_timeout:{myLock}".
ARGV[1] is leaseTime, which represents the expiration time of the lock. If leaseTime is not specified, the default is internalLockLeaseTime = 30 seconds.
ARGV[2] is getLockName(threadId), representing the client UUID + thread ID.
ARGV[3] is threadWaitTime, which represents the time the thread can wait (default 5 minutes).
ARGV[4] is currentTime, representing the current time.
(3) Lua script step 1: Enter the while loop to remove the queue and ordered collection and waiting for the timeout threads to wait for the timeout
In the while loop, the command is first executed: "lindex redisson_lock_queue:{myLock} 0", which means obtaining the first element in the queue "redisson_lock_queue:{myLock}". At the beginning, the queue is empty, so nothing can be obtained, firstThreadId2 is false. At this time, it will break and exit the while loop.
If the first element in the queue is obtained, the zscore command will be executed: the score corresponding to the element is obtained from the ordered set, that is, the expiration time of the corresponding thread of the element. If the expiration time is smaller than the current time, then the element must be removed from the queue and ordered set. Otherwise, it will also break and exit the while loop.
//Step 1: remove stale threads, remove threads waiting for timeout
"while true do " +
//Get the first element in the queue
//KEYS[2] is the name of a queue used to queue threads
"local firstThreadId2 = ('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//Get the score corresponding to the first element in the queue, that is, the expiration time of the first thread ranked first
//KEYS[3] is the name of an ordered collection used to sort threads
"local timeout = tonumber(('zscore', KEYS[3], firstThreadId2));" +
//If the expiration time of the first thread is less than the current time, it means that the thread has not yet obtained the lock when it has expired, so it needs to be removed
//ARGV[4] is the current time
"if timeout <= tonumber(ARGV[4]) then " +
//remove the item from the queue and timeout set NOTE we do not alter any other timeout
//Remove this thread from ordered collection + queue
"('zrem', KEYS[3], firstThreadId2);" +
"('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
(4) Lua script step 2: determine whether the current thread can acquire the lock
Judgment condition 1:
First, execute the command "exists myLock" to determine whether the lock exists. At the beginning, no threads had locked, so the judgment condition is definitely valid, and this condition is true.
Judgment condition 2:
Then execute the command "exists redisson_lock_queue:{myLock}" to see if the queue exists. There was no such queue at the beginning, so this condition is definitely true.
Judgment condition three:
If there is this queue, the condition for determining the existence of the queue is not true, and the judgment followed by "or" is performed. That is, execute the command "lindex redisson_lock_queue:{myLock} 0" to determine whether the first element of the queue is the UUID + ThreadID of the current thread.
//check if the lock can be acquired now
//Step 2: Determine whether the current thread can try to acquire the lock. The following two situations can be judged to try to acquire the lock
//Scenario 1: The lock does not exist + the queue does not exist; KEYS[1] is the name of the lock;
//Scenario 2: The lock does not exist + the queue exists + The first element of the queue is the current thread; ARGV[2] is the UUID + ThreadID of the current thread;
"if (('exists', KEYS[1]) == 0) " +
"and ((('exists', KEYS[2]) == 0) " +
"or (('lindex', KEYS[2], 0) == ARGV[2])) then " +
...
"end;" +
Summarize the situation where the current thread can now try to acquire the lock as follows:
Situation 1:The lock does not exist + the queue does not exist either
Situation 2:The lock does not exist + the queue exists + The first element of the queue is the current thread
(5) Lua script step three: execute the operation of acquiring the lock
When the condition to determine whether you can try to acquire the lock is now passed, the following operation will be performed:
Step 1:Execute the command "lpop redisson_lock_queue:{myLock}" to pop up the first element of the queue. The queue is empty at the beginning, so the command will not be processed. Then execute the command "zrem redisson_lock_timeout:{myLock} UUID1:ThreadID1", that is, delete the element corresponding to UUID1:ThreadID1 from the ordered set. The ordered set is also empty at the beginning, so the command will not be processed.
Step 2:Run the command "hset myLock UUID1:ThreadID1 1" to perform the lock operation. In the Hash value with key to myLock, field is UUID1:ThreadID1's value is 1. Then execute the command "pexpire myLock 30000" to set the expiration time of the lock key to 30 seconds.
Finally, nil is returned, so in the outer code, the lock will be considered successful. So a WatchDog watchdog scheduled task will be created and the lock will be checked after 10 seconds. If the check finds that the current thread still holds this lock, then reset the expiration time of the lock key to 30 seconds, and recreate a WatchDog watchdog timing scheduled task and continue the check after 10 seconds.
//check if the lock can be acquired now
//Step 2: Determine whether the current thread can try to acquire the lock. The following two situations can be judged to try to acquire the lock
//Scenario 1: The lock does not exist + the queue does not exist; KEYS[1] is the name of the lock; KEYS[2] is the queue queued for threads;
//Scenario 2: The lock does not exist + the queue exists + The first element of the queue is the current thread; ARGV[2] is the UUID + ThreadID of the current thread;
"if (('exists', KEYS[1]) == 0) " +
"and ((('exists', KEYS[2]) == 0) " +
"or (('lindex', KEYS[2], 0) == ARGV[2])) then " +
//Step 3: The current thread performs the operation of acquiring the lock
//remove this thread from the queue and timeout set
//The first element of the pop-up queue + Delete the element corresponding to UUID:ThreadID from the ordered collection
"('lpop', KEYS[2]);" +
"('zrem', KEYS[3], ARGV[2]);" +
//decrease timeouts for all waiting in the queue
//Decrement the fraction of each thread in the ordered set, that is, decrement the waiting time when each thread acquires the lock
//zrange returns the members (0,-1) in the specified interval in the ordered set KEYS[3], that is, all members
"local keys = ('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//Score minus the keys[i] of the ordered set KEYS[3]: tonumber(ARGV[3])
//ARGV[3] is the time that thread can wait when it acquires locks. The default is 5 minutes
"('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
//acquire the lock and set the TTL for the lease
//hset sets the Hash value for locking operation + pexpire sets the expiration time of the lock key + Finally, return nil to indicate that the locking is successful
"('hset', KEYS[1], ARGV[2], 1);" +
"('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
(6) Lua script step 4: determine whether the lock has been held by the current thread (reenter the lock)
The command "hexists myLock UUID:ThreadID" will be executed. If the judgment condition is passed, it means that the thread holding the lock has reentered the lock. Therefore, the command "hincrby myLock UUID:ThreadID 1" will be executed. For the Hash value whose key is the lock name, the value value of field is UUID + thread ID is accumulated by 1. And execute the command "pexpire myLock 300000" to reset the expiration time of the lock key. Finally, nil is returned, indicating that the reentry and locking is successful.
//check if the lock is already held, and this is a re-entry(reentry)
//Step 4: Determine whether the lock has been held by the current thread. KEYS[1] is the name of the lock, and ARGV[2] is the UUID + ThreadID of the current thread;
"if ('hexists', KEYS[1], ARGV[2]) == 1 then " +
"('hincrby', KEYS[1], ARGV[2], 1);" +
"('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
(7) Lua script step 5: Determine whether the thread that currently fails to acquire the lock is queued in the queue
By executing the command "zscore redisson_lock_timeout:{myLock} UUID:ThreadID", the corresponding score of the current thread in the ordered set, that is, the expiration time. If the acquisition is successful, it returns: how much timeout is left for the current thread to wait for the acquisition of the lock, and the external code will block and wait for this time after it gets this time.
//the lock cannot be acquired, check if the thread is already in the queue
//Step 5: Determine whether the thread that currently fails to acquire the lock is queued in the queue
//KEYS[3] is an ordered set of thread sorting, and ARGV[2] is the UUID+ThreadID of the current thread;
"local timeout = ('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//the real timeout is the timeout of the prior thread in the queue,
//but this is approximately correct, and avoids having to traverse the queue
//If the thread that currently fails to acquire the lock is already queued in the queue
// Then return to the thread waiting for the lock to be acquired, and the timeout will be over as much time left. The external code will block and wait for this time after this time after the external code gets this time.
//ARGV[3] is the time that the current thread can wait when acquiring the lock, ARGV[4] is the time
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
(8) Lua script step 6: Queue the threads that failed to acquire the lock
First get the last element in the queue. Because the thread with the maximum expiration time is last in the queue, the expiration time of the current thread can be calculated by the expiration time of the last element in the queue. This ensures that the expiration time of threads newly added to the queue and ordered collection is the largest. Then obtain the remaining survival time of the last thread in the lock or queue, and then calculate the expiration time of the current thread while waiting for the lock.
Then insert the current thread as an element into the ordered set, and set the fraction of the element in the ordered set to the expiration time when the thread is queued for locking, and then insert the current thread as an element into the tail of the queue.
Finally, return the remaining survival time of the first thread in the lock or queue to the outer code. If the return value obtained by the outer code is non-null, the client will enter a while loop. In the while loop, it will wait for ttl every time it blocks and then try to lock and re-execute the lua script.
If there are no elements in the queue, the first thread to join the queue will block the expiration time of the lock. If there are elements in the queue, the next thread joining the queue will block the expiration time of the thread waiting for the previous thread joining the queue.
//Step 6: Queue the threads that failed to acquire the lock
"local lastThreadId = ('lindex', KEYS[2], -1);" +
"local ttl;" +
//If the last element queued in the queue is not the current thread
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId is the last thread in the queue, ARGV[2] is the UUID + thread ID of the current thread, ARGV[4] is the current time
//Because the thread with the maximum expiration time is the last in the queue
//So the expiration time of the current thread can be calculated by the expiration time of the last element in the queue
//This ensures that the expiration time of the threads newly added to the queue and ordered collection is the largest
//The following line will calculate: How much time is left? The last thread in the current queue will expire. The external code will block and wait for this time after getting this time.
//In this way, the thread that joins the queue will block the expiration time of the thread that waits for the previous thread that joins the queue.
"ttl = tonumber(('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//The following line will calculate: How much time is left, the lock will expire, and the external code will block and wait for this time after getting this time.
"ttl = ('pttl', KEYS[1]);" +
"end;" +
// Calculate the expiration time of the current thread while waiting for the lock queue.
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//Insert the current thread as an element into the ordered set, and set the element score to the expiration time when the thread is queued for locks.
//Then insert the current thread as an element into the tail of the queue
"if ('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
(9) The process of executing the lua script by the first thread that fails to acquire the lock
The core of fair locks is that when applying for locking, each client that fails to lock will queue up. When the lock is released, the lock will be acquired in turn to achieve fairness.
Assuming that the first client thread has been locked successfully at this time and the second client thread also tries to lock it, the following queueing process will be performed.
Step 1:Enter the while loop and remove the thread waiting for the timeout. Execute the command "lindex redisson_lock_queue:{myLock} 0" to obtain the first element of the queue. Since the queue is still empty at this time, false is obtained, so the while loop is exited.
Step 2:Determine whether the current thread can now try to acquire the lock. Because I executed the command "exists myLock", I found that the lock already existed, so I decided that it would not pass.
Step 3:Determine whether the lock has been held by the current thread, because the UUID + thread ID of the second client thread must not be equal to the first client thread. So at this time, execute the command "hexists myLock UUID2:ThreadID2" and find that it does not exist. Therefore, the judgment conditions for reentrant locks here are also not valid.
Step 4:Determines whether the thread that currently fails to acquire the lock is already queued in the queue. Since the current thread is the first thread to fail to acquire the lock, the judgment is not passed.
Step 5:Next, queue up.
//queuing the threads that failed to acquire the lock
"local lastThreadId = ('lindex', KEYS[2], -1);" +
"local ttl;" +
//If the last element queued in the queue is not the current thread
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId is the last thread in the queue, ARGV[2] is the UUID+thread ID of the current thread, ARGV[4] is the current time
//Because the thread with the maximum expiration time is the last in the queue
//So the expiration time of the current thread can be calculated by the expiration time of the last element in the queue
//This ensures that the expiration time of the threads newly added to the queue and ordered collection is the largest
//The following line will calculate: How much time is left? The last thread in the current queue will expire. The external code will block and wait for this time after getting this time.
//In this way, the thread that joins the queue will block the expiration time of the thread that waits for the previous thread that joins the queue.
"ttl = tonumber(('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//The following line will calculate: How much time is left, the lock will expire, and the external code will block and wait for this time after getting this time.
"ttl = ('pttl', KEYS[1]);" +
"end;" +
// Calculate the expiration time of the current thread while waiting for the lock queue.
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//Insert the current thread as an element into the ordered set, and set the element score to the expiration time when the thread is queued for locks.
//Then insert the current thread as an element into the tail of the queue
"if ('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"
First, execute the command "lindex redisson_lock_queue:{myLock} 0". That is, the last element is obtained from the queue. Since the queue is empty at this time, the element cannot be obtained. Then execute the command "ttl = pttl myLock" to obtain the remaining survival time of the lock.
Then calculate the expiration time of the current thread while queueing for locks. Assuming that the remaining survival time of myLock is 20 seconds, then timeout = ttl + 5 minutes + current time = 20 seconds + 5 minutes + 10:00:00 = 10:05:20;
Then execute the command "zadd redisson_lock_timeout:{myLock} 10:05:20 UUID2:ThreadID2", which means inserting an element in the ordered set. The element value is UUID2:ThreadID2, and the corresponding score of the element is 10:05:20. The score will be represented by the long-type time stamp of time. The later the time, the larger the timestamp. Ordered Set Sorted Set will automatically sort from small to large according to the inserted element scores.
Then execute the command "rpush redisson_lock_queue:{myLock} UUID2:TheadID2", which means insert UUID2:ThreadID2 into the tail of the queue.
Finally, return ttl to the outer code, that is, return the remaining survival time of myLock. If the ttl obtained by the outer code is non-null, the client will enter a while loop. In the while loop, every time it blocks and waits for ttl, it tries to lock and re-executes the lua script.
(10) The process of executing the lua script by the second thread that failed to acquire the lock
If a third client thread also attempts to add locks at this time, the following queueing process will be performed.
Step 1:Enter the while loop and remove the thread waiting for the timeout. Execute the command "lindex redisson_lock_queue:{myLock} 0" to obtain the first element of the queue. At this time, UUID2:ThreadID2 is obtained, which means that the second client thread is queuing in the queue.
Continue to execute the command "zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2" to obtain the score corresponding to UUID2:ThreadID2 from the ordered set, timeout = 10:05:20.
Assuming the current time is 10:00:25, then the condition of timeout <= 10:00:25 does not hold, so the while loop is exited.
Step 2:Determine whether the current thread can now try to acquire the lock, but find that it cannot pass. Because when executing the command "exists myLock", it is found that the lock already exists.
Step 3:Determines whether the lock has been held by the current thread. Since the UUID + thread ID of the third client thread must not be equal to the first client thread. So at this time, execute the command "hexists myLock UUID3:ThreadID3" and find that it does not exist. Therefore, the judgment conditions for reentrant locks here are also not valid.
Step 4:Determines whether the thread that currently fails to acquire the lock is already queued in the queue. Since the current thread is the second thread that failed to acquire the lock, the judgment is not passed.
Step 5:Next, queue up.
//queuing the threads that failed to acquire the lock
"local lastThreadId = ('lindex', KEYS[2], -1);" +
"local ttl;" +
//If the last element queued in the queue is not the current thread
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId is the last thread in the queue, ARGV[2] is the UUID + thread ID of the current thread, ARGV[4] is the current time
//Because the thread with the maximum expiration time is the last in the queue
//So the expiration time of the current thread can be calculated by the expiration time of the last element in the queue
//This ensures that the expiration time of the threads newly added to the queue and ordered collection is the largest
//The following line will calculate: How much time is left? The last thread in the current queue will expire. The external code will block and wait for this time after getting this time.
//In this way, the thread that joins the queue will block the expiration time of the thread that waits for the previous thread that joins the queue.
"ttl = tonumber(('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//The following line will calculate: How much time is left, the lock will expire, and the external code will block and wait for this time after getting this time.
"ttl = ('pttl', KEYS[1]);" +
"end;" +
// Calculate the expiration time of the current thread while waiting for the lock queue.
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//Insert the current thread as an element into the ordered set, and set the element score to the expiration time when the thread is queued for locks.
//Then insert the current thread as an element into the tail of the queue
"if ('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"
First, execute the command "lindex redisson_lock_queue:{myLock} 0" to obtain the last element in the queue UUID2:ThreadID2.
Then judge whether the condition is true: lastThreadId is not false + lastThreadId is not yourself. Since ARGV[2] = UUID3:ThreadID3 at this time, the judgment condition is valid. That is, the last element queued in the queue is not the client thread currently trying to acquire the lock.
So execute: "zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2" - The current time, that is, get how much time the last thread in the queue will expire, thereby obtaining ttl.
Then calculate the expiration timeout of the current thread when queueing for lock based on ttl, then execute the zadd and rpush commands to enqueue and queue the current thread, and finally return ttl.
3. The source code of fair lock can be re-entered and locked
The client holding a fair lock repeats () and executes the lock lua script as follows:
Step 1:Enter the while loop and remove the thread waiting for the timeout. Execute the command "lindex redisson_lock_queue:{myLock} 0" to obtain the first element of the queue. At this time, UUID2:ThreadID2 is obtained, which means that the second client thread is queuing in the queue.
Continue to execute the command "zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2" to obtain the score corresponding to UUID2:ThreadID2 from the ordered set, timeout = 10:05:20.
Assuming the current time is 10:00:25, then the condition of timeout <= 10:00:25 does not hold, so the while loop is exited.
Step 2:Determine whether the current thread can now try to acquire the lock, but find that it cannot pass. Because when executing the command "exists myLock", it is found that the lock already exists.
Step 3:Determines whether the lock has been held by the current thread. Since the UUID + thread ID of the current thread is equal to the thread holding the lock. That is, when the command "hexists myLock UUID:ThreadID" is executed, it is found that the key exists, so the judgment condition for reentrant lock here is true.
Therefore, the command "hincrby myLock UUID:ThreadID 1" will be executed. For the Hash value whose key is the lock name, the Hash value of the key is UUID + thread ID is accumulated by 1. And execute the command "pexpire myLock 300000" to reset the expiration time of the lock key. Finally, nil is returned, indicating that the reentry and locking is successful.
//check if the lock is already held, and this is a re-entry(reentry)
//Step 4: Determine whether the lock has been held by the current thread. KEYS[1] is the name of the lock, and ARGV[2] is the UUID+ThreadID of the current thread;
"if ('hexists', KEYS[1], ARGV[2]) == 1 then " +
"('hincrby', KEYS[1], ARGV[2], 1);" +
"('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
4. Comparison of the old and new versions of fair lock source code
(1) The new version failed to lock again and did not refresh the queue score (waiting for timeout timeout)
(2) If the old version fails to lock again, the queue score will be refreshed (the timeout waiting for the timeout)
When the client thread fails to try to add a fair lock, it will enter a while loop. In the while loop, you will wait for a while each time before trying to add a fair lock again.
public class RedissonLock extends RedissonBaseLock {
...
// Add lock
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//Thread ID, used to generate the value of setting Hash
long threadId = ().getId();
//Try to add lock, and execute the () method to pass the default leaseTime=-1
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// ttl is null means that the lock is successfully
if (ttl == null) {
return;
}
//Processing when locking fails
CompleteFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
(future);
} else {
(future);
}
try {
while (true) {
//Try to get the lock again
ttl = tryAcquire(-1, leaseTime, unit, threadId);
//The returned ttl is null. After obtaining the lock, it exits the while loop
if (ttl == null) {
break;
}
//The returned ttl is not null, which means that other clients or threads still hold locks
//Then use the synchronous component Semaphore to block and wait for a period of ttl
if (ttl >= 0) {
try {
(future).getLatch().tryAcquire(ttl, );
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
(future).getLatch().tryAcquire(ttl, );
}
} else {
if (interruptibly) {
(future).getLatch().acquire();
} else {
(future).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe((future), threadId);
}
}
...
}
Suppose the first lock of the second client thread is at 10:00:00, and then at 10:00:15, the client thread initiates a request again to try to lock, but the first client thread has always held the lock between 10:00:00 and 10:00:15. At this time, the process of locking the lock again of the second client thread is as follows:
(1) The new version failed to lock again and did not refresh the queue score (waiting for timeout timeout)
Step 1:Enter the while loop and remove the thread waiting for the timeout. Execute the command "lindex redisson_lock_queue:{myLock} 0" to obtain the first element of the queue. At this time, UUID2:ThreadID2 is obtained, which means that the second client thread is queuing in the queue.
Continue to execute the command "zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2" to obtain the corresponding score of UUID2:ThreadID2 from the ordered set, such as the obtained timeout = 10:05:20. According to the current time is 10:00:15, then the condition of timeout <= 10:00:15 does not hold, so the while loop is exited.
Step 2:Determine whether the current thread can now try to acquire the lock, but find that it cannot pass. Because when executing the command "exists myLock", it is found that the lock already exists.
Step 3:Determines whether the lock has been held by the current thread. Since the UUID + thread ID of the second client thread must not be equal to the first client thread, the command "hexists myLock UUID2:ThreadID2" is executed at this time and it is found that it does not exist, so the judgment condition for reentrant locks here is also not true.
Step 4:Determines whether the thread that currently fails to acquire the lock is already queued in the queue. Since the current thread attempts to acquire the lock for the second time, the judgment is passed. Then return to the second client thread when waiting for the lock to be acquired, the timeout will be timed out as much time left and the queued score will not be refreshed.
//Redisson version 3.16.8
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), command,
//Step 1: remove stale threads, remove threads waiting for timeout
"while true do " +
//Get the first element in the queue
//KEYS[2] is the name of a queue used to queue threads
"local firstThreadId2 = ('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//Get the score corresponding to the first element in the queue, that is, the expiration time of the first thread ranked first
//KEYS[3] is the name of an ordered collection used to sort threads
"local timeout = tonumber(('zscore', KEYS[3], firstThreadId2));" +
//If the expiration time of the first thread is less than the current time, it means that the thread has not yet obtained the lock after waiting for timeout, so it must be removed
//ARGV[4] is the current time
"if timeout <= tonumber(ARGV[4]) then " +
//Remove this thread from the ordered collection + queue
"('zrem', KEYS[3], firstThreadId2);" +
"('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
//Step 2: Determine whether the current thread can try to acquire the lock. The following two situations can be judged to try to acquire the lock
//Scenario 1: The lock does not exist + the queue does not exist; KEYS[1] is the name of the lock; KEYS[2] is the queue queued for threads;
//Scenario 2: The lock does not exist + the queue exists + The first element of the queue is the current thread; ARGV[2] is the UUID + ThreadID of the current thread;
"if (('exists', KEYS[1]) == 0) " +
"and ((('exists', KEYS[2]) == 0) " +
"or (('lindex', KEYS[2], 0) == ARGV[2])) then " +
//Step 3: The current thread performs the operation of acquiring the lock
//The first element of the pop-up queue + Delete the element corresponding to UUID:ThreadID from the ordered collection
"('lpop', KEYS[2]);" +
"('zrem', KEYS[3], ARGV[2]);" +
//Decrement the fraction of each thread in the ordered set, that is, decrement the waiting time when each thread acquires the lock
//zrange returns the members (0,-1) in the specified interval in the ordered set KEYS[3], that is, all members
"local keys = ('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//Score minus the keys[i] of the ordered set KEYS[3]: tonumber(ARGV[3])
//ARGV[3] is the time that thread can wait when it acquires locks. The default is 5 minutes
"('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
//hset sets the Hash value for locking operation + pexpire sets the expiration time of the lock key + Finally, return nil to indicate that the locking is successful
"('hset', KEYS[1], ARGV[2], 1);" +
"('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//Step 4: Determine whether the lock has been held by the current thread (reenter the lock). KEYS[1] is the name of the lock, and ARGV[2] is the UUID+ThreadID of the current thread;
"if ('hexists', KEYS[1], ARGV[2]) == 1 then " +
"('hincrby', KEYS[1], ARGV[2],1);" +
"('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//Step 5: Determine whether the thread that currently fails to acquire the lock is queued in the queue
//KEYS[3] is an ordered set of thread sorting, and ARGV[2] is the UUID + ThreadID of the current thread;
"local timeout = ('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//If the thread that currently fails to acquire the lock is already queued in the queue
// Then return to the thread waiting for the lock to be acquired, and the timeout will be over as much time left. The external code will block and wait for this time after this time after the external code gets this time.
//ARGV[3] is the time that the current thread can wait when acquiring the lock, ARGV[4] is the time
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
//Step 6: Queue the threads that failed to acquire the lock
"local lastThreadId = ('lindex', KEYS[2], -1);" +
"local ttl;" +
//If the last element queued in the queue is not the current thread
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId is the last thread in the queue, ARGV[2] is the UUID + thread ID of the current thread, ARGV[4] is the current time
//Because the thread with the maximum expiration time is the last in the queue
//So the expiration time of the current thread can be calculated by the expiration time of the last element in the queue
//This ensures that the expiration time of the threads newly added to the queue and ordered collection is the largest
//The following line will calculate: How much time is left? The last thread in the current queue will expire. The external code will block and wait for this time after getting this time.
//In this way, the thread that joins the queue will block the expiration time of the thread that waits for the previous thread that joins the queue.
"ttl = tonumber(('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//The following line will calculate: How much time is left, the lock will expire, and the external code will block and wait for this time after getting this time.
"ttl = ('pttl', KEYS[1]);" +
"end;" +
// Calculate the expiration time of the current thread while waiting for the lock queue.
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//Insert the current thread as an element into the ordered set, and set the element score to the expiration time when the thread is queued for locks.
//Then insert the current thread as an element into the tail of the queue
"if ('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
(getRawName(), threadsQueueName, timeoutSetName),
(leaseTime),
getLockName(threadId),
wait,//Default is 5 minutes
currentTime
);
}
(2) If the old version fails to lock again, the queue score will be refreshed (the timeout waiting for the timeout)
The old version of fair locked lua script is shown below, and when the second client thread locks again, it will enter the queueing logic again.
First, the calculation timeout will be made as much time as the first element in the queue has, that is, ttl. Then, based on the waiting timeout passed by ttl +, calculate the timeout of the current thread waiting lock.
Then execute the command "zadd redisson_lock_timeout:{myLock} timeout UUID2:ThreadID2", and refresh the fraction of the element of the same name in the ordered set to be timeout. Every time the client thread repeatedly attempts to add a lock, it will extend its corresponding expiration time, which means it refreshes the queued score.
When adding an existing element, the zadd command returns 0, but updates the score of the element.
//Redisson version 3.8.1
if (command == RedisCommands.EVAL_LONG) {
return (getName(), command,
//Step 1: Remove the thread waiting for timeout
"while true do " +
//Get the first element in the queue
//KEYS[2] is the name of a queue used to queue threads
"local firstThreadId2 = ('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//Get the score corresponding to the first element in the queue, that is, the expiration time of the first thread ranked first
//KEYS[3] is the name of an ordered collection used to sort threads
"local timeout = tonumber(('zscore', KEYS[3], firstThreadId2));" +
//If the expiration time of the first thread is less than the current time, it means that the thread has not yet obtained the lock after waiting for timeout, so it must be removed
//ARGV[4] is the current time
"if timeout <= tonumber(ARGV[4]) then " +
//Remove this thread from the ordered collection + queue
"('zrem', KEYS[3], firstThreadId2); " +
"('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" +
//Step 2: Determine whether the current thread can try to acquire the lock. The following two situations can be judged by
//Scenario 1: The lock does not exist + the queue does not exist; KEYS[1] is the name of the lock; KEYS[2] is the queue queued for threads;
//Scenario 2: The lock does not exist + the queue exists + The first element of the queue is the current thread; ARGV[2] is the UUID+ThreadID of the current thread;
"if (('exists', KEYS[1]) == 0) and ((('exists', KEYS[2]) == 0) " +
"or (('lindex', KEYS[2], 0) == ARGV[2])) then " +
//Step 3: The current thread performs the operation of acquiring the lock
//The first element of the pop-up queue + Delete the element corresponding to UUID:ThreadID from the ordered collection
"('lpop', KEYS[2]); " +
"('zrem', KEYS[3], ARGV[2]); " +
//hset sets the Hash value for locking operation + pexpire sets the expiration time of the lock key + Finally, return nil to indicate that the locking is successful
"('hset', KEYS[1], ARGV[2], 1); " +
"('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//Step 4: Determine whether the lock has been held by the current thread. KEYS[1] is the name of the lock, and ARGV[2] is the UUID+ThreadID of the current thread;
"if (('hexists', KEYS[1], ARGV[2]) == 1) then " +
"('hincrby', KEYS[1], ARGV[2], 1); " +
"('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//Step 5: Queue the threads that failed to acquire the lock and process them.
"local firstThreadId = ('lindex', KEYS[2], 0); " +
"local ttl; " +
//If the first element queued in the queue is not the current thread
"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +
//Timeout will be exceeded if the first element in the queue has more time left
"ttl = tonumber(('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
"else " +
"ttl = ('pttl', KEYS[1]);" +
"end; " +
// Calculate the timeout time of the current thread waiting for lock
"local timeout = ttl + tonumber(ARGV[3]);" +
//Insert the current thread as an element into the ordered set, and set the element score to the expiration time when the thread is queued for locks.
//Then insert the current thread as an element into the tail of the queue
"if ('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"('rpush', KEYS[2], ARGV[2]);" +
"end; " +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),//KEYS[1], KEYS[2], KEYS[3]
internalLockLeaseTime,//ARGV[1]
getLockName(threadId),//ARGV[2]
currentTime + threadWaitTime,//ARGV[3] = current time + 5 seconds
CurrentTime//ARGV[4]
);
}
Note: It is not possible to just use ordered sets, because the fractions of ordered sets will also change during the execution of the lua script. In the old version, every time the client thread attempts to add a lock, the scores in the ordered set will be updated. In the new version, when the current thread can try to acquire the lock, it will also traverse the scores in the updated ordered collection.
In addition, the time complexity of obtaining the first element in an ordered set is higher than that of the queue. It is not possible to just use queues, because you need to manage the waiting timeout of queued threads. If there is no ordered collection, the threads that have timed out in the queue cannot be removed. Of course, in order to manage the waiting timeout of threads, it is also possible to replace the ordered set with two layers of hash values.
5. Queue reordering of fair lock source code
(1) The new version will re-arrange the queue after 5 minutes.
(2) The old version will re-arrange the queue after 5 seconds.
(3) What causes the queue reordering is step one of the lua script (removing the thread waiting for timeout)
(1) The new version will re-arrange the queue after 5 minutes.
In the new version of fair lock, threads that fail to acquire the lock will enter the queue and wait for up to 5 minutes by default.
In these 5 minutes, the thread will not refresh the queue sort and score no matter how many times it is locked again.
Within these 5 minutes, the thread will be moved out of the queue and ordered collection without making another lock attempt. So after 5 minutes, the thread tries to lock again, which will re-enter, causing the queue to be re-arranged.
(2) The old version will re-arrange the queue after 5 seconds.
In the old version of fair lock, threads that fail to acquire the lock will enter the queue for up to 5 seconds by default.
Within these 5 seconds, if the thread tries to lock again, it will extend its maximum waiting time, that is, refresh the queue score in the ordered set.
Within these 5 seconds, the thread will be moved out of the queue and ordered collection without making another lock attempt. So after 5 seconds, the thread tries to lock again, which will re-enter, causing the queue to be re-arranged.
(3) What causes the queue reordering is step one of the lua script (removing the thread waiting for timeout)
That is, the role of while loop in fair lock lua script.
When the client thread uses RedissonLock's tryAcquire() method to try to acquire a fair lock and specifies a timeout for acquiring the lock. For example, if the specified client thread queues for more than 20 seconds, it no longer tries to acquire the lock. If the timeout time of obtaining the lock is not specified, the new version is the default timeout of 5 minutes, and the old version is the default timeout after 5 seconds.
At this time, since these thread elements waiting to acquire the lock have timed out still exist in the queue and ordered collection, the while loop logic can be used to clear these client threads that no longer try to acquire the lock.
In the new version, these threads waiting for the acquisition lock timeout will be moved out of the queue over time. In the older version, as time went by, these threads waiting for the timeout to acquire the lock, as long as they no longer tried to add the lock, the timeout time they waited for the acquisition lock would not be updated and would be removed from the queue.
If the client goes down, the client will not try to acquire the lock again. In the new version, over time, the downtime client threads are moved out of the queue. In older versions, the timeout score in the ordered set is not refreshed and extended, so that the logic of the while loop will move these downtime client threads out of the queue.
In the new version, the downtime client thread will be moved out of the queue after up to 5 minutes. In older versions, the downtime client thread will be moved out of the queue after up to 5 seconds.
Due to network delay and other reasons, the client thread may wait for locking too long, which triggers the reordering of the queueing order of each client thread. If some clients wait in the queue for too long, they may trigger the queue reordering once. The frequency of the new version triggering reordering is every 5 minutes, and the frequency of the old version triggering reordering is every 5 seconds.
//Step 1: Remove the thread waiting for timeout
"while true do " +
//Get the first element in the queue
//KEYS[2] is the name of a queue used to queue threads
"local firstThreadId2 = ('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//Get the score corresponding to the first element in the queue, that is, the expiration time of the first thread ranked first
//KEYS[3] is the name of an ordered collection used to sort threads
"local timeout = tonumber(('zscore', KEYS[3], firstThreadId2));" +
//If the expiration time of the first thread is less than the current time, it means that the thread has not yet obtained the lock after waiting for timeout, so it must be removed
//ARGV[4] is the current time
"if timeout <= tonumber(ARGV[4]) then " +
//Remove this thread from the ordered collection + queue
"('zrem', KEYS[3], firstThreadId2); " +
"('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" +
6. Fair lock source code release lock
(1) The process of releasing fair locks
(2) Lua script analysis for releasing fair lock
(1) The process of releasing fair locks
The first thing to call RedissonLock's unlock() method when releasing the fair lock is to be called.
In the unlock() method of RedissonLock, get(unlockAsync()) will be called. That is, first call the unlockAsync() method of RedissonBaseLock, and then call the get() method of RedissonObject.
One of the RedissonBaseLock unlockAsync() methods is an asynchronous execution method, and the operation of releasing the lock is executed asynchronously. The get() method of RedisObject will wait synchronously to obtain the result of asynchronous execution through RFuture. Therefore, get(unlockAsync()) can be understood as asynchronous to synchronization.
In the unlockAsync() method of RedissonBaseLock, the unlockInnerAsync() method of RedissonFairLock will be called to release the lock. Then, after completing the process of releasing the lock, the timing scheduling task will be cancelled asynchronously.
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
().addNodeAddress("redis://192.168.1.110:7001");
//Create a RedissonClient instance
RedissonClient redisson = (config);
//Get fair reentrant lock
RLock fairLock = ("myLock");
();
();
...
}
}
public class RedissonLock extends RedissonBaseLock {
...
@Override
public void unlock() {
...
//Asynchronous to synchronous
//The first call is the unlockAsync() method of RedissonBaseLock
//Then the call is the get() method of RedissonObject
get(unlockAsync(().getId()));
...
}
...
}
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
@Override
public RFuture<Void> unlockAsync(long threadId) {
//Asynchronously execute the lua script that releases the lock
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = ((opStatus, e) -> {
//Cancel the scheduled task
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompleteFutureWrapper<>(f);
}
protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);
...
}
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName;
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
= commandExecutor;
= threadWaitTime;
threadsQueueName = prefixName("redisson_lock_queue", name);
timeoutSetName = prefixName("redisson_lock_timeout", name);
}
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), RedisCommands.EVAL_BOOLEAN,
//Step 1: Remove the thread waiting for timeout
"while true do " +
//Get the first element in the queue
//KEYS[2] is the name of a queue used to queue threads
"local firstThreadId2 = ('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//Get the score corresponding to the first element in the queue, that is, the expiration time of the first thread ranked first
//KEYS[3] is the name of an ordered collection used to sort threads
"local timeout = tonumber(('zscore', KEYS[3], firstThreadId2));" +
//If the expiration time of the first thread is less than the current time, it means that the thread has not yet obtained the lock after waiting for timeout, so it must be removed
//ARGV[4] is the current time
"if timeout <= tonumber(ARGV[4]) then " +
//Remove this thread from the ordered collection + queue
"('zrem', KEYS[3], firstThreadId2); " +
"('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" +
//Step 2: Determine whether the lock still exists, and determine whether the Hash value of the key is the lock name exists
"if (('exists', KEYS[1]) == 0) then " +
//Get the first thread in the queue
"local nextThreadId = ('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
//ARGV[1] is the type of notification event
"('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
//Step 2: Determine whether the lock still exists, and determine whether the Hash value of the key is UUID+thread ID exists
"if (('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//The Hash value of key is UUID + thread ID is decremented by 1
"local counter = ('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
"('del', KEYS[1]); " +
"local nextThreadId = ('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
//Posted an event to the first thread in the queue
"('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE,//ARGV[1]
internalLockLeaseTime,
getLockName(threadId),
()
);
}
...
}
(2) Lua script analysis for releasing fair lock
Step 1: Remove the thread waiting for timeout
First, it will also enter the while loop and remove the thread waiting for the timeout. That is, get the first thread in the queue and determine whether the expiration time of the thread is less than the current time. If it is less than the current time, it means that the thread's queue in the queue has expired, so the thread is removed from the ordered set + queue. If the thread tries to lock again, it will be reordered + re-queued.
Step 2: Determine whether the lock still exists
If the Hash value whose key is the lock name does not exist, first obtain the first thread in the queue, and then publish an event to the corresponding client of the thread to obtain the lock.
If the Hash value whose key is the lock name still exists, then determine whether the mapping of field is UUID + thread ID exists. If the mapping of field is UUID + thread ID does not exist, it means that the lock has been released and returns nil directly. If the mapping of field is UUID + thread ID exists, then in the Hash value of field is UUID + thread ID, the value of field is UUID + thread ID is decremented by 1. That is, call Redis's hincrby command and decrement by 1.
Step 3: The result after decreasing by 1 is as follows
If the result after decrementing by 1 is greater than 0, it means that the thread is still holding the lock. Corresponding to the thread holding the lock reentering the lock multiple times, the expiration time of the lock needs to be reset.
If the result after decrementing by 1 is less than 0, it means that the thread no longer holds the lock, delete the key corresponding to the lock and publish an event to the client corresponding to the first thread in the queue.
7. Add locks in order of fair lock source code
(1) After the lock is released, the second client thread will first add the lock
(2) After the lock is released, the first client thread will add the lock again
Suppose client A holds the lock first, and client B is ranked behind client C in the queue. So if client A releases the lock, how do clients B and C lock in order?
(1) After the lock is released, the second client thread will first add the lock
After the lock is released by client A, the lock key is deleted, client B first tries to add the lock. At this time, the logic of step 2 of the lua script executed by client B:
//check if the lock can be acquired now
//Step 2: Determine whether the current thread can try to acquire the lock. The following two situations can be judged to try to acquire the lock
//Scenario 1: The lock does not exist + the queue does not exist; KEYS[1] is the name of the lock;
//Scenario 2: The lock does not exist + the queue exists + The first element of the queue is the current thread; ARGV[2] is the UUID + ThreadID of the current thread;
"if (('exists', KEYS[1]) == 0) " +
"and ((('exists', KEYS[2]) == 0) " +
"or (('lindex', KEYS[2], 0) == ARGV[2])) then " +
...
"end;"
First, execute the judgment "exists myLock = 0". Since the current lock exists, the condition does not hold.
Then, the judgment "exists redisson_lock_queue:{myLock} = 0" is executed, and the condition does not hold due to the existence of the queue.
Next, the judgment "lindex redisson_lock_queue:{myLock} 0 == UUID2:ThreadID2". Since the queue exists, the first one in the queue is not client B but client C, so the condition is not true and client B cannot lock it.
It can be seen from this that even after the lock is released, multiple clients try to add locks, they only recognize the first client in the queue. This enables the acquisition of locks in sequence in order of queues, ensuring fairness.
(2) After the lock is released, the first client thread will add the lock again
When the client C, which ranks first in the queue, comes to try to lock, the locking logic in step three will be executed:
//check if the lock can be acquired now
//Step 2: Determine whether the current thread can try to acquire the lock. The following two situations can be judged to try to acquire the lock
//Scenario 1: The lock does not exist + the queue does not exist; KEYS[1] is the name of the lock; KEYS[2] is the queue queued for threads;
//Scenario 2: The lock does not exist + the queue exists + The first element of the queue is the current thread; ARGV[2] is the UUID+ThreadID of the current thread;
"if (('exists', KEYS[1]) == 0) " +
"and ((('exists', KEYS[2]) == 0) " +
"or (('lindex', KEYS[2], 0) == ARGV[2])) then " +
//Step 3: The current thread performs the operation of acquiring the lock
//remove this thread from the queue and timeout set
//The first element of the pop-up queue + Delete the element corresponding to UUID:ThreadID from the ordered collection
"('lpop', KEYS[2]);" +
"('zrem', KEYS[3], ARGV[2]);" +
//decrease timeouts for all waiting in the queue
//Decrement the fraction of each thread in the ordered set, that is, decrement the waiting time when each thread acquires the lock
//zrange returns the members (0,-1) in the specified interval in the ordered set KEYS[3], that is, all members
"local keys = ('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//Score minus the keys[i] of the ordered set KEYS[3]: tonumber(ARGV[3])
//ARGV[3] is the time that thread can wait when it acquires locks. The default is 5 minutes
"('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
//acquire the lock and set the TTL for the lease
//hset sets the Hash value for locking operation + pexpire sets the expiration time of the lock key + Finally, return nil to indicate that the locking is successful
"('hset', KEYS[1], ARGV[2], 1);" +
"('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;"
First, execute the command "lpop redisson_lock_queue:{myLock}" to pop out the first element in the queue.
Then, execute the command "zrem redisson_lock_timeout:{myLock} UUID3:ThreadID3" to delete the corresponding elements of the thread of client C in the ordered collection.
Next, execute "hset myLock UUID3:ThreadID3 1" to lock, set field to UUID + thread ID value to 1.
Finally, execute the command "pexpire myLock 30000" and set the expiration time of the Hash value with the key as the lock name to 30000 milliseconds.
After client C completes locking, client C will dequeue from the queue, and client B is at the head of the queue.