Outline
1. Prefer volatile for flag-modification scenarios (graceful service shutdown)
2. Prefer Atomic classes for numeric-increment scenarios (heartbeat counter)
3. Prefer ThreadLocal when shared-variable operations are visible only to the current thread (edits log processing)
4. Prefer read-write locks for read-heavy scenarios that require locking (concurrent read and write of the service registry)
5. Minimize the time threads hold locks (edits log segmented locking)
6. Minimize the granularity of data locked by threads (segmented inventory deduction)
7. Separate locks by function where possible (avoid locking inside loops)
8. Minimize lock contention among highly concurrent threads (multi-level caching)
9. Summary of deadlock, a form of lock failure
10. Lock-free programming
1. Prefer volatile for flag-modification scenarios (graceful service shutdown)
(1) Prefer volatile for visibility-only scenarios such as flag modification
(2) Case of graceful service shutdown via a volatile flag
(1) Prefer volatile for visibility-only scenarios such as flag modification
Distributed systems and middleware both need to spawn a large number of threads for processing. When multiple threads access a shared variable, first clarify whether the access involves concurrent writes or only concurrent reads.
If only one thread ever writes a shared variable, such as a flag bit, while multiple threads read its value, then volatile is preferred.
The first principle of lock optimization is therefore: avoid locking if at all possible, because locking can lead to lock contention and lock conflicts, significantly reducing system throughput and performance.
(2) Case of graceful service shutdown via a volatile flag
For example, the shutdown() method below uses volatile to complete the lock optimization. During graceful service shutdown, an isRunning flag is shared by the heartbeat thread and the thread that pulls the registry in batches. The isRunning flag is modified only when some thread executes the shutdown() method; in every other case the heartbeat thread and the registry-pulling thread merely read the flag, so volatile suffices.
//When the service starts, it is responsible for communicating with register-server
public class RegisterClient {
    public static final String SERVICE_NAME = "inventory-service";
    public static final String IP = "192.168.31.207";
    public static final String HOSTNAME = "inventory01";
    public static final int PORT = 9000;
    private static final Long HEARTBEAT_INTERVAL = 30 * 1000L;
    //Service instance ID
    private String serviceInstanceId;
    //HTTP communication component
    private HttpSender httpSender;
    //Heartbeat thread
    private HeartbeatWorker heartbeatWorker;
    //Whether the service instance is running
    private volatile Boolean isRunning;
    //The registry cached on the client
    private CachedServiceRegistry registry;

    public RegisterClient() {
        this.serviceInstanceId = UUID.randomUUID().toString().replace("-", "");
        this.httpSender = new HttpSender();
        this.heartbeatWorker = new HeartbeatWorker();
        this.isRunning = true;
        this.registry = new CachedServiceRegistry(this, httpSender);
    }

    //Start the RegisterClient component
    public void start() {
        try {
            //Open a thread to complete the registration;
            //once registration completes, it enters a while loop and sends a heartbeat request every 30 seconds
            RegisterWorker registerWorker = new RegisterWorker();
            registerWorker.start();
            registerWorker.join();
            //Start the heartbeat thread to send heartbeats periodically
            heartbeatWorker.start();
            //Initialize the client-side cached service registry component
            this.registry.initialize();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //Stop the RegisterClient component
    public void shutdown() {
        this.isRunning = false;
        heartbeatWorker.interrupt();//Stop the heartbeat thread via interrupt
        registry.destroy();//Stop the registry-pulling thread via interrupt
        httpSender.cancel(SERVICE_NAME, serviceInstanceId);//Notify the server that this instance is going offline
    }

    //Service registration thread
    private class RegisterWorker extends Thread {
        @Override
        public void run() {
            RegisterRequest registerRequest = new RegisterRequest();
            registerRequest.setServiceName(SERVICE_NAME);
            registerRequest.setIp(IP);
            registerRequest.setHostname(HOSTNAME);
            registerRequest.setPort(PORT);
            registerRequest.setServiceInstanceId(serviceInstanceId);
            RegisterResponse registerResponse = httpSender.register(registerRequest);
            System.out.println("The result of service registration is: " + registerResponse.getStatus() + "...");
        }
    }

    //Heartbeat thread
    private class HeartbeatWorker extends Thread {
        @Override
        public void run() {
            //After registration succeeds, enter the while loop
            HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
            heartbeatRequest.setServiceName(SERVICE_NAME);
            heartbeatRequest.setServiceInstanceId(serviceInstanceId);
            HeartbeatResponse heartbeatResponse = null;
            while (isRunning) {
                try {
                    heartbeatResponse = httpSender.heartbeat(heartbeatRequest);
                    System.out.println("The result of heartbeat is: " + heartbeatResponse.getStatus() + "...");
                    Thread.sleep(HEARTBEAT_INTERVAL);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //Return whether the RegisterClient is running; called by the background thread that pulls the registry in batches
    public Boolean isRunning() {
        return isRunning;
    }
}
2. Prefer Atomic classes for numeric-increment scenarios (heartbeat counter)
(1) Prefer Atomic classes for numeric-increment scenarios
(2) Case of using an Atomic class for the service heartbeat counter
(1) Prefer Atomic classes for numeric-increment scenarios
When multiple threads concurrently access a shared variable, first determine whether only visibility needs to be guaranteed; if so, optimize with volatile. Next determine whether atomicity must be guaranteed; if so, determine whether the operation is a simple numeric increment or a more complex mutation. For a simple numeric increment, an Atomic class is recommended; otherwise use a heavyweight lock such as synchronized or ReentrantLock. Atomic classes implement lock-free programming based on the CAS mechanism, and their concurrency is better than synchronized.
(2) Case of using an Atomic class for the service heartbeat counter
The service protection mechanism contains a heartbeat measurement counter:
//Heartbeat measurement counter
public class HeartbeatCounter {
    //Singleton instance
    private static HeartbeatCounter instance = new HeartbeatCounter();
    //The number of heartbeats in the last minute
    private AtomicLong latestMinuteHeartbeatRate = new AtomicLong(0L);
    //The timestamp of the start of the last minute
    private long latestMinuteTimestamp = System.currentTimeMillis();

    private HeartbeatCounter() {
        Daemon daemon = new Daemon();
        daemon.setDaemon(true);
        daemon.start();
    }

    //Get the singleton instance
    public static HeartbeatCounter getInstance() {
        return instance;
    }

    //Increment the number of heartbeats in the last minute
    //Once locked, the many threads updating heartbeats concurrently would be serialized by the lock,
    //which in turn reduces multi-threaded concurrency performance
    public /**synchronized*/ void increment() {
        //If there are many service instances, say 10,000, there may be many heartbeat-update requests per second
        //Adding synchronized here would hurt concurrency
        //After replacing it with the AtomicLong atomic class, no lock is used: CAS guarantees atomicity
        //while still allowing multiple threads to proceed concurrently
        latestMinuteHeartbeatRate.incrementAndGet();
    }

    //Get the number of heartbeats in the last minute
    public /**synchronized*/ long get() {
        return latestMinuteHeartbeatRate.get();
    }

    private class Daemon extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    long currentTime = System.currentTimeMillis();
                    if (currentTime - latestMinuteTimestamp > 60 * 1000) {
                        //Reset the counter for the new minute via a CAS retry loop
                        while (true) {
                            Long expectedValue = latestMinuteHeartbeatRate.get();
                            if (latestMinuteHeartbeatRate.compareAndSet(expectedValue, 0L)) {
                                break;
                            }
                        }
                        latestMinuteTimestamp = System.currentTimeMillis();
                    }
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
3. Prefer ThreadLocal when shared-variable operations are visible only to the current thread (edits log processing)
(1) Prefer ThreadLocal when operations on a shared variable need only be visible to the current thread
(2) Use of ThreadLocal when a distributed storage system processes edits logs
(1) Prefer ThreadLocal when operations on a shared variable need only be visible to the current thread
In a multi-threaded environment, when multiple threads access a shared variable at the same time, ThreadLocal can be used if the operations on the shared variable only need to be visible to the current thread.
ThreadLocal provides each thread with independent storage space that holds a copy of the shared variable. Each thread operates only on its own copy, and those operations are not visible to other threads.
volatile, Atomic, and ThreadLocal are the three major tools of lock optimization and should be given priority. Only when none of them applies should heavyweight locks such as synchronized and ReentrantLock be considered.
(2) Use of ThreadLocal when a distributed storage system processes edits logs
//The core component responsible for managing edits logs
public class FSEditlog {
    ...
    //Each thread's own local txid copy
    private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();

    //Record an edits log
    public void logEdit(String content) {
        //The lock must be acquired here directly; while a thread is executing logSync(), no other thread can enter
        synchronized(this) {
            //Get the globally unique incremental txid, representing the sequence number of the edits log
            txidSeq++;
            long txid = txidSeq;
            localTxid.set(txid);
            //Construct an edits log object
            EditLog log = new EditLog(txid, content);
            //Write the edits log into the memory buffer, not directly flushed to the disk file
            editLogBuffer.write(log);
        }
        logSync();
    }
    ...
}
4. Prefer read-write locks for read-heavy scenarios that require locking (concurrent read and write of the service registry)
(1) Prefer read-write locks for read-heavy, write-light scenarios that require locking
(2) Case of using a read-write lock for concurrent reads and writes of the service registry
(1) Prefer read-write locks for read-heavy, write-light scenarios that require locking
If volatile, Atomic, and ThreadLocal are all inapplicable, then multi-threaded concurrent access to shared data must be locked. At that point a read-write lock is preferred over a heavyweight synchronized lock.
In read-heavy, write-light scenarios, reads and writes can be separated:
Read lock -> a large number of threads can read concurrently.
Write lock -> while one thread is writing data, no other thread can write or read at the same time.
(2) Case of using a read-write lock for concurrent reads and writes of the service registry
The service registration center operates on the service registry in the following two ways:
1. The delta registry is pulled every 30 seconds
2. Heartbeat requests are processed every 30 seconds
Pulling the delta registry is a read operation, while updating the registry on an abnormal heartbeat is a write operation. Registry data is clearly not updated most of the time, so this is a read-heavy, write-light scenario.
If reading and updating the registry both used a heavyweight synchronized lock, the numerous registry operations would be serialized, reducing the concurrent performance of the service registry.
Therefore, reads and writes of the service registry are separated with a read-write lock: a read lock is used when reading the registry and a write lock when updating it, so that all read operations can execute concurrently.
//Service registry
public class ServiceRegistry {
    public static final Long RECENTLY_CHANGED_ITEM_CHECK_INTERVAL = 3000L;
    public static final Long RECENTLY_CHANGED_ITEM_EXPIRED = 3 * 60 * 1000L;
    //The registry is a singleton
    private static ServiceRegistry instance = new ServiceRegistry();
    //Core in-memory data structure: the registry. Map: key is the service name, value is all service instances of that service
    private Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>();
    //Queue of recently changed service instances
    private LinkedList<RecentlyChangedServiceInstance> recentlyChangedQueue = new LinkedList<RecentlyChangedServiceInstance>();
    //The lock of the service registry
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    private ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

    //Constructor
    private ServiceRegistry() {
        //Start the background thread that monitors the recently changed queue
        RecentlyChangedQueueMonitor recentlyChangedQueueMonitor = new RecentlyChangedQueueMonitor();
        recentlyChangedQueueMonitor.setDaemon(true);
        recentlyChangedQueueMonitor.start();
    }

    //Get the singleton instance of the service registry
    public static ServiceRegistry getInstance() {
        return instance;
    }

    //Acquire the read lock
    public void readLock() {
        readLock.lock();
    }
    //Release the read lock
    public void readUnlock() {
        readLock.unlock();
    }
    //Acquire the write lock
    public void writeLock() {
        writeLock.lock();
    }
    //Release the write lock
    public void writeUnlock() {
        writeLock.unlock();
    }

    //Service registration
    public void register(ServiceInstance serviceInstance) {
        try {
            //Acquire the write lock
            writeLock();
            System.out.println("Service registration......【" + serviceInstance + "】");
            //Put the service instance into the recently changed queue
            RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REGISTER);
            recentlyChangedQueue.offer(recentlyChangedItem);
            System.out.println("Recently changed queue:" + recentlyChangedQueue);
            //Put the service instance into the registry
            Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceInstance.getServiceName());
            if (serviceInstanceMap == null) {
                serviceInstanceMap = new HashMap<String, ServiceInstance>();
                registry.put(serviceInstance.getServiceName(), serviceInstanceMap);
            }
            serviceInstanceMap.put(serviceInstance.getServiceInstanceId(), serviceInstance);
            System.out.println("Registry:" + registry);
        } finally {
            writeUnlock();
        }
    }

    //Remove a service instance from the registry
    public void remove(String serviceName, String serviceInstanceId) {
        try {
            //Acquire the write lock
            writeLock();
            System.out.println("Service removal【" + serviceName + ", " + serviceInstanceId + "】");
            //Get the service instance
            Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);
            ServiceInstance serviceInstance = serviceInstanceMap.get(serviceInstanceId);
            //Put the service instance change information into the queue
            RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REMOVE);
            recentlyChangedQueue.offer(recentlyChangedItem);
            System.out.println("Recently changed queue:" + recentlyChangedQueue);
            //Remove the service instance from the service registry
            serviceInstanceMap.remove(serviceInstanceId);
            System.out.println("Registry:" + registry);
        } finally {
            writeUnlock();
        }
    }

    //Get a service instance
    public ServiceInstance getServiceInstance(String serviceName, String serviceInstanceId) {
        try {
            readLock();
            Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);
            return serviceInstanceMap.get(serviceInstanceId);
        } finally {
            readUnlock();
        }
    }

    //Get the full registry
    public Map<String, Map<String, ServiceInstance>> getRegistry() {
        return registry;
    }

    //Get the recently changed (delta) registry
    public DeltaRegistry getDeltaRegistry() {
        Long totalCount = 0L;
        for (Map<String, ServiceInstance> serviceInstanceMap : registry.values()) {
            totalCount += serviceInstanceMap.size();
        }
        DeltaRegistry deltaRegistry = new DeltaRegistry(recentlyChangedQueue, totalCount);
        return deltaRegistry;
    }

    //A recently changed service instance
    class RecentlyChangedServiceInstance {
        //The service instance
        ServiceInstance serviceInstance;
        //Timestamp of the change
        Long changedTimestamp;
        //The change operation
        String serviceInstanceOperation;
        public RecentlyChangedServiceInstance(ServiceInstance serviceInstance, Long changedTimestamp, String serviceInstanceOperation) {
            this.serviceInstance = serviceInstance;
            this.changedTimestamp = changedTimestamp;
            this.serviceInstanceOperation = serviceInstanceOperation;
        }
        @Override
        public String toString() {
            return "RecentlyChangedServiceInstance [serviceInstance=" + serviceInstance + ", changedTimestamp=" + changedTimestamp + ", serviceInstanceOperation=" + serviceInstanceOperation + "]";
        }
    }

    //Service instance operations
    class ServiceInstanceOperation {
        public static final String REGISTER = "register";//register
        public static final String REMOVE = "REMOVE";//remove
    }

    //Monitor thread for the recently changed queue
    class RecentlyChangedQueueMonitor extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    try {
                        writeLock();
                        RecentlyChangedServiceInstance recentlyChangedItem = null;
                        Long currentTimestamp = System.currentTimeMillis();
                        while ((recentlyChangedItem = recentlyChangedQueue.peek()) != null) {
                            //If a service instance change item has been in the queue for more than 3 minutes, remove it from the queue
                            if (currentTimestamp - recentlyChangedItem.changedTimestamp > RECENTLY_CHANGED_ITEM_EXPIRED) {
                                recentlyChangedQueue.poll();
                            } else {
                                //The head is the oldest item; if it has not expired, neither has anything behind it
                                break;
                            }
                        }
                    } finally {
                        writeUnlock();
                    }
                    Thread.sleep(RECENTLY_CHANGED_ITEM_CHECK_INTERVAL);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
//Microservice aliveness monitoring component
public class ServiceAliveMonitor {
    //Interval for checking whether service instances are alive
    private static final Long CHECK_ALIVE_INTERVAL = 60 * 1000L;
    //Background thread responsible for monitoring the aliveness of microservices
    private Daemon daemon;

    public ServiceAliveMonitor() {
        this.daemon = new Daemon();
        daemon.setDaemon(true);
        daemon.setName("ServiceAliveMonitor");
    }

    //Start the background thread
    public void start() {
        daemon.start();
    }

    //Background thread responsible for monitoring the aliveness of microservices
    private class Daemon extends Thread {
        private ServiceRegistry registry = ServiceRegistry.getInstance();

        @Override
        public void run() {
            Map<String, Map<String, ServiceInstance>> registryMap = null;
            while (true) {
                try {
                    //First determine whether the self-protection mechanism is enabled
                    SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
                    if (selfProtectionPolicy.isEnabled()) {
                        Thread.sleep(CHECK_ALIVE_INTERVAL);
                        continue;
                    }
                    //The collection of service instances to remove
                    List<ServiceInstance> removingServiceInstances = new ArrayList<ServiceInstance>();
                    //Start reading the service registry; during this process other threads can read but not write
                    try {
                        //Acquire the read lock on the whole service registry
                        registry.readLock();
                        registryMap = registry.getRegistry();
                        for (String serviceName : registryMap.keySet()) {
                            Map<String, ServiceInstance> serviceInstanceMap = registryMap.get(serviceName);
                            for (ServiceInstance serviceInstance : serviceInstanceMap.values()) {
                                //If a service instance has not sent a heartbeat for more than 90 seconds,
                                //consider it dead and remove it from the registry
                                if (!serviceInstance.isAlive()) {
                                    removingServiceInstances.add(serviceInstance);
                                }
                            }
                        }
                    } finally {
                        registry.readUnlock();
                    }
                    //Remove all the service instances marked for removal from the service registry
                    for (ServiceInstance serviceInstance : removingServiceInstances) {
                        registry.remove(serviceInstance.getServiceName(), serviceInstance.getServiceInstanceId());
                        //Update the thresholds of the self-protection mechanism
                        synchronized (SelfProtectionPolicy.class) {
                            selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);
                            selfProtectionPolicy.setExpectedHeartbeatThreshold((long) (selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
                        }
                    }
                    Thread.sleep(CHECK_ALIVE_INTERVAL);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
//The controller responsible for handling requests sent by register-client
public class RegisterServerController {
    private ServiceRegistry registry = ServiceRegistry.getInstance();

    //Service registration
    public RegisterResponse register(RegisterRequest registerRequest) {
        RegisterResponse registerResponse = new RegisterResponse();
        try {
            //Add this service instance to the registry
            ServiceInstance serviceInstance = new ServiceInstance();
            serviceInstance.setHostname(registerRequest.getHostname());
            serviceInstance.setIp(registerRequest.getIp());
            serviceInstance.setPort(registerRequest.getPort());
            serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId());
            serviceInstance.setServiceName(registerRequest.getServiceName());
            registry.register(serviceInstance);
            //Update the thresholds of the self-protection mechanism
            synchronized (SelfProtectionPolicy.class) {
                SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
                selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);
                selfProtectionPolicy.setExpectedHeartbeatThreshold((long) (selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
            }
            registerResponse.setStatus(RegisterResponse.SUCCESS);
        } catch (Exception e) {
            e.printStackTrace();
            registerResponse.setStatus(RegisterResponse.FAILURE);
        }
        return registerResponse;
    }

    //Service offline
    public void cancel(String serviceName, String serviceInstanceId) {
        //Remove the instance from the service registry
        registry.remove(serviceName, serviceInstanceId);
        //Update the thresholds of the self-protection mechanism
        synchronized (SelfProtectionPolicy.class) {
            SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
            selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);
            selfProtectionPolicy.setExpectedHeartbeatThreshold((long) (selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
        }
    }

    //Process a heartbeat
    public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) {
        HeartbeatResponse heartbeatResponse = new HeartbeatResponse();
        try {
            ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());
            if (serviceInstance != null) {
                serviceInstance.renew();
            }
            //Record the number of heartbeats per minute
            HeartbeatCounter heartbeatMeasuredRate = HeartbeatCounter.getInstance();
            heartbeatMeasuredRate.increment();
            heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS);
        } catch (Exception e) {
            e.printStackTrace();
            heartbeatResponse.setStatus(HeartbeatResponse.FAILURE);
        }
        return heartbeatResponse;
    }

    //Pull the full registry
    public Applications fetchFullRegistry() {
        try {
            registry.readLock();
            return new Applications(registry.getRegistry());
        } finally {
            registry.readUnlock();
        }
    }

    //Pull the delta registry
    public DeltaRegistry fetchDeltaRegistry() {
        try {
            registry.readLock();
            return registry.getDeltaRegistry();
        } finally {
            registry.readUnlock();
        }
    }
}
5. Minimize the time threads hold locks (edits log segmented locking)
(1) Minimize the time threads spend holding locks
(2) Case of the edits log reducing lock-holding time through segmented locking
(1) Minimize the time threads spend holding locks
Whether using a synchronized lock or a read-write lock, the time spent holding the lock must be kept short. Time-consuming work such as disk file I/O or network I/O must not be performed while holding a lock. Generally, locks are held while operating on in-memory data, not while operating on database data; otherwise a lock may be held for a long time, significantly reducing the performance and throughput of concurrent threads.
(2) Case of the edits log reducing lock-holding time through segmented locking
The edits log segmented locking mechanism in distributed storage systems exists precisely to minimize lock-holding time: the locked sections contain only simple flag checks and variable updates that execute extremely quickly.
//The core component responsible for managing edits logs
public class FSEditlog {
    //The current incremental txid sequence number
    private long txidSeq = 0L;
    //In-memory double buffer
    private DoubleBuffer editLogBuffer = new DoubleBuffer();
    //Whether the memory buffer is currently being flushed to disk
    private volatile Boolean isSyncRunning = false;
    //Whether some thread is currently waiting to flush the next batch of edits logs to disk
    private volatile Boolean isWaitSync = false;
    //The largest txid being synced to disk
    private volatile Long syncMaxTxid = 0L;
    //Each thread's own local txid copy
    private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();

    //Record an edits log
    public void logEdit(String content) {
        //Acquire the lock here directly; while a thread is executing logSync(), no other thread can enter
        synchronized(this) {
            //Get the globally unique incremental txid, representing the sequence number of the edits log
            txidSeq++;
            long txid = txidSeq;
            localTxid.set(txid);
            //Construct an edits log object
            EditLog log = new EditLog(txid, content);
            //Write the edits log into the memory buffer, not directly flushed to the disk file
            editLogBuffer.write(log);
        }
        //Try to allow one thread executing logEdit() to flush the buffered data to the disk file in one batch
        logSync();
    }

    //Flush the data in the memory buffer to the disk file
    //Here we try to allow a single thread to flush the buffered data to the disk file in one batch,
    //which amounts to flushing the memory buffer to disk in batches
    private void logSync() {
        //Try to acquire the lock again; only one thread can enter. This section is very fast
        //(nanosecond level). This is the first locked segment.
        synchronized(this) {
            //If some thread is currently flushing the memory buffer to disk
            if (isSyncRunning) {
                //Suppose a thread is flushing the edits logs with txid = 6,7,8,9,10,11,12 from syncBuffer to disk;
                //then syncMaxTxid = 12, the largest txid currently being flushed.
                //After the flushing thread releases the lock to do the flush, a thread with txid = 10 can return directly,
                //because its edits log is being flushed or has already been flushed, so it does not need to wait
                long txid = localTxid.get();
                if (txid <= syncMaxTxid) {
                    return;
                }
                //If a thread with txid = 13 arrives and finds another thread already waiting
                //to flush the next batch to disk, it returns directly
                if (isWaitSync) {
                    return;
                }
                //If a thread with txid = 14 arrives and the flush has not finished,
                //it waits here to become the next flushing thread; only one thread waits
                isWaitSync = true;
                while (isSyncRunning) {
                    try {
                        wait(2000);//Release the lock and wait up to 2 seconds to be woken up
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                isWaitSync = false;
            }
            //Swap the two buffers
            editLogBuffer.setReadyToSync();
            //Then record the largest txid to be synced to disk; after the swap, syncBuffer may hold multiple entries
            //whose txids are strictly ascending; if txid = 6,7,8,9,10,11,12 are to be synced, syncMaxTxid = 12
            syncMaxTxid = editLogBuffer.getSyncMaxTxid();
            //Set the flag indicating that syncing to disk is in progress
            isSyncRunning = true;
        }
        //Release the lock and start syncing the buffered data to the disk file
        //This process is actually slow: usually milliseconds, possibly tens of milliseconds
        editLogBuffer.flush();
        //Acquire the lock again here
        synchronized(this) {
            //After syncing to disk, reset the flag and then release the lock
            isSyncRunning = false;
            //Wake up threads that may be waiting for the disk sync to complete
            notifyAll();
        }
    }

    //Represents one edits log entry (inner class)
    private class EditLog {
        long txid;
        String content;
        public EditLog(long txid, String content) {
            this.txid = txid;
            this.content = content;
        }
    }

    //In-memory double buffer (inner class)
    private class DoubleBuffer {
        //The buffer dedicated to threads writing edits logs
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        //The buffer dedicated to syncing data to disk
        LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();

        //Write an edits log into the memory buffer
        public void write(EditLog log) {
            currentBuffer.add(log);
        }

        //Swap the two buffers in preparation for syncing memory data to disk
        public void setReadyToSync() {
            LinkedList<EditLog> tmp = currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        //Get the largest txid in the sync buffer
        public Long getSyncMaxTxid() {
            return syncBuffer.getLast().txid;
        }

        //Flush the data in syncBuffer to disk
        public void flush() {
            for (EditLog log : syncBuffer) {
                System.out.println("Write edit log to disk file: " + log);
                //Normally, a file output stream would be used to write the data into the disk file
            }
            syncBuffer.clear();
        }
    }
}
6. Minimize the granularity of data locked by threads (segmented inventory deduction)
(1) Minimize the granularity of the data a thread locks
(2) Case of inventory deduction at thousands of orders per second
(1) Minimize the granularity of the data a thread locks
Suppose a piece of shared data contains multiple sub-items. You can lock the complete data, so that any thread accessing it competes for one lock; or you can lock only some of its sub-items, reducing the locking granularity and the competition. For example, during inventory deduction you can lock only part of the inventory data instead of the whole inventory record.
1. The edits log segmented locking mechanism minimizes the time a thread holds a lock.
2. The segmented inventory deduction mechanism minimizes the granularity of the data a thread locks.
(2) Case of inventory deduction at thousands of orders per second
1. The inventory oversell problem without locking
Suppose the order system is deployed on two machines, and at some moment two different users each want to buy 10 units of a product, so the order system instances on the two machines each receive a purchase request. Each instance queries the database and finds the current inventory is 12, so each sends SQL to the database to place an order and deduct 10 units. One instance deducts the inventory from 12 to 2, the other from 2 to -8: the inventory is oversold.
2. Using distributed locks to solve inventory oversell
With a distributed lock, only one client can hold the lock for a given lock key at a time. This way, only the client that acquires the lock can execute the subsequent inventory deduction logic.
Of course, besides distributed locks, inventory oversell can also be solved with pessimistic locks, optimistic locks, queue serialization, asynchronous queue spreading, Redis atomic operations, and so on.
3. The problem with the distributed lock scheme under high concurrency
Once one thread of one client acquires the distributed lock for a product, all other threads of that client and all threads of every other client must wait serially to process order requests for that product.
Assume that from acquiring the lock to releasing it, check inventory -> create order -> deduct inventory takes only 20 milliseconds; then one second (1000 milliseconds) can accommodate only 50 client threads placing orders on the same product.
So for e-commerce systems with low concurrency and no flash-sale scenario, distributed locks are good enough: with fewer than 10 requests per second and no high-concurrency rush on a single product, there is no problem. But for high-concurrency flash sales on a single product, a lock that admits only about 50 orders per second is clearly a bottleneck.
4. How to optimize distributed locks for high concurrency
Divide the inventory data into many segments, each with its own lock. When multiple threads modify inventory concurrently, they can modify different segments concurrently, avoiding the situation where only one thread at a time can exclusively modify a product's inventory.
The LongAdder class in Java adopts a similar idea of segmented CAS operations: when a CAS on one cell fails, it automatically migrates to the next cell and retries, reducing long CAS spins under high concurrency, as the sketch below illustrates.
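As a small standalone illustration of that segmented-CAS idea (LongAdderDemo and its names are ours, not part of the inventory case), two threads increment a java.util.concurrent.atomic.LongAdder; under contention they mostly hit different internal cells, and sum() aggregates the cells:
import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {
    public static void main(String[] args) throws InterruptedException {
        //LongAdder spreads increments across internal cells (segmented CAS),
        //so contending threads mostly CAS on different cells instead of one shared value
        final LongAdder counter = new LongAdder();
        Runnable task = () -> {
            for (int i = 0; i < 100000; i++) {
                counter.increment();
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        //sum() aggregates all cells; it is not an atomic snapshot under concurrent writes
        System.out.println(counter.sum());//Prints 200000
    }
}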
The distributed lock optimization for the inventory problem is therefore as follows:
Split one product's inventory into N segments, say 10 segments of 100 units each. Each thread processing an order request picks one of the 10 segments, locks it, and deducts from it. This turns competition for one inventory lock into competition for 10 inventory locks, improving concurrent performance roughly tenfold.
If a segment's inventory is 0, release its lock first and traverse to the next segment, trying to lock a segment only once it is found to have non-zero inventory. If a segment's inventory is at least the purchase quantity, lock it and deduct directly. If a segment's inventory is less than the purchase quantity, multiple segments must be locked and deducted together.
The drawback of this segmented locking scheme is implementation complexity:
First, inventory data must be stored in segments, for example one inventory field split into 10 fields. Second, every inventory operation needs a randomized algorithm to pick a segment to deduct from. Finally, if a segment has insufficient inventory, processing must automatically switch to the next segment.
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class RedisLockOptimizeDemo {
    public static void main(String[] args) throws Exception {
        Long goodsSkuId = 1L;//Goods SKU ID
        Long purchaseCount = 50L;//Purchase quantity
        int stockSegmentSeq = new Random().nextInt(10) + 1;//Randomly selected inventory segment
        InventoryDAO inventoryDAO = new InventoryDAO();
        RLock lock = new RLock("stock_" + goodsSkuId + "_" + stockSegmentSeq);
        //Lock the selected segment, then check its stock
        lock.lock();
        Long stock = inventoryDAO.getStock(goodsSkuId, stockSegmentSeq);
        //If this segment's stock is 0, check the next segment
        if (stock == 0L) {
            lock.unlock();
            boolean foundOtherStockSegment = false;
            for (int i = 1; i <= 10; i++) {
                if (i == stockSegmentSeq) {
                    continue;
                }
                lock = new RLock("stock_" + goodsSkuId + "_" + i);
                lock.lock();
                stock = inventoryDAO.getStock(goodsSkuId, i);
                if (stock != 0) {
                    stockSegmentSeq = i;
                    foundOtherStockSegment = true;
                    //This segment's stock is not 0; the logic below will unlock after it finishes
                    break;
                } else {
                    //This segment's stock is 0, so unlock it and continue traversing the next segment
                    lock.unlock();
                }
            }
            if (!foundOtherStockSegment) {
                System.out.println("Insufficient commodity inventory");
                return;
            }
        }
        //If this segment's stock covers the purchase quantity
        if (stock >= purchaseCount) {
            inventoryDAO.updateStock(goodsSkuId, stockSegmentSeq, stock - purchaseCount);
            lock.unlock();
            return;
        }
        //Reaching here means this segment's stock is less than the purchase quantity:
        //merge multiple segments and lock them together
        Long totalStock = stock;
        Map<RLock, Long> otherLocks = new HashMap<RLock, Long>();
        for (int i = 1; i <= 10; i++) {
            if (i == stockSegmentSeq) {
                continue;
            }
            RLock otherLock = new RLock("stock_" + goodsSkuId + "_" + i);
            otherLock.lock();
            Long otherStock = inventoryDAO.getStock(goodsSkuId, i);
            if (otherStock == 0) {
                otherLock.unlock();
                continue;
            }
            totalStock += otherStock;
            otherLocks.put(otherLock, otherStock);
            if (totalStock >= purchaseCount) {
                break;
            }
        }
        //If, after trying all other segments, the purchase quantity still cannot be met,
        //release the locks held on those segments
        if (totalStock < purchaseCount) {
            System.out.println("Insufficient commodity inventory");
            for (Map.Entry<RLock, Long> otherLockEntry : otherLocks.entrySet()) {
                otherLockEntry.getKey().unlock();
            }
            return;
        }
        //Deduct the initial segment's stock first
        Long remainReducingStock = purchaseCount - stock;
        inventoryDAO.updateStock(goodsSkuId, stockSegmentSeq, 0L);
        lock.unlock();
        for (Map.Entry<RLock, Long> otherLockEntry : otherLocks.entrySet()) {
            RLock otherLock = otherLockEntry.getKey();
            //The segment sequence number is the suffix of the lock name "stock_<skuId>_<seq>"
            int otherStockSegmentSeq = Integer.parseInt(otherLock.name.substring(otherLock.name.lastIndexOf("_") + 1));
            Long otherStock = otherLockEntry.getValue();
            if (otherStock <= remainReducingStock) {
                remainReducingStock -= otherStock;
                inventoryDAO.updateStock(goodsSkuId, otherStockSegmentSeq, 0L);
            } else {
                inventoryDAO.updateStock(goodsSkuId, otherStockSegmentSeq, otherStock - remainReducingStock);
                remainReducingStock = 0L;
            }
            otherLock.unlock();
        }
    }

    static class RLock {
        String name;
        public RLock(String name) { this.name = name; }
        public void lock() { System.out.println("Acquire distributed lock: " + name); }
        public void unlock() { System.out.println("Release distributed lock: " + name); }
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((name == null) ? 0 : name.hashCode());
            return result;
        }
        @Override
        public boolean equals(Object obj) {
            if (this == obj) { return true; }
            if (obj == null) { return false; }
            if (getClass() != obj.getClass()) { return false; }
            RLock other = (RLock) obj;
            if (name == null) {
                if (other.name != null) {
                    return false;
                }
            } else if (!name.equals(other.name)) {
                return false;
            }
            return true;
        }
    }

    static class InventoryDAO {
        public Long getStock(Long goodsSkuId, Integer stockSegmentSeq) { return 1000L; }
        public void updateStock(Long goodsSkuId, Integer stockSegmentSeq, Long stock) { }
    }
}
7. Separate locks by function where possible (avoid locking inside loops)
(1) Separate locks by function as much as possible
(2) Avoid frequently acquiring and releasing locks inside loops
(1) Separate locks by function as much as possible
If one lock can be split into several locks according to the functions it serves, then different functions can use different locks, reducing the conflicts when threads compete for the same lock.
For example, the bounded blocking queue LinkedBlockingQueue uses two locks: a take lock at the head of the queue and a put lock at the tail. The put() method first acquires the put lock and then enqueues the element at the tail; the take() method first acquires the take lock and then dequeues the element at the head. This way, simultaneous enqueue and dequeue operations on the blocking queue do not conflict on the same lock, as the sketch below shows.
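The following is a minimal sketch of the two-lock idea, loosely modeled on LinkedBlockingQueue but much simplified by us: blocking, capacity bounds, and signalling are omitted, and the names (TwoLockQueue, putLock, takeLock) are illustrative only. As in the real class, visibility between the two locked sections piggybacks on the atomic count.
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

//A minimal two-lock FIFO queue: putLock guards only the tail, takeLock guards only the head,
//so enqueues and dequeues never contend with each other
public class TwoLockQueue<E> {
    private static class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }
    private Node<E> head = new Node<E>(null);//Dummy node; head.item is always null
    private Node<E> tail = head;
    private final AtomicInteger count = new AtomicInteger(0);
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();

    //Enqueue at the tail; only putLock is held
    public void put(E e) {
        putLock.lock();
        try {
            Node<E> node = new Node<E>(e);
            tail.next = node;
            tail = node;
            count.incrementAndGet();//Volatile write: publishes the new node to consumers
        } finally {
            putLock.unlock();
        }
    }

    //Dequeue from the head; only takeLock is held. Returns null if empty.
    public E poll() {
        if (count.get() == 0) {//Volatile read: pairs with incrementAndGet() above
            return null;
        }
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null) {
                return null;
            }
            E item = first.item;
            first.item = null;
            head = first;//first becomes the new dummy node
            count.decrementAndGet();
            return item;
        } finally {
            takeLock.unlock();
        }
    }
}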
(2) Avoid frequently acquiring and releasing locks inside loops
Try to avoid frequently acquiring and releasing a lock inside a for loop; a hypothetical before/after sketch follows.
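A hypothetical sketch of the difference (LoopLockDemo, orders, and process are our illustrative names):
import java.util.List;

public class LoopLockDemo {
    private final Object lock = new Object();

    //Anti-pattern: the lock is acquired and released on every iteration,
    //paying lock overhead and inviting contention N times
    void lockPerIteration(List<String> orders) {
        for (String order : orders) {
            synchronized (lock) {
                process(order);
            }
        }
    }

    //If the whole batch must be serialized anyway, acquire the lock once around the loop.
    //Note the trade-off: this lengthens the time the lock is held (see principle 5),
    //so it only pays off when each iteration is short.
    void lockOnceAroundLoop(List<String> orders) {
        synchronized (lock) {
            for (String order : orders) {
                process(order);
            }
        }
    }

    private void process(String order) {
        //Placeholder for the per-order business logic
    }
}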
8. Minimize lock contention among highly concurrent threads (multi-level caching)
(1) How to reduce threads' contention for locks in high-concurrency scenarios
(2) The service registry reduces the frequency of lock contention through multi-level caching
(1) How to reduce threads' contention for locks in high-concurrency scenarios
1. Use read-write locks
Read-write locks mainly ensure that read requests do not conflict with one another and can execute in parallel; write requests still conflict with read requests, and write requests conflict with each other.
2. Separate locks by function
For example, split one lock into multiple locks according to the functions it serves.
3. Reduce the time locks are held
For example, the edits log segmented lock in distributed storage systems.
4. Lock data in segments
For example, the segmented inventory deduction with distributed locks.
5. Reduce the frequency with which threads compete for locks
For example, use a multi-level cache to reduce the frequency of lock contention.
(2) The service registry reduces the frequency of lock contention through multi-level caching
1. Read-write locks still allow concurrent read-write conflicts
Although the service registry already uses a read-write lock, whenever a write to the registry is required, a large number of reads may be in flight at the same time. Assume such concurrent read-write conflicts occur 10 times per minute.
2. How the multi-level cache works
A multi-level caching mechanism can therefore be used to mitigate the registry's read-write concurrency conflicts. The service registry's multi-level cache consists of a first-level cache (read-only cache) and a second-level cache (read-write cache).
All read requests read from the first-level cache (read-only cache). On a first-level miss, the second-level cache (read-write cache) is consulted; on a second-level miss, the data is loaded from the service registry.
3. Expiration strategies of the multi-level cache
The expiration strategy of the first-level cache (read-only cache) is: a timed task keeps it synchronized (consistent) with the second-level cache every 30 seconds.
The expiration strategy of the second-level cache (read-write cache) is: when a service instance registers or goes offline, its corresponding second-level cache entry is actively expired (updated). In addition, the second-level cache is built with a default expiration time of 180 seconds, so entries expire automatically after 180 seconds.
4. How the multi-level cache reduces concurrent read-write conflicts
The first-level cache is implemented with ConcurrentHashMap and the second-level cache with Guava Cache. The first-level cache mainly reduces the write frequency per minute, while the second-level cache mainly separates hot data from cold data.
When the data for a key in the service registry is updated, the second-level cache entry is updated immediately, and the first-level cache entry is updated within 30 seconds.
When the data for a key in the service registry is not updated, the second-level cache entry expires after 180 seconds, and the first-level cache entry becomes null within 30 seconds of that expiration. A later access through the first-level cache then loads the data from the service registry and writes it into the second-level cache.
Through these two cache levels, the number of concurrent read-write conflicts on the registry per minute can be greatly reduced. For example, the first-level cache can reduce the conflicts per minute from 10 to 2, because its data is updated at most once every 30 seconds; the second-level cache drops cold data that has gone 180 seconds without access, reducing the amount of hot data that must be cached.
5. The essence of the multi-level cache
The first-level cache is a read-only cache and the second-level cache is a read-write cache. Concurrent read-write conflicts on the first-level (read-only) cache are very rare, about twice a minute. The second-level (read-write) cache separates hot and cold data and does not cache cold data.
After the service registry's data is updated, the update reaches the first-level cache within at most 30 seconds. The multi-level cache sacrifices real-time read-write consistency to reduce the frequency of read-write contention.
public class MultiCache {
    //Level 1 cache (read-only cache)
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    //Level 2 cache (read-write cache)
    private final LoadingCache<Key, Value> readWriteCacheMap;

    public MultiCache() {
        this.readWriteCacheMap = CacheBuilder.newBuilder()
            .expireAfterWrite(180, TimeUnit.SECONDS)
            .build(new CacheLoader<Key, Value>() {
                @Override
                public Value load(Key key) {
                    //Load the data from the registry
                    return getFromRegistry(key);
                }
            });
        ...
    }

    //Read the value of a key in the service registry
    public Value get(final Key key) {
        return getValue(key);
    }

    public Value getValue(final Key key) {
        Value payload = null;
        try {
            //First read from the level 1 cache (read-only cache)
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                //Then read from the level 2 cache (read-write cache), which loads from the registry on a miss
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }

    //Timed task that keeps the level 1 cache in sync with the level 2 cache every 30 seconds
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                //Traverse the level 1 cache (read-only cache)
                for (Key key : readOnlyCacheMap.keySet()) {
                    Value cacheValue = readWriteCacheMap.getIfPresent(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    //If the level 2 cache entry differs from the level 1 cache entry, update the level 1 cache
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                }
            }
        };
    }
    ...
}
9. Summary of deadlock, a form of lock failure
(1) Examples of deadlock
(2) Necessary conditions for the occurrence of deadlock
(3) How to solve the deadlock problem
(4) Common ways to avoid deadlocks
(1) Examples of deadlock
Case 1:
If the <clinit>() method of a class contains time-consuming operations, multiple threads initializing the class at the same time may block.
If two classes each try to initialize the other via reflection in their static blocks, concurrent calls to the two classes' initialization may result in deadlock.
Case 2:
A simple lock-ordering deadlock between different lock objects:
Thread A: synchronized(left) -> synchronized(right)
Thread B: synchronized(right) -> synchronized(left)
Case 3:
A dynamic lock-ordering deadlock: the same class but different object instances, such as two accounts transferring to each other:
Thread A: synchronized(fromAccount) -> synchronized(toAccount)
Thread B: synchronized(toAccount) -> synchronized(fromAccount)
(2) Necessary conditions for deadlock
Whether it is a thread-level deadlock or a database-level deadlock, it can only be resolved through manual intervention. Four conditions cause deadlock; only when all four hold at the same time can a deadlock occur.
Condition 1: Mutual exclusion
Shared resources X and Y can each be occupied by only one thread at a time.
Condition 2: Hold and wait
Thread 1 has acquired shared resource X and does not release it while waiting for shared resource Y.
Condition 3: No preemption
Other threads cannot forcibly seize resources already held by thread 1.
Condition 4: Circular wait
Thread 1 waits for a resource held by thread 2, and thread 2 waits for a resource held by thread 1.
(3) How to solve the deadlock problem
As long as any one of the four deadlock conditions is broken, deadlock can be avoided. The mutual-exclusion condition cannot be broken, because it is the basic constraint of a mutex lock, but the other three conditions can all be broken.
1. Break the hold-and-wait condition
To break "holding resource X while waiting for resource Y", a thread can apply for resources X and Y in one step. If every thread applies for all its resources at once, no thread holds part of them while waiting for the rest.
2. Break the no-preemption condition
When a thread holding some resources fails to acquire further resources, it can actively release the resources it holds, thereby breaking the no-preemption condition. The core of breaking this condition is that the current thread can voluntarily give up the resources it occupies.
synchronized enters the blocked state as soon as the resource cannot be acquired, and a blocked thread can no longer release the resources it already holds. The tryLock() method of the Lock interface attempts to grab the lock without blocking the thread on failure. So synchronized cannot break this condition, but Lock.tryLock() can, as in the sketch below.
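A sketch of this idea under assumed names (TryLockTransferDemo, Account, and transfer are ours): each thread uses tryLock() with a timeout, and if the second lock cannot be acquired, it releases the first and retries, so no thread blocks while holding a resource.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockTransferDemo {
    static class Account {
        final ReentrantLock lock = new ReentrantLock();
        long balance;
        Account(long balance) { this.balance = balance; }
    }

    static void transfer(Account from, Account to, long amount) throws InterruptedException {
        while (true) {
            if (from.lock.tryLock(50, TimeUnit.MILLISECONDS)) {
                try {
                    if (to.lock.tryLock(50, TimeUnit.MILLISECONDS)) {
                        try {
                            from.balance -= amount;
                            to.balance += amount;
                            return;//Held both locks: transfer completed
                        } finally {
                            to.lock.unlock();
                        }
                    }
                    //Could not get the second lock: fall through, release the first, retry
                } finally {
                    from.lock.unlock();
                }
            }
            Thread.sleep((long) (Math.random() * 10));//Random back-off to reduce livelock
        }
    }
}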
3. Break the circular-wait condition
Number the resources in a fixed order and always acquire their locks in that order: for example, every thread first applies for the resource with the smaller number and then the one with the larger number, as in the sketch below.
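A sketch of ordered acquisition with a hypothetical Account class (names are ours): both threads always lock the account with the smaller id first, so the lock graph can never form a cycle. Distinct ids are assumed; equal ids would need an extra tie-breaker lock.
public class OrderedLockDemo {
    static class Account {
        final long id;//Globally unique resource number
        long balance;
        Account(long id, long balance) { this.id = id; this.balance = balance; }
    }

    //Always acquire the smaller-id lock first, regardless of transfer direction
    static void transfer(Account from, Account to, long amount) {
        Account first = from.id < to.id ? from : to;
        Account second = from.id < to.id ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }
}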
(4) Common ways to avoid deadlocks
1. Avoid one thread acquiring multiple locks at the same time
2. Avoid one lock guarding multiple resources; try to ensure each lock guards only one resource
3. Try to use timed lock attempts such as tryLock(timeout) instead of intrinsic locks
4. For database locks, locking and unlocking must happen on the same database connection
10. Lock-free programming
(1) Single-write single-read lock-free queue (memory barrier)
(2) Single-write multi-read lock-free queue (volatile keyword)
(3) Multi-write multi-read lock-free queue (CAS mechanism)
(4) Multi-write multi-read lock-free stack (CAS mechanism)
(5) Multi-write multi-read lock-free linked list
Locks in Java mainly refer to the synchronized keyword and the Lock interface; locks in Linux mainly refer to pthread mutexes.
(1) Single-write single-read lock-free queue (memory barrier)
A single-writer, single-reader lock-free queue is exemplified by the kfifo queue in the Linux kernel: one thread writes and one thread reads, and no lock is needed, only memory barriers.
(2) Single-write multi-read lock-free queue (volatile keyword)
Disruptor is an open-source concurrency framework that implements lock-free concurrent queue operations. Disruptor's RingBuffer can be completely lock-free because of single-threaded writing: the RingBuffer has one head pointer, corresponding to the single producer thread, and multiple tail pointers, corresponding to multiple consumer threads, and these pointers are all volatile.
(3) Multi-write multi-read lock-free queue (CAS mechanism)
Based on CAS and a linked list, a lock-free queue with multiple writers and multiple readers can be implemented. The list has a head pointer and a tail pointer: enqueueing performs a CAS operation on the tail pointer, and dequeueing performs a CAS operation on the head pointer.
(4) Multi-write multi-read lock-free stack (CAS mechanism)
A lock-free stack is even simpler than a lock-free queue, requiring CAS operations only on the head pointer: pushing CASes the head pointer, and popping CASes the head pointer, as in the sketch below.
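A minimal sketch of such a lock-free stack (a Treiber stack; the class and method names are ours). Both push and pop loop on a single CAS against the head pointer; in Java the garbage collector sidesteps the classic ABA problem of manual-memory implementations.
import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<E> {
    private static class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }
    private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();

    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = head.get();
            newHead.next = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));//Retry if another thread moved head
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = head.get();
            if (oldHead == null) {
                return null;//Stack is empty
            }
            newHead = oldHead.next;
        } while (!head.compareAndSet(oldHead, newHead));//Retry if another thread moved head
        return oldHead.item;
    }
}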
(5) Multi-write multi-read lock-free linked list
ConcurrentSkipListMap, a concurrent skip list, performs its queries on top of lock-free linked lists.