
Solving the problem of duplicate insertion of IM chat data using distributed locks

Popularity:986 ℃/2024-08-22 07:46:26

Contents

  • Operational background
  • Problem analysis and positioning
  • Explore viable solutions
    • Database Level Processing - Unique Indexes
    • Application Level Processing - Distributed Locks
  • Distributed Lock Overview
    • What are the required characteristics of a distributed lock?
    • What are the implementations of distributed locks?
      • Database-based implementation
      • Redisson-based implementation
  • Introduction to Redisson
    • Overview
    • Reentrant Lock
  • Redisson-based solution
    • Solution walkthrough
    • Spring Boot integration with Redisson
  • Concluding remarks
  • References

This article was first published as "Using Distributed Locks to Solve the Problem of Duplicate Insertion of IM Chat Data".

Operational context and issues

In the IM chat business, besides building our own chat servers and running consultation chats as a closed loop, we often need to take in IM traffic from third-party platforms.

This means adapting to the message push flow of each platform.

In our self-built IM chat service, IM session creation and message reception are two separate modules (interfaces).
This design separates the two steps at the client level and guarantees their order, which effectively avoids unexpected problems.

However, third-party platforms deliver traffic to us by pushing messages, so we must create the customer, the session, and the message all while handling the pushed message.



If all messages are queued and executed one by one, then the process is fine.

However, we found that a third-party platform occasionally pushes several messages from the same customer at the same time, and these concurrent writes lead to duplicate data.

As a result, a new customer can be created more than once, and the corresponding session can be created more than once as well.



It also causes occasional selectOne exceptions when the data is queried:

desc":": nested exception is : Expected one result (or null) to be returned by selectOne(), but found: 2

Until the specific problem was identified, we added a limit 1 restriction to the affected queries, in principle taking the most recent record.

Problem analysis and positioning

In chat scenarios, dirty data like this cannot be tolerated.

To find a fundamental solution, we started a dedicated investigation.

I walked through the code and found no obvious bug at the code level. Judging from the data, however, concurrent message delivery was the most likely cause.

To prove this conjecture, I wrote a test case.

The test is a Python script that starts 10 threads, each of which calls the message-receiving business interface with the same fromUser and toUser.

The core idea is to push multiple messages to the same person at the same time.

The test reproduced the duplicate writes, which confirmed that concurrent requests are the cause.

A simple schematic is given here to explain the flow of concurrent requests.
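The interleaving can also be shown in a few lines of code. The following is a minimal sketch, not the production code: the in-memory list stands in for the user table, and the barrier is an artificial device that forces both requests to finish the existence check before either one inserts. All names here are hypothetical.

```python
import threading

users = []                       # stand-in for the user table
barrier = threading.Barrier(2)   # forces both requests past the check before either inserts

def get_or_create_user(tenant_user_id):
    # Step 1: check whether the user already exists
    exists = tenant_user_id in users
    # Both requests wait here, so both have already seen "not found"
    barrier.wait()
    # Step 2: insert because the earlier check said "not found"
    if not exists:
        users.append(tenant_user_id)

def simulate_race():
    users.clear()
    threads = [threading.Thread(target=get_or_create_user, args=("u-1",))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return list(users)

result = simulate_race()
print(result)   # the same user appears twice
```

In real traffic the overlap is a matter of timing rather than a barrier, which is why the duplicates only show up occasionally.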



Exploration of feasibility options

We've thought about this ourselves, and there are two broad solutions:

  • Data-level solutions
  • Application level resolution

Data-level solutions

This one is easy to understand: use a MySQL unique index on the field to block duplicate inserts, which is a mechanism the database itself provides.
However, the tenantUserId field of the user table was not originally designed with a unique index, so one would have to be added:

ALTER TABLE user ADD UNIQUE uk_tenant_user_id( tenantUserId );

Once a unique index is added to the tenantUserId column, when the concurrency described above occurs, one of request 1 and request 2 will complete its insert first, and the other will get an error similar to the one shown below. This ultimately guarantees that the user table contains only one record with tenantUserId = xxx.

 Cause: : Duplicate entry 'xxx' for key 'tenantUserId'\n##
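The behaviour of a unique index can be tried out locally. This is a small sketch that uses Python's built-in SQLite as a stand-in for MySQL (an assumption made only so the example is self-contained); the table and index names follow the statement above, and the helper function is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user (id INTEGER PRIMARY KEY, tenantUserId TEXT)")
conn.execute("CREATE UNIQUE INDEX uk_tenant_user_id ON user (tenantUserId)")

def insert_user(tenant_user_id):
    """Return True if the row was inserted, False if the unique index rejected it."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO user (tenantUserId) VALUES (?)",
                         (tenant_user_id,))
        return True
    except sqlite3.IntegrityError:
        # SQLite's counterpart of MySQL's "Duplicate entry" error
        return False

first = insert_user("xxx")
second = insert_user("xxx")
count = conn.execute("SELECT COUNT(*) FROM user WHERE tenantUserId = ?",
                     ("xxx",)).fetchone()[0]
print(first, second, count)
```

The losing request has to handle the rejection, for example by re-reading the row that the winning request inserted.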

After evaluation, this single table already holds about 20 million rows (2000w), so adding the index is not practical in the short term.
Repairing the historical data is also no small task.

Application level resolution

Another way of thinking about the solution is that we don't rely on the underlying database to provide us with a guarantee of uniqueness, but instead rely on the application's own code logic to avoid concurrency conflicts.

The reason why we encounter the problem of repeated insertion of data is because the actions of "checking if the data already exists" and "inserting the data" are separated. Since these two steps are not atomic, two different requests can pass the first step at the same time. If we can combine these two actions into one atomic operation, we can avoid data conflicts. At this point we need to realize the atomicity of this code block by adding locks.
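As a minimal sketch of the idea, the check and the insert can be wrapped in a single lock. Here a threading.Lock inside one process stands in for the distributed lock, and the list stands in for the user table; both are assumptions for illustration only.

```python
import threading

users = []                      # stand-in for the user table
users_lock = threading.Lock()   # single-process stand-in for a distributed lock

def get_or_create_user(tenant_user_id):
    # Check and insert run under one lock, so no other request can slip in between
    with users_lock:
        if tenant_user_id not in users:
            users.append(tenant_user_id)

def run_concurrently(n=10):
    users.clear()
    threads = [threading.Thread(target=get_or_create_user, args=("u-1",))
               for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return list(users)

result = run_concurrently()
print(result)
```

A process-local lock like this only protects a single machine, which is why a deployment across several machines needs a lock that all of them share.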


Considering that our application API is deployed on multiple machines, we decided to use the industry's more mature distributed locking scheme.

Distributed Lock Overview

What are the required characteristics of a distributed lock?

  • In a distributed system environment, only one thread on one machine can acquire the lock at the same time
  • Highly Available Lock Acquisition and Release
  • High Performance Lock Acquisition and Release
  • With reentrant features
  • Lock expiration mechanism to prevent deadlocks
  • Non-blocking acquisition: if the lock cannot be acquired, the call returns a failure immediately instead of waiting
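Several of these characteristics can be illustrated with a toy model. The class below is a hypothetical, in-memory sketch (it is neither distributed nor thread-safe) that shows mutual exclusion, reentrancy, lease expiration, and non-blocking acquisition; the injected clock only exists to make the example deterministic.

```python
import time

class ToyLock:
    """In-memory illustration of lock semantics; not a real distributed lock."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._owner = None
        self._holds = 0          # reentrancy counter
        self._expires_at = 0.0

    def try_lock(self, owner, lease_seconds):
        now = self._clock()
        if self._owner is not None and now >= self._expires_at:
            # Lease ran out: free the lock so a crashed owner cannot block others forever
            self._owner, self._holds = None, 0
        if self._owner is None or self._owner == owner:
            self._owner = owner
            self._holds += 1     # the same owner may lock again (reentrant)
            self._expires_at = now + lease_seconds
            return True
        return False             # non-blocking: report failure instead of waiting

    def unlock(self, owner):
        if self._owner != owner:
            raise RuntimeError("only the owner may unlock")
        self._holds -= 1
        if self._holds == 0:
            self._owner = None

now = [0.0]                                  # fake clock, advanced by hand
lock = ToyLock(clock=lambda: now[0])
results = [
    lock.try_lock("A", 10),   # A acquires the lock
    lock.try_lock("A", 10),   # A re-enters
    lock.try_lock("B", 10),   # B is refused immediately
]
now[0] = 11.0                                # A's lease has expired
results.append(lock.try_lock("B", 10))       # B can now acquire
print(results)
```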

There are three main distributed lock implementations as follows:

  • Database-based implementation of distributed locks
  • Implementing Distributed Locks Based on Zookeeper
  • Implementing Distributed Locks Based on Redis

Specific implementations of each can be found in "What are distributed locks? Three Ways to Implement Distributed Locks".

Besides the three implementations above, there is another approach based on Redisson.
Because our business interfaces are built on the Spring Boot framework, we reviewed the relevant material and chose the Redisson implementation.

Introduction to Redisson

Overview

Redisson is a Java in-memory data grid (In-Memory Data Grid) built on top of Redis. It provides not only a series of distributed versions of common Java objects but also many distributed services, including BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, and Scheduler service. Redisson aims to offer the simplest and most convenient way to use Redis, and to promote Separation of Concern so that users can focus on business logic.

Here is the structure of Redisson:

Redisson can be used as a standalone node to independently execute remote tasks posted by other nodes to the Distributed Execution Service and the Distributed Scheduling Task Service.



Reentrant Lock

Redisson's Redis-based distributed reentrant lock, the RLock Java object, implements the java.util.concurrent.locks.Lock interface. Asynchronous (Async), Reactive, and RxJava2 interfaces are also provided.

RLock lock = redisson.getLock("anyLock");
// The most common usage
lock.lock();

As is well known, if the Redisson node holding a distributed lock goes down while the lock is held, the lock stays stuck in the locked state. To avoid this, Redisson has an internal watchdog that monitors locks; its job is to keep extending a lock's validity period until the Redisson instance is shut down. By default the watchdog's lock timeout is 30 seconds, and it can be changed through Config.lockWatchdogTimeout.
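The effect of the watchdog can be pictured with a small simulation. This is a hypothetical sketch, not Redisson's code: it assumes a 30-second lease that is renewed every 10 seconds for as long as the lock owner is alive, and it returns the moment at which the lease finally runs out.

```python
def simulate_watchdog(lease=30, renew_every=10, owner_alive_until=45, horizon=120):
    """Return the time (in seconds) at which the lock's lease finally expires."""
    expires_at = lease           # lease granted when the lock is taken at t = 0
    t = 0
    while t < horizon:
        t += renew_every
        if t > owner_alive_until:
            break                # the owner is gone, so nobody renews the lease any more
        if t < expires_at:
            expires_at = t + lease   # watchdog pushes the expiry forward
    return expires_at

print(simulate_watchdog())                      # owner dies at t = 45
print(simulate_watchdog(owner_alive_until=0))   # owner dies right after locking
```

In both runs the lock frees itself within one lease period after the last renewal, so a crashed owner cannot hold it forever.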

In addition, the lock method accepts a leaseTime parameter that specifies how long the lock is held. Once this time has passed, the lock is released automatically.

// Unlock automatically 10 seconds after locking
// No need to call the unlock method manually
lock.lock(10, TimeUnit.SECONDS);

// Try to lock: wait up to 100 seconds, and unlock automatically 10 seconds after locking
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

Redisson also provides methods related to asynchronous execution for distributed locks:

RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

The RLock object is fully compliant with Java's Lock specification. That is, only the process that owns the lock can unlock it, while other processes will throw an IllegalMonitorStateException if they unlock it. However, if you need other processes to be able to unlock the lock, use a distributed semaphore object.

For more information about Redisson, see the Redisson Chinese Documentation.

Redisson-based solution

In this case, we have used a Redisson-based implementation of distributed locking.

Solution walkthrough

The technical approach is settled, but it still has to be applied sensibly to the actual scenario.
So, where do we put the lock?



We went through the message reception flow once more and added a distributed lock to the original flow.
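One way to picture the lock placement is a lock per customer, taken only when a new customer has to be created. The sketch below is an assumption-laden illustration: process-local locks stand in for Redisson locks, the key prefix mirrors the one used later in the article, and the second existence check under the lock, the helper names, and the exception type are hypothetical.

```python
import threading

_locks = {}
_registry_guard = threading.Lock()
users = {}   # stand-in for the user table

class LockNotAcquired(Exception):
    """Raised when another request is already creating the same customer."""

def lock_for(tenant_user_id):
    # One lock per customer, so different customers never block each other
    key = f"im:lock:user:{tenant_user_id}"
    with _registry_guard:
        return _locks.setdefault(key, threading.Lock())

def get_or_create_user(tenant_user_id):
    if tenant_user_id in users:             # existing customer: no lock needed
        return users[tenant_user_id]
    lock = lock_for(tenant_user_id)
    if not lock.acquire(blocking=False):    # non-blocking attempt
        raise LockNotAcquired(tenant_user_id)
    try:
        if tenant_user_id not in users:     # re-check while holding the lock
            users[tenant_user_id] = {"tenantUserId": tenant_user_id}
        return users[tenant_user_id]
    finally:
        lock.release()

held = lock_for("u-1")
held.acquire()                  # simulate another request creating customer u-1
try:
    get_or_create_user("u-1")
    blocked = False
except LockNotAcquired:
    blocked = True
other = get_or_create_user("u-2")   # a different customer is unaffected
held.release()
print(blocked, other)
```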

Spring Boot integration with Redisson

Add the redisson dependency to pom.xml

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.34.1</version>
</dependency>  

Redis configuration in the yml file

  redis:
    enabled: true
    host: xxxx
    port: 6371
    password: xxx
    database: 2
    timeout: 10000
    connectionPoolSize: 15
    connectionMinimumIdleSize: 5

@Configuration
@ConditionalOnExpression("${}")
public class RedissonConfig {

    @Value("${}")
    private String host;

    @Value("${}")
    private String port;

    @Value("${}")
    private String timeout;

    @Value("${}")
    private String password;

    @Value("${}")
    private int database;

    @Value("${}")
    private int connectionPoolSize;

    @Value("${}")
    private int connectionMinimumIdleSize;

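    @Bean(name = "redissonClient")
    public RedissonClient redissonClient() {
        Config config = new Config();
        // Store values as plain strings
        config.setCodec(new StringCodec());
        SingleServerConfig singleServerConfig =
                config.useSingleServer()
                        .setAddress("redis://" + host + ":" + port)
                        .setDatabase(database)
                        .setConnectionPoolSize(connectionPoolSize)
                        .setConnectionMinimumIdleSize(connectionMinimumIdleSize)
                        .setTimeout(Integer.parseInt(timeout));
        // Only set a password when one is configured
        if (password != null && !password.isEmpty()) {
            singleServerConfig.setPassword(password);
        }
        return Redisson.create(config);
    }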
}

With the above in place, the lock is ready to use.

Core Code Implementation

        // Take a distributed lock before creating a new user.
        // Assumed names (not confirmed by the source): redissonClient, log,
        // getTenantUserId()/setTenantUserId(), and ex.getCode().
        String mutex = "im:lock:user:" + createUserDto.getTenantUserId();
        RLock lock = redissonClient.getLock(mutex);
        boolean successLock = lock.tryLock();
        if (!successLock) {
            // Failed to acquire the distributed lock
            log.error(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "[getOrCreateUser]", createUserDto));
            throw new BizException("The customer is already being created", ResponseCodeEnum.GET_R_LOCK_FAIL.getCode());
        }
        // Create the user
        User visitor = new User();
        visitor.setTenantUserId(createUserDto.getTenantUserId());
        //...

        // Message creation flow; the first message creates the customer and the session.
        // Retry once if acquiring the lock fails.
        try {
            receiveMessage(inputDto);
        } catch (BizException ex) {
            log.error(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "[receiveMessage]", ex));
            if (ex.getCode().equals(ResponseCodeEnum.GET_R_LOCK_FAIL.getCode())) {
                // Retry once after a one-second pause
                Thread.sleep(1000);
                log.info(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "[]", inputDto));
                receiveMessage(inputDto);
            }
        }
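The retry-once pattern is independent of Redisson and can be sketched on its own. In the following hypothetical example, LockNotAcquired stands in for a BizException carrying the GET_R_LOCK_FAIL code, and the sleep function is injectable so the pause can be skipped in a test; none of these names come from the project.

```python
import time

class LockNotAcquired(Exception):
    """Hypothetical stand-in for a BizException with code GET_R_LOCK_FAIL."""

def call_with_one_retry(action, delay_seconds=1.0, sleep=time.sleep):
    try:
        return action()
    except LockNotAcquired:
        # Another request holds the lock: pause briefly, then try exactly once more
        sleep(delay_seconds)
        return action()      # a second failure propagates to the caller

attempts = []

def flaky_receive():
    # Fails on the first call (lock busy) and succeeds on the second
    attempts.append(1)
    if len(attempts) == 1:
        raise LockNotAcquired()
    return "ok"

result = call_with_one_retry(flaky_receive, sleep=lambda seconds: None)
print(result, len(attempts))
```

By the time of the retry, the request that held the lock has usually finished creating the customer, so the second attempt finds the existing record instead of competing for the lock.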

Note: for a more complete implementation of using Redisson with Spring Boot, see the project source code of The Spring Boot Practical Warfare Chronicle.

Test case

"Make sure to write code that can be debugged." (from Review and Reflections on Several Overnight Releases)

Over the years of my career, I have gradually settled on a simple, unglamorous way to ensure code quality: single-step debugging.

Here we also write a test case. The idea is the one described earlier, so it is not repeated here.

import json
import requests
import time
import uuid
import threading

def receive_xhs_msg():

    try:
          # Request URL
          url = """http://localhost:7071/api/message/receive"""

          # Add request headers
          headers = {
            "Content-Type": "application/json; charset=UTF-8"
          }
          message_id=str(uuid.uuid4())
          print('message_id:'+message_id)
          userInfo={
                "header_image":"",
                "nickname":"- ",
                "user_id":"63038d28000000001200d311"
          }
   
          payload={
                    "content":"6ED5KduMqTDJZ1ztw+ZPgw==~split~OMo7DD2gqsJqBafx9WKsZlnNNkcEYD4hLLPczczIFmr+YMtTB9Wz4ZI0MYCM4cF28kG7rfqnXdR9cRmamEJzHmKLfTmVxv5jzGUFVQOU00iimtunMAEJ4x76oJDrdAVUc4bJfV5zFLotz/Bm0WM9TADvD2cLhpHsVmaZRXaiJ96wMQgqx+K727l5S15jmMa5PiLqZqBO2q/G+WEkJSbfLQ==",
                    "from_user_id":"63038d28000000001200d311",
                    "intentCommentId":"",
                    "message_id":message_id,
                    "message_source":2,
                    "message_type":"HINT",
                    "timestamp":"1723268668573",
                    "to_user_id":"575d2c135e87e733f0162b88",
                    "user_info":[userInfo]
            }

          # Convert the payload to JSON
          getJson=json.dumps(payload)
          # Build and send the request
          response=requests.post(url=url,data=getJson,headers=headers)
          # Print the response data
          print(response.text)
          time.sleep(1)
    except Exception as e:
          print('Error:',e)
    finally:
          print('Implementation completed')

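if __name__ == '__main__':
      threads = []
      for _ in range(10):  # Create 10 threads
            t = threading.Thread(target=receive_xhs_msg)
            threads.append(t)
      for t in threads:  # Start all 10 threads so the requests overlap
            t.start()
      for t in threads:  # Wait for every thread to finish
            t.join()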

Concluding remarks

Distributed locks are widely used in day-to-day work, for example for interface debouncing (preventing duplicate submissions) and for handling concurrency.

The recent IM message handling work happened to be a vivid example of this in practice.

I am sharing this bit of modest experience in the hope that it prompts better ideas from others.

References

  • Redisson Chinese Documentation
  • What are distributed locks? Three Ways to Implement Distributed Locks
  • Flexible Use of Distributed Locks to Resolve Duplicate Data Insertion Problems
  • Using Distributed Locks to Solve the Problem of Duplicate Insertion of IM Chat Data.