Outline
1. RedissonLock Overview
2. Reentrant lock source code: creating the RedissonClient instance
3. Reentrant lock source code: Lua script locking logic
4. Reentrant lock source code: WatchDog lock-renewal logic
5. Reentrant lock source code: reentrant locking logic
6. Reentrant lock source code: mutual-exclusion blocking logic
7. Reentrant lock source code: lock release logic
8. Reentrant lock source code: lock acquisition timeout and automatic release on lock expiry
9. Reentrant lock source code: summary
1. RedissonLock Overview
(1) Add the dependency
(2) Build a RedissonClient and use Redisson
(3) Basic usage of the Redisson reentrant lock RedissonLock
(1) Add the dependency
<dependencies>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.8</version>
</dependency>
</dependencies>
(2) Build a RedissonClient and use Redisson
Following the Chinese documentation on the official website, connect to a Redis Cluster with 3 masters and 3 slaves.
///redisson/redisson/wiki/Catalog
public class Application {
public static void main(String[] args) throws Exception {
//Connect to a Redis Cluster with 3 masters and 3 slaves
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://192.168.1.110:7001")
.addNodeAddress("redis://192.168.1.110:7002")
.addNodeAddress("redis://192.168.1.110:7003")
.addNodeAddress("redis://192.168.1.111:7001")
.addNodeAddress("redis://192.168.1.111:7002")
.addNodeAddress("redis://192.168.1.111:7003");
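//Create a RedissonClient instance
RedissonClient redisson = Redisson.create(config);
//Obtain the reentrant lock
RLock lock = redisson.getLock("myLock");
lock.lock();
lock.unlock();
RMap<String, Object> map = redisson.getMap("myMap");
map.put("foo", "bar");
map = redisson.getMap("myMap");
System.out.println(map.get("foo"));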
}
}
(3) Basic usage of the Redisson reentrant lock RedissonLock
The Redisson reentrant lock RLock implements the java.util.concurrent.locks.Lock interface, and also provides asynchronous (Async), reactive (Reactive) and RxJava2 interfaces.
RLock lock = redisson.getLock("myLock");
//The most common usage
lock.lock();
If the lock timeout is chosen poorly, the lock can expire before its holder has released it: the holder has not unlocked, yet the Redis node has already removed the lock through key expiration, and another client can then acquire it. This causes problems.
To avoid this, Redisson provides an internal WatchDog that monitors locks. The WatchDog keeps extending the validity period of a lock for as long as the Redisson instance has not been closed.
The WatchDog's default lock timeout is 30 seconds; it can be changed through Config.lockWatchdogTimeout.
RLock's lock() and tryLock() methods also accept a leaseTime parameter that specifies the lock timeout. Once this time has elapsed, the lock is released automatically.
//If the lock is not released explicitly, it is released automatically after 10 seconds
lock.lock(10, TimeUnit.SECONDS);
//Wait up to 100 seconds to acquire the lock; once acquired, it is released automatically after 10 seconds unless it is released explicitly first
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
RLock fully complies with Java's Lock specification: only the owner of a lock can unlock it, and an unlock attempt by anyone else throws an IllegalMonitorStateException. If other processes must be able to unlock, use the distributed semaphore Semaphore instead.
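The ownership rule can be observed without a Redis server, because it is the same contract that the JDK's own locks follow. The sketch below uses java.util.concurrent.locks.ReentrantLock and java.util.concurrent.Semaphore as local stand-ins for RLock and the distributed semaphore; the class name LockOwnershipDemo and the helper runOnOtherThread are made up for this illustration and are not part of Redisson.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

public class LockOwnershipDemo {

    // Runs the task on a fresh thread and returns whatever it threw (null if nothing).
    static Throwable runOnOtherThread(Runnable task) {
        AtomicReference<Throwable> thrown = new AtomicReference<>();
        Thread other = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                thrown.set(t);
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return thrown.get();
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            // A thread that does not hold the lock tries to unlock it.
            Throwable error = runOnOtherThread(lock::unlock);
            System.out.println(error instanceof IllegalMonitorStateException);
        } finally {
            lock.unlock();
        }

        Semaphore semaphore = new Semaphore(1);
        semaphore.acquireUninterruptibly();
        // A semaphore has no owner, so any thread may release the permit.
        Throwable error = runOnOtherThread(semaphore::release);
        System.out.println(error == null && semaphore.availablePermits() == 1);
    }
}
```

The lock rejects the foreign unlock, while the semaphore accepts the foreign release, which is why a semaphore is the suggested tool when acquisition and release happen in different places.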
2. Reentrant lock source code: creating the RedissonClient instance
(1) Initialize the connection manager ConnectionManager for Redis
(2) Initialize the Redis command executor CommandExecutor
The Redisson.create() method creates a RedissonClient instance from the configuration; the Redisson class implements the RedissonClient interface. Creating a RedissonClient instance mainly consists of two tasks:
1. Initialize the connection manager ConnectionManager for Redis
2. Initialize the Redis command executor CommandExecutor
(1) Initialize the connection manager ConnectionManager for Redis
The Redis configuration class Config is encapsulated in the connection manager ConnectionManager, so Config can later be obtained through the ConnectionManager.
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
//Create a RedissonClient instance
RedissonClient redisson = Redisson.create(config);
...
}
}
//Source code that creates the RedissonClient instance
public class Redisson implements RedissonClient {
protected final Config config;//Redis configuration class
protected final ConnectionManager connectionManager;//Redis's connection manager
protected final CommandAsyncExecutor commandExecutor;//Redis's command executor
...
public static RedissonClient create(Config config) {
return new Redisson(config);
}
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
//Create the Redis connection manager from the Redis configuration class Config instance
connectionManager = ConfigSupport.createConnectionManager(configCopy);
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
//Create Redis command executor
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
}
...
}
public class ConfigSupport {
...
//Create Redis's connection manager
public static ConnectionManager createConnectionManager(Config configCopy) {
//Generate UUID
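UUID id = UUID.randomUUID();
...
if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig());
//Return a ClusterConnectionManager instance
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);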
}
...
}
...
}
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
...
= ();
//Encapsulate Redis's configuration class Config in ConnectionManager
this.config = create(cfg);
initTimer();
Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
//Asynchronously connect to the Redis node
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
...
//Block on connectionFuture to get the established connection
RedisConnection connection = connectionFuture.toCompletableFuture().join();
...
List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);
...
CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
Collection<ClusterPartition> partitions = partitionsFuture.join();
List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
for (ClusterPartition partition: partitions) {
if (partition.isMasterFail()) {
failedMasters.add(partition.getMasterAddress().toString());
continue;
}
if (partition.getMasterAddress() == null) {
throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address.");
}
CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
masterFutures.add(masterFuture);
}
CompletableFuture<Void> masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0]));
masterFuture.join();
...
}
...
}
...
}
public class MasterSlaveConnectionManager implements ConnectionManager {
protected final String id;//UUID when initialized
private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
...
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
this.id = id.toString();//The UUID that was passed in
this.cfg = cfg;
...
}
protected final CompletionStage<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
RedisConnection conn = nodeConnections.get(addr);
if (conn != null) {
if (!conn.isActive()) {
closeNodeConnection(conn);
} else {
return CompletableFuture.completedFuture(conn);
}
}
//Create a Redis client connection instance
RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
//Send an asynchronous connection request to the Redis server; this future is returned outward layer by layer
CompletionStage<RedisConnection> future = client.connectAsync();
return future.thenCompose(connection -> {
if (connection.isActive()) {
if (!addr.isIP()) {
RedisURI address = new RedisURI(addr.getScheme() + "://" + connection.getRedisClient().getAddr().getAddress().getHostAddress() + ":" + connection.getRedisClient().getAddr().getPort());
nodeConnections.put(address, connection);
}
nodeConnections.put(addr, connection);
return CompletableFuture.completedFuture(connection);
} else {
connection.closeAsync();
CompletableFuture<RedisConnection> f = new CompletableFuture<>();
f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
return f;
}
});
}
//Create a Redis client connection instance
@Override
public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
return RedisClient.create(redisConfig);
}
...
}
//Redisson mainly uses Netty to establish connections with the Redis server
public final class RedisClient {
private final Bootstrap bootstrap;
private final Bootstrap pubSubBootstrap;
...
public static RedisClient create(RedisClientConfig config) {
return new RedisClient(config);
}
private RedisClient(RedisClientConfig config) {
...
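bootstrap = createBootstrap(copy, Type.PLAIN);
pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
this.commandTimeout = copy.getCommandTimeout();
}
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap()
.resolver(config.getResolverGroup())
.channel(config.getSocketChannelClass())
.group(config.getGroup());
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
config.getNettyHook().afterBoostrapInitialization(bootstrap);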
return bootstrap;
}
//Send an asynchronous connection request to the Redis server
public RFuture<RedisConnection> connectAsync() {
CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
CompletableFuture<RedisConnection> r = new CompletableFuture<>();
//Netty's Bootstrap initiates the connection
ChannelFuture channelFuture = bootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (bootstrap.config().group().isShuttingDown()) {
IllegalStateException cause = new IllegalStateException("RedisClient is shutdown");
r.completeExceptionally(cause);
return;
}
if (future.isSuccess()) {
RedisConnection c = RedisConnection.getFrom(future.channel());
c.getConnectionPromise().whenComplete((res, e) -> {
bootstrap.config().group().execute(new Runnable() {
@Override
public void run() {
if (e == null) {
if (!r.complete(c)) {
c.closeAsync();
}
} else {
r.completeExceptionally(e);
c.closeAsync();
}
}
});
});
} else {
bootstrap.config().group().execute(new Runnable() {
public void run() {
r.completeExceptionally(future.cause());
}
});
}
}
});
return r;
});
return new CompletableFutureWrapper<>(f);
}
...
}
(2) Initialize the Redis command executor CommandExecutor
First, CommandSyncService extends the CommandAsyncService class and implements the CommandExecutor interface.
The CommandAsyncService class in turn implements the CommandAsyncExecutor interface.
The connection manager ConnectionManager is encapsulated in the command executor CommandExecutor.
Therefore, the ConnectionManager can be obtained through the CommandExecutor.
//Redis command synchronous executor CommandSyncService
public class CommandSyncService extends CommandAsyncService implements CommandExecutor {
//Initialize CommandExecutor
public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);
}
public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
return read(key, connectionManager.getCodec(), command, params);
}
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
RFuture<R> res = readAsync(key, codec, command, params);
return get(res);
}
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
}
public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RFuture<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
}
public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
}
public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RFuture<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
}
}
//Redis command asynchronous executor CommandAsyncService
public class CommandAsyncService implements CommandAsyncExecutor {
//Redis connection manager
final ConnectionManager connectionManager;
final RedissonObjectBuilder objectBuilder;
final RedissonObjectBuilder.ReferenceType referenceType;
public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
this.connectionManager = connectionManager;
this.objectBuilder = objectBuilder;
this.referenceType = referenceType;
}
@Override
public <V> V getNow(CompletableFuture<V> future) {
try {
return future.getNow(null);
} catch (Exception e) {
return null;
}
}
@Override
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
RFuture<R> res = readAsync(key, codec, command, params);
return get(res);
}
@Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
NodeSource source = getNodeSource(key);
return async(true, source, codec, command, params, false, false);
}
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
}
public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
CompletableFuture<R> mainPromise = createPromise();
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
executor.execute();
return new CompletableFutureWrapper<>(mainPromise);
}
@Override
public <V> V get(RFuture<V> future) {
if (Thread.currentThread().getName().startsWith("redisson-netty")) {
throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
}
try {
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
throw new RedisException(e);
} catch (ExecutionException e) {
throw convertException(e);
}
}
...
}
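The listing above follows a common pattern: every synchronous method is the asynchronous method plus a blocking get(). A minimal sketch of that pattern with only JDK classes might look as follows. The class SyncOverAsyncSketch and its readAsync() stand-in are hypothetical and only illustrate the shape of the calls; they do not talk to Redis, and IllegalStateException is used here in place of Redisson's RedisException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SyncOverAsyncSketch {

    // Hypothetical asynchronous operation standing in for readAsync().
    static CompletableFuture<String> readAsync(String key) {
        return CompletableFuture.supplyAsync(() -> "value-of-" + key);
    }

    // Blocks until the future completes, mirroring the shape of get(RFuture).
    static <V> V get(CompletableFuture<V> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Cancel the pending work and restore the interrupt flag for the caller.
            future.cancel(true);
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Surface the original failure rather than the wrapper.
            throw new IllegalStateException(e.getCause());
        }
    }

    // The synchronous variant is just the asynchronous one plus a blocking get().
    static String read(String key) {
        return get(readAsync(key));
    }

    public static void main(String[] args) {
        System.out.println(read("foo"));
    }
}
```

Keeping a single asynchronous code path and deriving the blocking API from it avoids maintaining two implementations of every command.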
3. Reentrant lock source code: Lua script locking logic
(1) Obtain a RedissonLock instance through the Redisson.getLock() method
(2) Execution flow when locking
(3) The Lua script executed when locking
(4) Command executor logic that runs the locking Lua script
(5) How the node is located from the slot value
(1) Obtain a RedissonLock instance through the Redisson.getLock() method
In the Redisson.getLock() method, the command executor CommandExecutor is passed in to create the RedissonLock instance. The CommandExecutor was initialized when Redisson.create() was executed, so it ends up encapsulated in the RedissonLock instance.
Therefore, a RedissonLock instance gives access to the command executor CommandExecutor; the CommandExecutor gives access to the connection manager ConnectionManager; the ConnectionManager gives access to the Redis configuration class Config; and Config gives access to the various configuration values.
The RedissonLock class extends RedissonBaseLock, which implements the RLock interface. The RedissonLock constructor sets an internalLockLeaseTime variable, which is related to the WatchDog. The default value of internalLockLeaseTime is 30000 milliseconds, that is, 30 seconds.
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
//Create a RedissonClient instance
RedissonClient redisson = Redisson.create(config);
//Obtain the reentrant lock
RLock lock = redisson.getLock("myLock");
lock.lock();
...
}
}
//Create Redisson instance
public class Redisson implements RedissonClient {
protected final Config config;//Redis configuration class
protected final ConnectionManager connectionManager;//Redis's connection manager
protected final CommandAsyncExecutor commandExecutor;//Redis's command executor
...
public static RedissonClient create(Config config) {
return new Redisson(config);
}
protected Redisson(Config config) {
...
//Create a connection manager with Redis based on the Redis configuration class Config instance
connectionManager = ConfigSupport.createConnectionManager(configCopy);
//Create Redis command executor
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
...
}
...
@Override
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name);
}
...
}
//Create a RedissonLock instance
//The command executor can be obtained through the RedissonLock instance;
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
protected final LockPubSub pubSub;
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
//internalLockLeaseTime is related to the WatchDog
//The connection manager ConnectionManager can be obtained through the command executor CommandExecutor
//The Redis configuration class Config can be obtained through the ConnectionManager
//The lockWatchdogTimeout value can be obtained through Config
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
...
}
//Create Redis command executor
//The connection manager ConnectionManager can be obtained through the command executor CommandExecutor
public class CommandAsyncService implements CommandAsyncExecutor {
final ConnectionManager connectionManager;
...
public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
this.connectionManager = connectionManager;
this.objectBuilder = objectBuilder;
this.referenceType = referenceType;
}
@Override
public ConnectionManager getConnectionManager() {
return connectionManager;
}
...
}
//Create Redis's connection manager
//The configuration information of Redis can be obtained through the ConnectionManager ConnectionManager.
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
...
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
...
}
...
}
//Create Redis's connection manager
//The configuration information of Redis can be obtained through the ConnectionManager ConnectionManager.
public class MasterSlaveConnectionManager implements ConnectionManager {
private final Config cfg;
protected final String id;//UUID when initialized
...
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
this.id = id.toString();//The UUID that was passed in
this.cfg = cfg;
...
}
@Override
public Config getCfg() {
return cfg;
}
...
}
//The lockWatchdogTimeout variable in the configuration information class Config is initialized to 30 seconds, which is related to WatchDog
public class Config {
private long lockWatchdogTimeout = 30 * 1000;
...
//This parameter is only used if lock has been acquired without leaseTimeout parameter definition.
//Lock expires after "lockWatchdogTimeout" if watchdog didn't extend it to next "lockWatchdogTimeout" time interval.
//This prevents against infinity locked locks due to Redisson client crush or any other reason when lock can't be released in proper way.
//Default is 30000 milliseconds
public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
this.lockWatchdogTimeout = lockWatchdogTimeout;
return this;
}
public long getLockWatchdogTimeout() {
return lockWatchdogTimeout;
}
}
By default, when the lock() method is called, the leaseTime passed in is -1. In that case the lock timeout is set to lockWatchdogTimeout, 30 seconds by default, which prevents a lock from being held forever if its holder dies.
public class RedissonLock extends RedissonBaseLock {
...
// Add lock
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
...
}
//Unlock
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
...
}
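How the effective lock timeout is chosen can be condensed into a few lines. The sketch below is only a restatement of the rule for illustration: the class LeaseTimeSketch and its method names are invented here, the 30-second constant is the Config.lockWatchdogTimeout default shown above, and the rule that the WatchDog is only scheduled when leaseTime is -1 is taken from tryAcquireAsync() in this Redisson version.

```java
import java.util.concurrent.TimeUnit;

public class LeaseTimeSketch {

    // Default of Config.lockWatchdogTimeout: 30 seconds, in milliseconds.
    static final long DEFAULT_WATCHDOG_TIMEOUT_MS = 30 * 1000;

    // leaseTime == -1 means "no explicit lease": fall back to the watchdog timeout.
    static long effectiveLeaseMillis(long leaseTime, TimeUnit unit) {
        return leaseTime != -1 ? unit.toMillis(leaseTime) : DEFAULT_WATCHDOG_TIMEOUT_MS;
    }

    // The WatchDog only renews locks that were acquired without an explicit lease.
    static boolean usesWatchdog(long leaseTime) {
        return leaseTime == -1;
    }

    public static void main(String[] args) {
        // lock() passes leaseTime = -1 and unit = null.
        System.out.println(effectiveLeaseMillis(-1, null) + " ms, watchdog=" + usesWatchdog(-1));
        // lock(10, TimeUnit.SECONDS) passes an explicit lease.
        System.out.println(effectiveLeaseMillis(10, TimeUnit.SECONDS) + " ms, watchdog=" + usesWatchdog(10));
    }
}
```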
(2) Execution flow when locking
First, the tryAcquire() method of RedissonLock is called, which turns the asynchronous RFuture into a synchronous result. It calls the tryAcquireAsync() method of RedissonLock, which handles the result of the script execution. That method in turn calls tryLockInnerAsync(), which executes the locking Lua script.
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
protected final LockPubSub pubSub;
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
//internalLockLeaseTime is related to the WatchDog
//The connection manager ConnectionManager can be obtained through the command executor CommandExecutor
//The Redis configuration class Config can be obtained through the ConnectionManager
//The lockWatchdogTimeout value can be obtained through Config
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
...
// Add lock
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//Thread ID, used to build the field name stored in the Hash
long threadId = Thread.currentThread().getId();
//Try to acquire the lock; lock() passes the default leaseTime=-1
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
//A null ttl means the lock was acquired
if (ttl == null) {
return;
}
//Handling when acquisition fails
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
commandExecutor.getNow(future).getLatch().acquire();
} else {
commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(commandExecutor.getNow(future), threadId);
}
}
...
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//By default waitTime and leaseTime are both -1; the get() method called below is inherited from RedissonObject
//It turns the asynchronous call into a synchronous one: get() blocks on the future returned by tryAcquireAsync()
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//By default leaseTime=-1, so the internalLockLeaseTime set when the RedissonLock instance was initialized is used
//internalLockLeaseTime defaults to lockWatchdogTimeout, 30 seconds
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
//A null ttlRemaining means the lock was acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
//By default the caller passes leaseTime=-1, and the value used here is the lockWatchdogTimeout default of 30 seconds
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()),//The name of the lock: KEYS[1]
unit.toMillis(leaseTime),//Expiration time: ARGV[1], 30 seconds by default
getLockName(threadId)//ARGV[2], the value is UUID + thread ID
);
}
...
}
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
final String id;
final String entryName;
final CommandAsyncExecutor commandExecutor;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
= commandExecutor;
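this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;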
}
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
...
}
abstract class RedissonExpirable extends RedissonObject implements RExpirable {
...
RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
...
}
public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec;
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = codec;
this.commandExecutor = commandExecutor;
if (name == null) {
throw new NullPointerException("name can't be null");
}
setName(name);
}
...
protected final <V> V get(RFuture<V> future) {
//This calls the get() method of CommandAsyncService
return commandExecutor.get(future);
}
...
}
public class CommandAsyncService implements CommandAsyncExecutor {
...
@Override
public <V> V get(RFuture<V> future) {
if (Thread.currentThread().getName().startsWith("redisson-netty")) {
throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
}
try {
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
throw new RedisException(e);
} catch (ExecutionException e) {
throw convertException(e);
}
}
...
}
(3) Lua script executed when locking
public class RedissonLock extends RedissonBaseLock {
//By default the caller passes leaseTime=-1, and the value used here is the lockWatchdogTimeout default of 30 seconds
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()),//The name of the lock: KEYS[1], such as "myLock"
unit.toMillis(leaseTime),//Expiration time: ARGV[1], 30 seconds by default
getLockName(threadId)//ARGV[2], the value is UUID + thread ID
);
}
...
}
First, the script runs the Redis exists command to check whether a Hash whose key is the lock name exists, for example whether the key myLock exists.
1. If no Hash with the lock name as its key exists, the following locking steps are performed.
First, the Redis hincrby command creates the Hash under the lock name. Because the field does not exist yet, this has the same effect as an hset: the Hash gets one field named UUID + thread ID with the value 1. For example: hincrby myLock UUID:ThreadID 1. ARGV[2] in the Lua script is this unique value composed of UUID + thread ID.
Then the Redis pexpire command sets the expiration time of the Hash, which is 30 seconds by default. For example: pexpire myLock 30000. So by default the myLock lock expires automatically after 30 seconds.
2. If a Hash with the lock name as its key already exists, the following checks are performed.
First, the Redis hexists command checks whether the Hash already contains the field UUID + thread ID.
If the field UUID + thread ID exists in the Hash, its value is incremented by 1 with the Redis hincrby command. For example: hincrby myLock UUID:ThreadID 1. In the Hash with the key myLock, the value of the field UUID:ThreadID goes from 1 to 2. This is the case in which the current thread re-enters a lock it already holds. The script then executes pexpire myLock 30000, resetting the validity period of myLock to 30 seconds.
If the field UUID + thread ID does not exist in the Hash, another thread holds the lock and mutual exclusion applies, because ARGV[2] = UUID + thread ID differs from thread to thread. In that case the script uses the Redis pttl command to return the remaining time to live of the Hash.
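The three branches of the script can be traced with a small in-memory model. The sketch below is an illustration only: the class LockScriptSimulation is invented for this purpose, two HashMaps stand in for the Redis Hash and its expiry, and the current time is passed in explicitly so that the behaviour is deterministic. It does not reproduce the atomicity that Redis gives a Lua script.

```java
import java.util.HashMap;
import java.util.Map;

public class LockScriptSimulation {

    // lock name -> (field "UUID:threadId" -> reentrancy count); stands in for the Redis Hash
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    // lock name -> absolute expiry time in milliseconds; stands in for the key's TTL
    private final Map<String, Long> expiresAt = new HashMap<>();

    // Mirrors the script: returns null on success, otherwise the remaining TTL in milliseconds.
    Long tryLock(String key, String holder, long leaseMillis, long nowMillis) {
        evictIfExpired(key, nowMillis);
        Map<String, Integer> hash = hashes.get(key);
        if (hash == null || hash.containsKey(holder)) {
            // "exists == 0" branch or "hexists == 1" branch: hincrby, then pexpire
            hashes.computeIfAbsent(key, k -> new HashMap<>()).merge(holder, 1, Integer::sum);
            expiresAt.put(key, nowMillis + leaseMillis);
            return null;
        }
        // Held by someone else: the pttl branch
        return expiresAt.get(key) - nowMillis;
    }

    int holdCount(String key, String holder) {
        Map<String, Integer> hash = hashes.get(key);
        return hash == null ? 0 : hash.getOrDefault(holder, 0);
    }

    // Emulates Redis dropping the key once its expiry has passed.
    private void evictIfExpired(String key, long nowMillis) {
        Long deadline = expiresAt.get(key);
        if (deadline != null && deadline <= nowMillis) {
            hashes.remove(key);
            expiresAt.remove(key);
        }
    }

    public static void main(String[] args) {
        LockScriptSimulation redis = new LockScriptSimulation();
        System.out.println(redis.tryLock("myLock", "uuid:1", 30000, 0));    // first acquisition
        System.out.println(redis.tryLock("myLock", "uuid:1", 30000, 1000)); // reentry by the same holder
        System.out.println(redis.tryLock("myLock", "uuid:2", 30000, 5000)); // another holder gets the TTL
    }
}
```

The first two calls succeed and leave a hold count of 2, while the third call is refused and receives the time left until the lock would expire on its own.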
(4) Command executor logic that runs the locking Lua script
In the tryLockInnerAsync() method of RedissonLock, the Lua script is executed through the evalWriteAsync() method of RedissonBaseLock, which in turn calls the evalWriteAsync() method of CommandAsyncService.
The evalWriteAsync() method of CommandAsyncService first calls getNodeSource() to determine the target node, then calls evalAsync() to execute the Lua script.
The getNodeSource() method of CommandAsyncService computes the CRC16 of the key and takes the result modulo 16384 to obtain the key's slot value. It then creates a NodeSource instance from this slot value and returns it.
The evalAsync() method of CommandAsyncService wraps the NodeSource instance in a RedisExecutor and runs it, which sends the script request to the corresponding Redis node for processing.
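The slot calculation can be sketched in plain Java. The sketch below assumes the CRC16 variant named in the Redis Cluster specification (CCITT/XMODEM: polynomial 0x1021, initial value 0) and also applies the specification's hash-tag rule, which the text above does not discuss. The class KeySlotSketch is written for this illustration and is not Redisson's own calcSlot() implementation.

```java
import java.nio.charset.StandardCharsets;

public class KeySlotSketch {

    static final int MAX_SLOT = 16384;

    // CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, processed bit by bit.
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // slot = CRC16(key) mod 16384; a non-empty {hash tag} replaces the key for hashing.
    static int calcSlot(String key) {
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}', start + 1);
            if (end != -1 && end != start + 1) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % MAX_SLOT;
    }

    public static void main(String[] args) {
        System.out.println("myLock -> slot " + calcSlot("myLock"));
        // Keys that share a hash tag land in the same slot.
        System.out.println(calcSlot("foo{hash_tag}") == calcSlot("bar{hash_tag}"));
    }
}
```

Since each slot is owned by exactly one master, computing the slot on the client is enough to decide which node should receive the script.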
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
//Passed in from outside: the command executor CommandExecutor that was initialized when the Redisson instance implementing RedissonClient was created
final CommandAsyncExecutor commandExecutor;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
...
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
//Get the number of available slave nodes and wrap them in another command executor, CommandBatchService
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
int availableSlaves = entry.getAvailableSlaves();
CommandBatchService executorService = createCommandBatchService(availableSlaves);
//Execute the Lua script through the evalWriteAsync() method of CommandAsyncService
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (commandExecutor instanceof CommandBatchService) {
return result;
}
//Execute asynchronously and get the result
RFuture<BatchResult<?>> future = executorService.executeAsync();
CompletionStage<T> f = future.handle((res, ex) -> {
if (ex == null && res.getSyncedSlaves() != availableSlaves) {
throw new CompletionException(new IllegalStateException("Only " + res.getSyncedSlaves() + " of " + availableSlaves + " slaves were synched"));
}
return result.getNow();
});
return new CompleteFutureWrapper<>(f);
}
private CommandBatchService createCommandBatchService(int availableSlaves) {
if (commandExecutor instanceof CommandBatchService) {
return (CommandBatchService) commandExecutor;
}
BatchOptions options = BatchOptions.defaults().syncSlaves(availableSlaves, 1, TimeUnit.SECONDS);
return new CommandBatchService(commandExecutor, options);
}
...
}
public class CommandBatchService extends CommandAsyncService {
...
public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) {
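this(executor.getConnectionManager(), options, executor.getObjectBuilder(), RedissonObjectBuilder.ReferenceType.DEFAULT);
}
private CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
super(connectionManager, objectBuilder, referenceType);
this.options = options;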
}
...
}
public class CommandAsyncService implements CommandAsyncExecutor {
final ConnectionManager connectionManager;
final RedissonObjectBuilder objectBuilder;
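final RedissonObjectBuilder.ReferenceType referenceType;
public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
this.connectionManager = connectionManager;
this.objectBuilder = objectBuilder;
this.referenceType = referenceType;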
}
...
@Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
//Get the node corresponding to the key
NodeSource source = getNodeSource(key);
//Let the corresponding node execute the lua script request
return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);
}
//Get the Redis Cluster node corresponding to the key
private NodeSource getNodeSource(String key) {
//First calculate the slot value corresponding to the key
int slot = connectionManager.calcSlot(key);
//Return a NodeSource instance that carries the slot
return new NodeSource(slot);
}
//Execute the lua script
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, boolean noRetry, Object... params) {
if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
CompletableFuture<R> mainPromise = new CompletableFuture<>();
Object[] pps = copy(params);
CompletableFuture<R> promise = new CompletableFuture<>();
String sha1 = calcSHA(script);
RedisCommand cmd = new RedisCommand(evalCommandType, "EVALSHA");
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(sha1);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
//The NodeSource instance, built from the slot computed as CRC16(key) mod 16384, is wrapped in the RedisExecutor
RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, connectionManager, objectBuilder, referenceType, noRetry);
//Executing the RedisExecutor sends the lua script request to the corresponding Redis node for processing
executor.execute();
...
}
...
}
...
}
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public static final int MAX_SLOT = 16384;//Redis Cluster has 16384 slots by default
...
//Compute CRC16 over the key, then take the result modulo 16384
@Override
public int calcSlot(String key) {
if (key == null) {
return 0;
}
//If the key contains a non-empty {hash tag}, only the tag is hashed
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
if (end != -1 && start + 1 < end) {
key = key.substring(start + 1, end);
}
}
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
return result;
}
...
}
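The slot calculation described above can be tried in isolation. The following is a minimal sketch, not Redisson's own class: the name SlotSketch is hypothetical, the CRC16 variant is the XMODEM one (polynomial 0x1021, initial value 0) that the Redis Cluster specification defines, and the use of UTF-8 for the key bytes is an assumption. The hash-tag handling mirrors the calcSlot() excerpt.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone sketch of the key -> slot calculation.
final class SlotSketch {
    static final int MAX_SLOT = 16384;

    // CRC16 (XMODEM variant): polynomial 0x1021, initial value 0, processed bit by bit.
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // If the key contains a non-empty {hash tag}, only the tag is hashed.
    static int calcSlot(String key) {
        if (key == null) {
            return 0;
        }
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}');
            if (end != -1 && start + 1 < end) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % MAX_SLOT;
    }
}
```

Because only the hash tag is hashed, keys such as "{myLock}:a" and "{myLock}:b" land in the same slot and therefore on the same master node.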
(5) How to get the corresponding node according to the slot value
The execute() method of RedisExecutor runs with the NodeSource instance it wraps, and that NodeSource instance carries the slot value of the lock name key. The execute() method can therefore obtain a connection to the corresponding node through the getConnection() method.
The getConnection() method of RedisExecutor calls the connectionWriteOp() method of MasterSlaveConnectionManager. That method obtains the node for the slot value by calling the getEntry() method of ConnectionManager. In cluster mode this is the getEntry() method of ClusterConnectionManager, which returns the Redis master node.
In fact, when the connection manager ClusterConnectionManager is initialized, the mapping from slots to Redis master nodes is already built according to the configuration.
public class RedisExecutor<V, R> {
NodeSource source;
...
public void execute() {
...
//Asynchronously obtain the established Redis connection
CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();
...
}
protected CompletableFuture<RedisConnection> getConnection() {
...
connectionFuture = connectionManager.connectionWriteOp(source, command);
return connectionFuture;
}
...
}
public class MasterSlaveConnectionManager implements ConnectionManager {
...
@Override
public CompletableFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry entry = getEntry(source);
...
}
private MasterSlaveEntry getEntry(NodeSource source) {
if (source.getRedirect() != null) {
return getEntry(source.getAddr());
}
MasterSlaveEntry entry = source.getEntry();
if (source.getRedisClient() != null) {
entry = getEntry(source.getRedisClient());
}
if (entry == null && source.getSlot() != null) {
//Get the Redis master node based on the slot
entry = getEntry(source.getSlot());
}
return entry;
}
...
}
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
//Atomic mapping array of slot and Redis master node
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<>(MAX_SLOT);
//Map relationship between Redis client connection and Redis master node
private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
...
@Override
public MasterSlaveEntry getEntry(int slot) {
//Get the Redis master node based on the slot
return slot2entry.get(slot);
}
...
//When initializing the connection manager ClusterConnectionManager
//The slots mapped to the Redis master node have been initialized according to the configuration
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
...
for (String address : cfg.getNodeAddresses()) {
...
CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
Collection<ClusterPartition> partitions = partitionsFuture.join();
List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
for (ClusterPartition partition: partitions) {
...
CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
masterFutures.add(masterFuture);
}
...
}
...
}
private CompletableFuture<Void> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
...
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, (), configEndpointHostName);
((connection, ex1) -> {
//Callback invoked when the connection attempt completes; a non-null ex1 means the connection failed
if (ex1 != null) {
("Can't connect to master: {} with slot ranges: {}", (), ());
(ex1);
return;
}
MasterSlaveServersConfig config = create(cfg);
(().toString());
//Create Redis's master node
MasterSlaveEntry entry;
if (()) {
entry = new SingleEntry(, config);
} else {
Set<String> slaveAddresses = ().stream().map(r -> ()).collect(());
(slaveAddresses);
entry = new MasterSlaveEntry(, config);
}
CompletableFuture<RedisClient> f = (new RedisURI(()), configEndpointHostName);
((masterClient, ex3) -> {
if (ex3 != null) {
("Can't add master: " + () + " for slot ranges: " + (), ex3);
(ex3);
return;
}
//Add slot value to the created Redis master node
for (Integer slot : ()) {
addEntry(slot, entry);
(slot, partition);
}
...
});
});
...
}
//Add the mapping relationship of slot to the corresponding node
private void addEntry(Integer slot, MasterSlaveEntry entry) {
MasterSlaveEntry oldEntry = (slot, entry);
if (oldEntry != entry) {
();
shutdownEntry(oldEntry);
}
((), entry);
}
...
}
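The slot-to-master lookup can be pictured with a small stand-alone table. This is only a sketch under assumptions: the class SlotTableSketch and its methods are hypothetical, master nodes are represented by plain address strings instead of MasterSlaveEntry objects, and the slot ranges in the usage note are illustrative rather than taken from a real cluster.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch of a slot -> master lookup table, similar in spirit to slot2entry.
final class SlotTableSketch {
    static final int MAX_SLOT = 16384;

    // One cell per slot; each cell holds the address of the master that owns the slot.
    private final AtomicReferenceArray<String> slot2master = new AtomicReferenceArray<>(MAX_SLOT);

    // Registers a master for every slot in the inclusive range [from, to].
    void assignRange(int from, int to, String masterAddress) {
        for (int slot = from; slot <= to; slot++) {
            slot2master.set(slot, masterAddress);
        }
    }

    // Returns the owning master, or null if the slot has not been assigned yet.
    String masterFor(int slot) {
        return slot2master.get(slot);
    }
}
```

For example, after assignRange(0, 5460, "redis://192.168.1.110:7001"), assignRange(5461, 10922, "redis://192.168.1.110:7002") and assignRange(10923, 16383, "redis://192.168.1.110:7003"), a key whose slot is 12182 resolves to the third address with a single array read.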
4. Reentrant lock source code: how the WatchDog maintains the lock
(1) Asynchronous execution of the locking lua script triggers the callback in thenApply()
(2) Whether a scheduled task is created depends on the return value of the locking lua script
(3) The scheduled task is implemented with Netty's time wheel mechanism
(4) The scheduled task runs after 10 seconds and decides whether to schedule another task 10 seconds later
Suppose a client still has not released the lock several minutes after acquiring it, while the key for the lock was given an expiration time of only 30 seconds. In this scenario the lock must not be released automatically 30 seconds after it was acquired.
RedissonLock provides a WatchDog mechanism for this:
When a client successfully locks the key myLock, a scheduled task is created. By default this task runs after 10 seconds and resets the expiration time of the key myLock to 30 seconds, provided the client still holds the lock myLock and has not released it. As long as the client keeps holding the lock, a new scheduled task is created each time, so the expiration time of the myLock key is refreshed every 10 seconds.
The following describes in detail how the scheduled task is created after locking succeeds and how it refreshes the lock's expiration time every 10 seconds.
(1) Asynchronous execution of the locking lua script triggers the callback in thenApply()
After the locking lua script in the tryLockInnerAsync() method is executed asynchronously, the callback registered through the thenApply() method of ttlRemainingFuture is triggered. The locking lua script returns the ttlRemaining value of type Long, and this value is the parameter of the callback.
Specifically, the tryAcquireAsync() method of RedissonLock first defines a variable named ttlRemainingFuture of type RFuture<Long>. This variable wraps the remaining time to live, in milliseconds, of the key for the current lock, as returned by the locking lua script.
A callback is then added to ttlRemainingFuture through the thenApply() method of RFuture. When the lua script finishes executing, the callback is triggered, and its parameter ttlRemaining is the return value of the locking lua script.
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
protected final LockPubSub pubSub;
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
//internalLockLeaseTime is related to the WatchDog
//The connection manager ConnectionManager can be obtained through the command executor CommandExecutor
//The Redisson configuration class Config can be obtained through the ConnectionManager
//The lockWatchdogTimeout value can be obtained from Config
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
// Add lock
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
...
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//Thread ID, used to build the field name in the lock's Hash
long threadId = Thread.currentThread().getId();
//Try to acquire the lock; the lock() method passes the default leaseTime=-1
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
//A null ttl means the lock was acquired successfully
if (ttl == null) {
return;
}
//Processing when locking fails
...
}
...
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//By default waitTime and leaseTime are both -1; the get() method called below comes from RedissonObject
//This is an asynchronous-to-synchronous conversion: get() waits for the asynchronous tryAcquireAsync() to complete
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//By default leaseTime=-1, so the internalLockLeaseTime set when the RedissonLock instance was initialized is used
//The default value of internalLockLeaseTime is the default lockWatchdogTimeout, 30 seconds
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
//After tryLockInnerAsync() has asynchronously executed the locking lua script,
//the callback in thenApply() is triggered
//A null ttlRemaining means the lock was acquired successfully
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//Pass the ID of the thread that acquired the lock and start a scheduled task
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
//By default, when the caller passes leaseTime=-1, the default lockWatchdogTimeout of 30 seconds is used
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()),//The name of the lock: KEYS[1]
unit.toMillis(leaseTime),//Expiration time: ARGV[1], default is 30 seconds
getLockName(threadId)//ARGV[2], the value is UUID + thread ID
);
}
...
}
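The callback pattern in tryAcquireAsync() can be reproduced with the JDK alone. The sketch below is an illustration under assumptions, not Redisson code: it uses CompletableFuture in place of RFuture, the class TtlCallbackSketch is hypothetical, the script result is passed in as a parameter instead of coming from Redis, and a boolean flag stands in for the call to scheduleExpirationRenewal().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a callback that reacts to the locking script's return value.
final class TtlCallbackSketch {
    // Set to true where the real code would call scheduleExpirationRenewal(threadId).
    final AtomicBoolean renewalScheduled = new AtomicBoolean(false);

    // scriptResult stands in for the lua script's return value:
    // null means the lock was acquired, any other value is the remaining TTL in milliseconds.
    Long tryAcquire(Long scriptResult) {
        CompletableFuture<Long> ttlRemainingFuture = CompletableFuture.supplyAsync(() -> scriptResult);
        CompletableFuture<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            if (ttlRemaining == null) {
                renewalScheduled.set(true);
            }
            return ttlRemaining;
        });
        // Asynchronous to synchronous: block until the callback chain has completed.
        return f.join();
    }
}
```

The value passed to the callback is returned unchanged, so the caller sees null on success and the remaining TTL otherwise.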
(2) Whether a scheduled task is created depends on the return value of the locking lua script
When the ttlRemaining value returned by the locking lua script is null, the lock has been acquired successfully.
If the lock is acquired and an expiration time was specified, that is, leaseTime is not the default -1, no scheduled task is created. If the lock is acquired and no expiration time was specified, that is, leaseTime is the default -1, a scheduled task is created on Netty's time wheel.
Calling the scheduleExpirationRenewal() method is what creates the scheduled task. As long as the client still holds the lock, the task keeps refreshing the expiration time of the lock's key.
public class RedissonLock extends RedissonBaseLock {
...
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
//After tryLockInnerAsync() finishes asynchronously, the logic in thenApply() is executed
//A null ttlRemaining means the lock was acquired successfully
if (ttlRemaining == null) {
if (leaseTime != -1) {
//If an expiration time was specified for the lock, the scheduled task is not started
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//Pass the ID of the thread that acquired the lock and start a scheduled task
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
...
}
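The three possible outcomes of the callback can be summarized as a small decision function. This is a hypothetical condensation for illustration only: the class RenewalDecisionSketch and the Action enum do not exist in Redisson, and the real code performs side effects instead of returning a value.

```java
// Hypothetical sketch of the decision made inside the thenApply() callback.
final class RenewalDecisionSketch {
    enum Action { START_WATCHDOG, USE_FIXED_LEASE, WAIT_FOR_LOCK }

    // ttlRemaining: return value of the locking script (null = lock acquired).
    // leaseTime: value passed by the caller (-1 = no explicit expiration time).
    static Action decide(Long ttlRemaining, long leaseTime) {
        if (ttlRemaining != null) {
            // Another client or thread holds the lock; nothing is scheduled.
            return Action.WAIT_FOR_LOCK;
        }
        // Lock acquired: renew automatically only when no lease time was given.
        return leaseTime != -1 ? Action.USE_FIXED_LEASE : Action.START_WATCHDOG;
    }
}
```

One consequence worth noting: a caller that passes an explicit leaseTime gets no renewal, so the key expires after that lease even if the client is still working.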
(3) The scheduled task is implemented with Netty's time wheel mechanism
Redisson's WatchDog mechanism is built on a Netty time wheel rather than a scheduled thread pool.
The scheduleExpirationRenewal() method creates a TimerTask and hands it to the HashedWheelTimer, which executes it after 10 seconds.
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
//Create the scheduled task that refreshes the expiration time
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
//Refresh the expiration time
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//Uses Netty's timer mechanism: HashedWheelTimer + TimerTask + Timeout
//The newTimeout() method of the connection manager is called below to create a TimerTask
//and hand it to the HashedWheelTimer, which executes it after internalLockLeaseTime / 3 (10 seconds by default)
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
...
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
...
}
public class MasterSlaveConnectionManager implements ConnectionManager {
private HashedWheelTimer timer;//Netty's time wheel
...
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
try {
//Add the task to Netty's time wheel
return timer.newTimeout(task, delay, unit);
} catch (IllegalStateException e) {
if (isShuttingDown()) {
return DUMMY_TIMEOUT;
}
throw e;
}
}
...
}
//Netty's time wheel HashedWheelTimer
public class HashedWheelTimer implements Timer {
private final HashedWheelBucket[] wheel;
private final Queue<HashedWheelTimeout> timeouts = ();
private final Executor taskExecutor;
...
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
...
//Start the Worker thread
start();
...
//Encapsulate the timerTask instance to the HashedWheelTimeout instance
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
(timeout);
return timeout;
}
...
private final class Worker implements Runnable {
private long tick;//round
...
@Override
public void run() {
...
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
//Transfer the pending HashedWheelTimeout tasks into their time wheel buckets
transferTimeoutsToBuckets();
//Process the expired tasks in the current HashedWheelBucket
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get() == WORKER_STATE_STARTED);
...
}
private void transferTimeoutsToBuckets() {
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
...
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);//Apply the mask to get the bucket index
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
...
}
private static final class HashedWheelBucket {
...
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
//process all timeouts
while (timeout != null) {
...
//Expire the scheduled task once its deadline has been reached
timeout.expire();
...
}
}
...
}
private static final class HashedWheelTimeout implements Timeout, Runnable {
private final HashedWheelTimer timer;
...
public void expire() {
...
//Hand the task to the Executor, which runs the run() method of HashedWheelTimeout
timer.taskExecutor.execute(this);
...
}
@Override
public void run() {
...
//Run the scheduled TimerTask
task.run(this);
...
}
...
}
...
}
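The bucket arithmetic in transferTimeoutsToBuckets() is easier to follow with concrete numbers. The sketch below isolates that calculation under assumptions: the class WheelPlacementSketch is hypothetical, deadlines are expressed in milliseconds since the timer started (Netty works in nanoseconds), and the tick duration and wheel size used in the usage note (100 ms, 512 buckets) are illustrative values rather than Redisson's configured ones.

```java
// Hypothetical sketch of how a time wheel picks a bucket and a round count for a task.
final class WheelPlacementSketch {
    private final long tickDurationMs;
    private final int wheelSize; // must be a power of two so that (ticks & mask) works as modulo
    private final int mask;

    WheelPlacementSketch(long tickDurationMs, int wheelSize) {
        if (tickDurationMs <= 0 || Integer.bitCount(wheelSize) != 1) {
            throw new IllegalArgumentException("tick must be positive and wheel size a power of two");
        }
        this.tickDurationMs = tickDurationMs;
        this.wheelSize = wheelSize;
        this.mask = wheelSize - 1;
    }

    // Returns {bucketIndex, remainingRounds} for a task with the given deadline,
    // where currentTick is the number of ticks the worker has already processed.
    long[] place(long deadlineMs, long currentTick) {
        long calculated = deadlineMs / tickDurationMs;
        long remainingRounds = (calculated - currentTick) / wheelSize;
        long ticks = Math.max(calculated, currentTick); // never schedule into the past
        int stopIndex = (int) (ticks & mask);
        return new long[] {stopIndex, remainingRounds};
    }
}
```

With a 100 ms tick and 512 buckets, a 10-second WatchDog delay falls 100 ticks ahead, so place(10_000, 0) yields bucket 100 with 0 remaining rounds. A 60-second delay is 600 ticks ahead, which wraps once around the wheel: bucket 88 with 1 remaining round.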
(4) The scheduled task runs after 10 seconds and decides whether to schedule another task 10 seconds later
The TimerTask instance calls RedissonBaseLock's renewExpirationAsync() method to execute a lua script, passing in the ID of the thread that acquired the lock.
In the lua script, the Redis hexists command checks whether the field UUID + thread ID exists in the Hash stored under the lock name key. Here KEYS[1] is the name of the lock, such as myLock, and ARGV[2] is UUID + thread ID.
If the field exists, the thread that acquired the lock still holds it and has not released it. The Redis pexpire command then resets the expiration time of the key to 30 seconds.
After the lua script is executed asynchronously, its Boolean result is passed to a completion callback. Based on that Boolean value, the callback decides whether to call the renewExpiration() method again.
That is, if the thread that acquired the lock still holds it, the lock's expiration time is reset to 30 seconds and the lua script returns 1. The callback then calls RedissonBaseLock's renewExpiration() method again to create a new scheduled task.
If the thread that acquired the lock has released it, the lua script returns 0. The callback then calls RedissonBaseLock's cancelExpirationRenewal() method to clean up.
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
//Create the scheduled task that refreshes the expiration time
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
//Refresh the expiration time
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//Uses Netty's timer mechanism: HashedWheelTimer + TimerTask + Timeout
//The newTimeout() method of the connection manager is called below to create a TimerTask
//and hand it to the HashedWheelTimer, which executes it after internalLockLeaseTime / 3 (10 seconds by default)
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//Execute the lua script asynchronously to refresh the expiration time of the lock
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
//res is the return value of the lua script in renewExpirationAsync()
if (res) {
//The lock is still held: reschedule this task
renewExpiration();
} else {
//The lock has been released: clean up
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
//KEYS[1] is the name of the lock, such as myLock; ARGV[1] is the lease time; ARGV[2] is UUID + thread ID
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
protected void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
//Cancel the pending timer task and remove the renewal entry
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
...
}
Note: If the machine holding the lock goes down, the WatchDog for that lock no longer runs. The lock key therefore expires automatically within 30 seconds, which releases the lock, and other clients can acquire it after waiting at most 30 seconds.
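The renewal cycle can be simulated without Redis or real timers. The sketch below is a deterministic model under stated assumptions: the class WatchdogSketch is hypothetical, an in-memory map stands in for the Redis hash, time is a plain counter in milliseconds, and the 30-second lease with a 10-second interval follows the defaults described in the text.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the WatchDog renewal loop with a simulated clock.
final class WatchdogSketch {
    static final long LEASE_MS = 30_000;                 // default lockWatchdogTimeout per the text
    static final long RENEW_INTERVAL_MS = LEASE_MS / 3;  // 10 seconds

    // field "UUID:threadId" -> reentrancy count; stands in for the hash under the lock key
    final Map<String, Integer> holders = new HashMap<>();
    long expiresAtMs = -1;

    // Mirrors the renewal script: if the field exists, reset the expiration and report success.
    boolean renew(String holder, long nowMs) {
        if (nowMs >= expiresAtMs) {
            holders.clear(); // the key has expired
        }
        if (holders.containsKey(holder)) {
            expiresAtMs = nowMs + LEASE_MS;
            return true;
        }
        return false;
    }

    // Locks at t = 0, releases at releaseAtMs, and returns how many renewals succeeded.
    static int simulate(long releaseAtMs) {
        WatchdogSketch lock = new WatchdogSketch();
        String holder = "uuid:1";
        lock.holders.put(holder, 1);
        lock.expiresAtMs = LEASE_MS;
        int renewals = 0;
        for (long now = RENEW_INTERVAL_MS; ; now += RENEW_INTERVAL_MS) {
            if (now >= releaseAtMs) {
                lock.holders.remove(holder); // the holder unlocked before this tick
            }
            if (!lock.renew(holder, now)) {
                break; // the script returned 0: stop rescheduling
            }
            renewals++;
        }
        return renewals;
    }
}
```

In this model a lock released at 45 seconds is renewed at 10, 20, 30 and 40 seconds, and the check at 50 seconds ends the cycle. If the renewals simply stop, as they would after a crash, the key is left with at most 30 seconds to live.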
5. Reentrant lock source code: reentrant locking logic
(1) The first key point is key naming: the first-level key is the lock name, and the second-level field is UUID + thread ID
(2) The second key point is Redis's exists, hexists, and hincrby commands
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
//Create a RedissonClient instance
RedissonClient redisson = Redisson.create(config);
//Get the reentrant lock
RLock lock = redisson.getLock("myLock");
//Lock for the first time
lock.lock();
//Reenter the lock
lock.lock();
//Reenter the lock
lock.lock();
//Release once per acquisition
lock.unlock();
lock.unlock();
lock.unlock();
...
}
}
public class RedissonLock extends RedissonBaseLock {
...
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()),//The name of the lock: KEYS[1]
unit.toMillis(leaseTime),//Expiration time: ARGV[1], default is 30 seconds
getLockName(threadId)//ARGV[2], the value is UUID + thread ID
);
}
...
}
The core of reentrant locking is the locking lua script.
(1) The first key point is key naming: the first-level key is the lock name, and the second-level field is UUID + thread ID
The value of KEYS[1] is the name of the lock, and the value of ARGV[2] is the client UUID + thread ID. Because the second-level field contains the thread ID, the script can tell whether the thread requesting the lock is reentering. On reentry, the value of the field UUID + thread ID in the Hash stored under the lock name key is incremented by 1.
(2) The second key point is Redis's exists, hexists, and hincrby commands
The Redis exists command determines whether the lock exists. The hexists command determines whether the lock is being reentered by the thread that already holds it. The hincrby command increments the value of the field.
Therefore, each time the thread holding the lock reenters it, the value of the field is incremented by 1, for example: hincrby key UUID:ThreadID 1.
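The three branches of the locking script can be modelled in plain Java to see the reentrancy count at work. This is a sketch under assumptions, not the script itself: the class ReentrantHashLockSketch is hypothetical, a HashMap stands in for the Redis hash stored under the lock name, and the current time is passed in explicitly so that expiry is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the locking script's exists / hexists / hincrby / pttl logic.
final class ReentrantHashLockSketch {
    // field "UUID:threadId" -> hold count; stands in for the hash under the lock name
    private final Map<String, Integer> fields = new HashMap<>();
    private long expiresAtMs = 0;

    // Returns null when the lock is acquired or reentered, otherwise the remaining TTL in ms.
    Long tryLock(String holder, long leaseMs, long nowMs) {
        if (!fields.isEmpty() && nowMs >= expiresAtMs) {
            fields.clear(); // the key has expired
        }
        if (fields.isEmpty() || fields.containsKey(holder)) { // exists == 0, or hexists == 1
            fields.merge(holder, 1, Integer::sum);            // hincrby field 1
            expiresAtMs = nowMs + leaseMs;                    // pexpire
            return null;
        }
        return expiresAtMs - nowMs;                           // pttl
    }

    // Number of times the given holder currently holds the lock.
    int holdCount(String holder) {
        return fields.getOrDefault(holder, 0);
    }
}
```

Calling tryLock("uuid:1", 30_000, 0) and then tryLock("uuid:1", 30_000, 1_000) returns null both times and leaves a hold count of 2, while a different holder calling at 5_000 ms gets back 26_000, the remaining time to live.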
6. Reentrant lock source code: mutual-exclusion blocking logic
(1) When the lock cannot be acquired, the Redis pttl command is executed to return the lock's remaining time to live
(2) The ttlRemainingFuture callback finds that ttlRemaining is not null
(3) RedissonLock's tryAcquire() method returns the remaining time to live ttl
(4) RedissonLock's lock() method checks the ttl returned by tryAcquire()
(5) RedissonLock's lock() method blocks on a Semaphore when ttl is not null
(1) When the lock cannot be acquired, the Redis pttl command is executed to return the lock's remaining time to live
In the locking lua script, "exists myLock" first checks whether the lock key exists and finds that it does. The script then checks "hexists myLock <other UUID>:<other thread ID>" and finds that the field does not exist. It therefore executes "pttl myLock" and returns the result, that is, the remaining time to live of the lock myLock.
(2) The ttlRemainingFuture callback finds that ttlRemaining is not null
When the locking lua script fails to acquire the lock and returns the lock's remaining time to live, the callback of ttlRemainingFuture finds that ttlRemaining is not null. It therefore does not create the scheduled task that checks after 10 seconds whether the requesting thread still holds the lock. Instead, the remaining time to live is wrapped in an RFuture and returned to the caller.
(3) RedissonLock's tryAcquire() method returns the remaining time to live ttl
The ttl value wrapped in the RFuture is obtained through the get() method of RedissonObject. This is the asynchronous-to-synchronous conversion: get() turns the asynchronous tryAcquireAsync() call into a synchronous one.
(4) RedissonLock's lock() method checks the ttl returned by tryAcquire()
If the current thread is locking for the first time, ttl is null. If the current thread is locking again (reentering), ttl is also null. If the current thread fails to lock because another machine or thread holds the lock, ttl is the remaining time to live of the lock's key, obtained when the locking lua script was executed.
If ttl is null, the current thread has acquired the lock and lock() returns directly. If ttl is not null, locking failed and the blocking logic is executed.
(5) RedissonLock's lock() method blocks on a Semaphore when ttl is not null
If ttl is not null, that is, locking failed, the method enters a while(true) loop. Inside the loop, RedissonLock's tryAcquire() method is called again to try to acquire the lock. If the returned ttl is null, the lock has been acquired and the loop exits. If the returned ttl is still not null, another client or thread still holds the lock.
In that case the synchronization component Semaphore is used to block and wait for up to ttl milliseconds. After the wait, the loop continues and tries to acquire the lock again, repeating until the lock is obtained.
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
protected final LockPubSub pubSub;
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
//internalLockLeaseTime is related to the WatchDog
//The connection manager ConnectionManager can be obtained through the command executor CommandExecutor
//The Redisson configuration class Config can be obtained through the ConnectionManager
//The lockWatchdogTimeout value can be obtained from Config
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
...
// Add lock
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//Thread ID, used to build the field name in the lock's Hash
long threadId = Thread.currentThread().getId();
//Try to acquire the lock; the lock() method passes the default leaseTime=-1
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
//A null ttl means the lock was acquired successfully
if (ttl == null) {
return;
}
//Processing when locking fails
CompleteFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
(future);
} else {
(future);
}
try {
while (true) {
//Try to get the lock again
ttl = tryAcquire(-1, leaseTime, unit, threadId);
//The returned ttl is null. After obtaining the lock, it exits the while loop
if (ttl == null) {
break;
}
//The returned ttl is not null, which means that other clients or threads still hold locks
//Then use the synchronous component Semaphore to block and wait for a period of ttl
if (ttl >= 0) {
try {
(future).getLatch().tryAcquire(ttl, );
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
(future).getLatch().tryAcquire(ttl, );
}
} else {
if (interruptibly) {
(future).getLatch().acquire();
} else {
(future).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe((future), threadId);
}
}
...
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//By default waitTime and leaseTime are both -1, the get method called below is the get() method from RedissonObject
//It can be understood as asynchronous to synchronous: to convert asynchronous tryAcquireAsync to synchronously by get
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//By default leaseTime=-1, so the internalLockLeaseTime set when the RedissonLock instance was initialized is used
//internalLockLeaseTime defaults to lockWatchdogTimeout, which is 30 seconds
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
//A null ttlRemaining means the lock was acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
//By default, when the caller passes leaseTime=-1, the lockWatchdogTimeout default of 30 seconds is used
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()),//KEYS[1]: the name of the lock
unit.toMillis(leaseTime),//ARGV[1]: expiration time in milliseconds, 30 seconds by default
getLockName(threadId)//ARGV[2]: UUID + thread ID
);
}
...
}
public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
private final Semaphore latch;
...
public Semaphore getLatch() {
return latch;
}
...
}
7. Release lock logic of reentering lock source code
(1) Automatically release the lock when the client goes down
(2) The process of thread actively releasing locks
(3) Analysis of lua script for actively releasing lock
(1) Automatically release the lock when the client goes down
If the machine that holds the lock goes down, the scheduled task that checks every 10 seconds whether the thread still holds the lock disappears with it. The key's expiration time is therefore no longer renewed, and the key is deleted from Redis within at most 30 seconds. After that, other clients can acquire the lock.
This assumes that no lease time was specified when the lock was acquired. If leaseTime is specified, no 10-second scheduled check is created in the first place. If the machine holding the lock then goes down, the key for the lock expires after at most leaseTime.
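The "at most 30 seconds" bound can be checked with a small piece of arithmetic. The sketch below involves no Redis at all; it assumes that the renewal task runs every lockWatchdogTimeout / 3 (which matches the 10-second and 30-second figures above) and that acquisition and every renewal reset the TTL to the full timeout. The class and method names are hypothetical.

```java
// Worked arithmetic only: no Redis involved. The renewal interval of
// timeout / 3 is an assumption taken from the 10 s / 30 s figures in the text.
final class CrashExpiryTimeline {

    /** Milliseconds the lock key survives after the holder crashes at crashAtMs. */
    static long survivesAfterCrashMs(long watchdogTimeoutMs, long crashAtMs) {
        long renewEveryMs = watchdogTimeoutMs / 3;
        // Renewals ran at renewEveryMs, 2 * renewEveryMs, ... up to the crash.
        long lastRenewalMs = (crashAtMs / renewEveryMs) * renewEveryMs;
        // Acquisition (t = 0) and each renewal set the TTL back to the full timeout.
        long expiresAtMs = lastRenewalMs + watchdogTimeoutMs;
        return expiresAtMs - crashAtMs;
    }

    public static void main(String[] args) {
        long timeout = 30_000;
        // Crash right after a renewal: the key lives the full 30 s.
        System.out.println(survivesAfterCrashMs(timeout, 10_000)); // 30000
        // Crash just before the next renewal: the key lives a little over 20 s.
        System.out.println(survivesAfterCrashMs(timeout, 19_999)); // 20001
    }
}
```

Under these assumptions the key outlives a crashed holder by between roughly 20 and 30 seconds, depending on how recently the last renewal ran.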
(2) The process of thread actively releasing locks
Releasing starts with a call to the unlock() method of RedissonLock, which calls get(unlockAsync()). That is, the unlockAsync() method of RedissonBaseLock is called first, and then the get() method of RedissonObject.
The unlockAsync() method is asynchronous, so the release operation itself is executed asynchronously. The get() method of RedissonObject then waits synchronously on the RFuture for the result of that asynchronous execution. In other words, get(unlockAsync()) converts an asynchronous call into a synchronous one.
Inside the unlockAsync() method of RedissonBaseLock, the unlockInnerAsync() method of RedissonLock is called first to release the lock asynchronously. Once the release has completed, the scheduled renewal task is cancelled in the completion callback.
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
//Create a RedissonClient instance
RedissonClient redisson = Redisson.create(config);
//Get the reentrant lock
RLock lock = redisson.getLock("myLock");
lock.lock();//Acquire the lock
lock.unlock();//Release the lock
...
}
}
public class RedissonLock extends RedissonBaseLock {
...
@Override
public void unlock() {
...
//Asynchronous to synchronous
//First the unlockAsync() method of RedissonBaseLock is called
//Then the get() method of RedissonObject waits for the result
get(unlockAsync(Thread.currentThread().getId()));
...
}
//Asynchronously execute the lua script that releases the lock
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(
getRawName(),
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()),//KEYS[1] + KEYS[2]
LockPubSub.UNLOCK_MESSAGE,//ARGV[1]
internalLockLeaseTime,//ARGV[2]
getLockName(threadId)//ARGV[3]
);
}
...
}
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
@Override
public RFuture<Void> unlockAsync(long threadId) {
//Asynchronously execute the lua script that releases the lock
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
//Cancel the scheduled task
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
...
}
abstract class RedissonExpirable extends RedissonObject implements RExpirable {
...
RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
...
}
public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec;
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = codec;
this.commandExecutor = commandExecutor;
if (name == null) {
throw new NullPointerException("name can't be null");
}
setName(name);
}
...
protected final <V> V get(RFuture<V> future) {
//This delegates to the get() method of CommandAsyncService
return commandExecutor.get(future);
}
...
}
public class CommandAsyncService implements CommandAsyncExecutor {
...
@Override
public <V> V get(RFuture<V> future) {
if (Thread.currentThread().getName().startsWith("redisson-netty")) {
throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
}
try {
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
throw new RedisException(e);
} catch (ExecutionException e) {
throw convertException(e);
}
}
...
}
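The asynchronous-to-synchronous pattern behind get(unlockAsync()) can be tried out without Redisson. The following is a minimal sketch using only java.util.concurrent.CompletableFuture; the class AsyncToSyncSketch and its three methods are hypothetical stand-ins that mimic the shape of unlockInnerAsync(), unlockAsync(), and get(), not the library's actual implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class AsyncToSyncSketch {

    // Stand-in for unlockInnerAsync(): null means "lock not held by this thread".
    static CompletableFuture<Boolean> unlockInnerAsync(Boolean scriptResult) {
        return CompletableFuture.supplyAsync(() -> scriptResult);
    }

    // Stand-in for unlockAsync(): post-process the script result in a callback.
    static CompletableFuture<Void> unlockAsync(Boolean scriptResult) {
        return unlockInnerAsync(scriptResult).handle((opStatus, e) -> {
            if (e != null) {
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                throw new CompletionException(
                        new IllegalMonitorStateException("not locked by current thread"));
            }
            return null;
        });
    }

    // Stand-in for get(): block until the future completes and unwrap failures.
    static <V> V get(CompletableFuture<V> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        }
    }

    public static void main(String[] args) {
        System.out.println(get(unlockAsync(Boolean.TRUE))); // prints "null": released without error
        try {
            get(unlockAsync(null));
        } catch (IllegalMonitorStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The caller sees an ordinary blocking call that either returns or throws, while the actual work and its post-processing run on another thread.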
(3) Analysis of lua script for actively releasing lock
First, the script checks whether the field UUID + thread ID exists in the Hash whose key is the lock name. If it does not exist, the lock is not held by this thread (for example, it has already been released), and the script returns nil immediately. If it exists, the value of the field UUID + thread ID is decremented by 1 by calling the Redis hincrby command with an increment of -1.
The result of the decrement is then checked. If it is greater than 0, the thread still holds the lock. This is the case where the holding thread has re-entered the lock several times, so the lock's expiration time is reset to internalLockLeaseTime (30 seconds by default). If the result is not greater than 0 (that is, it has reached 0), the thread no longer holds the lock, so the key for the lock is deleted and an unlock message is published with the Redis publish command.
public class RedissonLock extends RedissonBaseLock {
...
//Asynchronously execute the lua script that releases the lock
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(
getRawName(),
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()),//KEYS[1] + KEYS[2]
LockPubSub.UNLOCK_MESSAGE,//ARGV[1]
internalLockLeaseTime,//ARGV[2]
getLockName(threadId)//ARGV[3]
);
}
...
}
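The three outcomes of the release script can be traced with an in-memory model. This is only a sketch: a nested HashMap stands in for the Redis Hash, a counter stands in for the publish command, TTL handling is omitted, and the class UnlockScriptModel with its methods is hypothetical. The Boolean return value mirrors the nil / 0 / 1 results of the script as described above.

```java
import java.util.HashMap;
import java.util.Map;

final class UnlockScriptModel {
    // lock name -> (holder id -> reentrancy count); stands in for the Redis Hash
    private final Map<String, Map<String, Long>> redis = new HashMap<>();
    int published = 0; // number of unlock messages "published"

    /** Test helper: pretend the holder already acquired the lock count times. */
    void hold(String lockName, String holder, long count) {
        redis.computeIfAbsent(lockName, k -> new HashMap<>()).put(holder, count);
    }

    /** Returns null (not held), false (still held reentrantly) or true (released). */
    Boolean unlock(String lockName, String holder) {
        Map<String, Long> hash = redis.get(lockName);
        if (hash == null || !hash.containsKey(holder)) {
            return null;                                    // hexists == 0
        }
        long counter = hash.merge(holder, -1L, Long::sum);  // hincrby ... -1
        if (counter > 0) {
            return false;                                   // pexpire would refresh the TTL here
        }
        redis.remove(lockName);                             // del
        published++;                                        // publish the unlock message
        return true;
    }

    public static void main(String[] args) {
        UnlockScriptModel m = new UnlockScriptModel();
        m.hold("myLock", "uuid:1", 2);
        System.out.println(m.unlock("myLock", "uuid:1")); // false: one reentrant hold remains
        System.out.println(m.unlock("myLock", "uuid:1")); // true: key deleted, message published
        System.out.println(m.unlock("myLock", "uuid:1")); // null: not held any more
    }
}
```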
8. The reenter lock source code to obtain lock timeout and lock timeout automatically release logic
(1) Try to acquire the lock timeout
(2) Lock timeout automatically releases
In the following code, if the lock cannot be acquired within 60 seconds, the attempt is abandoned and tryLock() returns false, so the caller is never blocked indefinitely. If the lock is acquired but not released actively within 10 seconds, it is released automatically.
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
//Create a RedissonClient instance
RedissonClient redisson = Redisson.create(config);
//Get the reentrant lock
RLock lock = redisson.getLock("myLock");
boolean res = lock.tryLock(60, 10, TimeUnit.SECONDS);//Try to acquire the lock
if (res) lock.unlock();//If the lock was acquired, release it
...
}
}
(1) Try to acquire the lock timeout
The tryLock() method of RedissonLock takes a waitTime and a leaseTime. waitTime is the maximum time to wait for the lock, for example 60 seconds. leaseTime is the time after which the lock expires automatically once acquired, for example 10 seconds.
If the first attempt to acquire the lock fails and has already used up the whole waitTime, false is returned.
If the first attempt fails but waitTime has not been used up, the thread subscribes to the unlock channel and enters the while loop to try again.
If a retry succeeds, true is returned. If it fails, the remaining waiting time is recalculated: the variable time is reduced by the time spent on each attempt to acquire the lock and by the time spent on each wait.
If time has dropped to 0 or below, the lock could not be acquired within waitTime and false is returned. If time is still greater than 0, the next iteration of the while loop runs: try to acquire the lock again, then reduce time again.
public class RedissonLock extends RedissonBaseLock {
...
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);//Remaining time, in milliseconds, that may still be spent waiting for the lock
long current = System.currentTimeMillis();//Timestamp taken before the first attempt to acquire the lock
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
//The first attempt acquired the lock
if (ttl == null) {
return true;
}
//The first attempt failed to acquire the lock
//Subtract the time spent on that attempt (current time minus the timestamp taken before it) from time
time -= System.currentTimeMillis() - current;
//If the first attempt already used up the maximum waiting time, report failure immediately
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//Otherwise there is waiting time left: subscribe to the unlock channel, waiting at most the remaining time
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
//A retry acquired the lock
if (ttl == null) {
return true;
}
//Subtract the time spent on this attempt from the remaining waiting time
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//A non-null ttl means another client or thread still holds the lock
//Block on the Semaphore for ttl or the remaining waiting time, whichever is shorter
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
//Subtract the time spent waiting from time
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}
...
}
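The time bookkeeping in tryLock() can be isolated from Redis. The sketch below keeps only the waiting budget: tryOnce is a hypothetical stand-in for the lock script (true means acquired), ttlMs stands in for the TTL the script would report, and the latch is a plain Semaphore that nobody releases. As a simplification, an interrupt is turned into a false return here, whereas the listing above propagates InterruptedException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class WaitBudgetSketch {

    /** Retries tryOnce until it succeeds or waitMs has been used up. */
    static boolean tryLock(BooleanSupplier tryOnce, Semaphore latch, long waitMs, long ttlMs) {
        long time = waitMs; // remaining waiting budget in milliseconds
        try {
            while (true) {
                long start = System.currentTimeMillis();
                if (tryOnce.getAsBoolean()) {
                    return true;
                }
                time -= System.currentTimeMillis() - start; // cost of the attempt
                if (time <= 0) {
                    return false;
                }
                start = System.currentTimeMillis();
                // Wait for a wake-up, the TTL, or the budget, whichever comes first.
                latch.tryAcquire(Math.min(ttlMs, time), TimeUnit.MILLISECONDS);
                time -= System.currentTimeMillis() - start; // cost of the wait
                if (time <= 0) {
                    return false;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // simplification: report failure
            return false;
        }
    }

    public static void main(String[] args) {
        Semaphore latch = new Semaphore(0);
        int[] calls = {0};
        // Succeeds on the third attempt, well inside the 1 s budget.
        System.out.println(tryLock(() -> ++calls[0] >= 3, latch, 1_000, 20));
        // Never succeeds: gives up once the 100 ms budget is spent.
        System.out.println(tryLock(() -> false, latch, 100, 20));
    }
}
```

Because every attempt and every wait is charged against the same budget, the method cannot block for much longer than waitMs regardless of how many iterations the loop needs.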
(2) Lock timeout automatically releases
When tryLock() is called with a leaseTime other than -1, that is, with an explicit lock lease time, the lock is released automatically when it times out.
When leaseTime is not -1:
1. The lock's expiration time is leaseTime rather than lockWatchdogTimeout=30 seconds.
2. After the locking lua script succeeds, no scheduled task is created to check the lock after 10 seconds.
Summary: When leaseTime is specified, the lock key exists for at most leaseTime, because no scheduled task is checking whether the thread still holds the lock and renewing the expiration time. The lock is therefore either released actively before then or expires automatically once leaseTime has elapsed.
public class RedissonLock extends RedissonBaseLock {
...
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//For lock(), waitTime and leaseTime both default to -1; the get() method called below is inherited from RedissonObject
//This is an asynchronous-to-synchronous conversion: get() blocks until the asynchronous tryAcquireAsync completes
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//By default leaseTime=-1, so the internalLockLeaseTime set when the RedissonLock instance was initialized is used
//internalLockLeaseTime defaults to lockWatchdogTimeout, which is 30 seconds
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
//A null ttlRemaining means the lock was acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
...
}
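The two branches of tryAcquireAsync() boil down to one decision on leaseTime. The sketch below restates that decision as two pure functions; the class LeaseDecisionSketch and its method names are hypothetical, and the 30-second constant is the default lockWatchdogTimeout quoted in the text.

```java
import java.util.concurrent.TimeUnit;

final class LeaseDecisionSketch {
    static final long WATCHDOG_TIMEOUT_MS = 30_000; // default lockWatchdogTimeout per the text

    /** TTL in milliseconds that the lock script would receive as ARGV[1]. */
    static long effectiveTtlMs(long leaseTime, TimeUnit unit) {
        return leaseTime != -1 ? unit.toMillis(leaseTime) : WATCHDOG_TIMEOUT_MS;
    }

    /** The renewal task is only scheduled when no lease time was given. */
    static boolean watchdogScheduled(long leaseTime) {
        return leaseTime == -1;
    }

    public static void main(String[] args) {
        // tryLock(60, 10, SECONDS): fixed 10 s lease, no watchdog
        System.out.println(effectiveTtlMs(10, TimeUnit.SECONDS) + " " + watchdogScheduled(10)); // 10000 false
        // lock(): leaseTime = -1 and unit = null, so the watchdog default applies
        System.out.println(effectiveTtlMs(-1, null) + " " + watchdogScheduled(-1));             // 30000 true
    }
}
```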
9. Summary of the source code of reentrant lock
(1) Add lock
(2) WatchDog maintains locking
(3) Reentrant lock
(4) Lock mutually exclusive
(5) Manually release the lock
(6) Automatically release lock when the shutdown
(7) Try to lock timeout
(8) The timeout lock is automatically released
(1) Add lock
A Hash is stored in Redis with a default expiration time of 30 seconds. The Redis key is the lock name, and the field inside the Hash is UUID + thread ID, whose value is the reentrancy count. The lock script uses the Redis exists, hexists, hincrby, pexpire, and pttl commands.
(2) WatchDog maintains locking
As long as the thread that acquired the lock keeps holding it, the key in Redis stays alive. When the lock is acquired, a scheduled task is created that checks after 10 seconds whether the thread still holds the lock. If it does, the key's expiration time is reset to 30 seconds and a new scheduled task is created to check again 10 seconds later.
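The self-rescheduling shape of this check can be sketched with a ScheduledExecutorService. Everything here is an assumption made for illustration: the class WatchdogSketch is hypothetical, the renewal interval is taken as timeout / 3 to match the 10-second and 30-second figures, and a counter stands in for resetting the key's expiration time in Redis.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class WatchdogSketch {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "watchdog-sketch");
                t.setDaemon(true); // do not keep the JVM alive for the demo
                return t;
            });
    private final AtomicBoolean held = new AtomicBoolean(true);
    final AtomicInteger renewals = new AtomicInteger();
    private final long timeoutMs;

    WatchdogSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Schedules one check; the check re-schedules itself while the lock is held. */
    void scheduleRenewal() {
        timer.schedule(() -> {
            if (!held.get()) {
                return;                 // lock released: the chain ends here
            }
            renewals.incrementAndGet(); // the real task would reset the key's TTL here
            scheduleRenewal();
        }, timeoutMs / 3, TimeUnit.MILLISECONDS);
    }

    /** Stands in for unlock(): the next check sees the flag and stops. */
    void release() {
        held.set(false);
    }

    /** Holds the "lock" for holdMs and reports how many renewals ran. */
    static int runFor(long timeoutMs, long holdMs) {
        WatchdogSketch w = new WatchdogSketch(timeoutMs);
        w.scheduleRenewal();
        try {
            Thread.sleep(holdMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        w.release();
        return w.renewals.get();
    }

    public static void main(String[] args) {
        // Scaled down: 30 ms timeout, so a renewal roughly every 10 ms for 200 ms.
        System.out.println(runFor(30, 200) + " renewals");
    }
}
```

Each check schedules only the next one, so if the process dies the chain simply stops and the key is left to expire on its own.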
(3) Reentrant lock
The same thread can acquire the lock multiple times. The Hash field is UUID + thread ID, and its value is incremented by 1 each time the lock is acquired.
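Acquisition and reentrancy can be traced with an in-memory model of the lock script. This is a sketch under simplifying assumptions: nested HashMaps stand in for the Redis Hash, the stored TTL never counts down, and the class LockScriptModel and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class LockScriptModel {
    // lock name -> (holder id -> reentrancy count); stands in for the Redis Hash
    private final Map<String, Map<String, Long>> redis = new HashMap<>();
    // lock name -> TTL in milliseconds; static in this model, it never counts down
    private final Map<String, Long> ttlMs = new HashMap<>();

    /** Returns null when acquired (first time or reentrant), else the lock's TTL. */
    Long tryLock(String lockName, String holder, long leaseMs) {
        Map<String, Long> hash = redis.get(lockName);
        if (hash == null) {                      // exists == 0: nobody holds the lock
            hash = new HashMap<>();
            redis.put(lockName, hash);
        } else if (!hash.containsKey(holder)) {  // hexists == 0: someone else holds it
            return ttlMs.get(lockName);          // pttl
        }
        hash.merge(holder, 1L, Long::sum);       // hincrby ... 1
        ttlMs.put(lockName, leaseMs);            // pexpire
        return null;
    }

    long holdCount(String lockName, String holder) {
        Map<String, Long> hash = redis.get(lockName);
        return hash == null ? 0L : hash.getOrDefault(holder, 0L);
    }

    public static void main(String[] args) {
        LockScriptModel m = new LockScriptModel();
        System.out.println(m.tryLock("myLock", "uuid-a:1", 30_000)); // null: acquired
        System.out.println(m.tryLock("myLock", "uuid-a:1", 30_000)); // null: re-entered
        System.out.println(m.holdCount("myLock", "uuid-a:1"));       // 2
        System.out.println(m.tryLock("myLock", "uuid-b:7", 30_000)); // 30000: held by another thread
    }
}
```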
(4) Lock mutually exclusive
When a different thread tries to acquire the lock, it fails because its Hash field (UUID + thread ID) differs from the holder's. In that case the lock lua script returns ttl, the remaining expiration time of the lock. The thread then blocks on a Semaphore for at most ttl, and afterwards tries to acquire the lock again inside the while loop. This wait-and-retry cycle repeats until the lock is acquired.
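The bounded wait on the Semaphore can be demonstrated on its own. The sketch below assumes that the arrival of the published unlock message is what releases a permit on the latch; here a helper thread plays that role. The class LatchWaitSketch and its method are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class LatchWaitSketch {

    /** Returns true if the waiter was woken by a release before ttlMs elapsed. */
    static boolean waitForUnlock(long ttlMs, long unlockAfterMs) {
        Semaphore latch = new Semaphore(0); // no permits: waiters block
        Thread holder = new Thread(() -> {
            try {
                Thread.sleep(unlockAfterMs);
            } catch (InterruptedException e) {
                return;
            }
            latch.release(); // stands in for the unlock message arriving
        });
        holder.setDaemon(true);
        holder.start();
        try {
            // Blocks for at most ttlMs, but returns early if a permit arrives.
            return latch.tryAcquire(ttlMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForUnlock(5_000, 20)); // woken early by the release
        System.out.println(waitForUnlock(50, 5_000)); // TTL elapses first
    }
}
```

Either way the waiter goes back to the top of the loop and retries the lock script; the latch only decides how long it sleeps before doing so.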
(5) Manually release the lock
The hincrby command decrements the value of the Hash field UUID + thread ID by 1. If the result is greater than 0, the lock is still held reentrantly and its expiration time is reset. If the result is 0 or less, the lock has been released: the lock key is deleted and the scheduled renewal task is cancelled.
(6) Automatically release lock when the shutdown
If the client holding the lock goes down, the WatchDog scheduled task for that lock disappears with it. The expiration time of the lock key is then no longer reset, so the key expires and the lock is released automatically.
(7) Try to lock timeout
The lock is tried repeatedly in a while loop. The variable time holds how much longer the thread may keep waiting for the lock. In each iteration, time is reduced by the time taken by the acquisition attempt and by the time spent waiting. If the lock has not been acquired within the specified time (that is, time has dropped to 0 or below), the loop exits and the acquisition fails.
(8) The timeout lock is automatically released
When leaseTime is specified and the thread that acquired the lock does not release it manually within leaseTime, the lock key in Redis expires and the lock is released automatically.