Location>code7788 >text

Based on Redission, Implement the Master Selection of Distributed Scheduling Tasks

Popularity:475 ℃/2025-04-15 11:24:35

In the Spring Cloud microservice architecture, the master selection and heartbeat monitoring of distributed scheduling tasks can be implemented based on Redisson, which can be achieved through the following complete solution. This solution combines Redisson's distributed locking and publish/subscribe capabilities to ensure high reliability and real-time:


1. Overall architecture design

graph TD A[Microservice Example 1] -->|Competition Lock| B(Redisson) C[Microservice Example 2] -->|Competition Lock| B D[Microservice Example 3] -->|Competition Lock| B B -->|Heartbeat Event| E[Redis Pub/Sub] E --> A E --> C E --> D

2. Core implementation steps

1. Add dependencies

<!-- Redisson Starter -->
<dependency>
    <groupId></groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.23.2</version>
</dependency>

2. Master selection service implementation

import ;
 import ;
 import ;
 import ;

 import ;
 import ;
 import ;

 @Service
 public class LeaderElectionService {

     private static final String LEADER_LOCK_KEY = "scheduler:leader:lock";
     private static final String HEARTBEAT_CHANNEL = "scheduler:heartbeat";

     @Autowired
     private RedissonClient redisson;
    
     private volatile boolean isLeader = false;
     private RLock leaderLock;
     private Thread heartbeatThread;

     @PostConstruct
     public void init() {
         leaderLock = (LEADER_LOCK_KEY);
         startElection();
         startHeartbeatListener();
     }

     private void startElection() {
         // Try to obtain leadership (non-blocking)
         new Thread(() -> {
             while (!().isInterrupted()) {
                 try {
                     // Try to acquire the lock, the lock expires for 30 seconds
                     boolean acquired = (0, 30, );
                     if (acquired) {
                         isLeader = true;
                         ("The current node is elected as Leader");
                         startHeartbeatTask(); // Start the heartbeat task
                         break;
                     }
                     (5000); // Retry every 5 seconds
                 } catch (InterruptedException e) {
                     ().interrupt();
                 }
             }
         }).start();
     }

     private void startHeartbeatTask() {
         heartbeatThread = new Thread(() -> {
             while (isLeader && !().isInterrupted()) {
                 try {
                     // 1. Renewal lock (watchdog mechanism will automatically handle)
                     // 2. Release a heartbeat
                     (HEARTBEAT_CHANNEL)
                            .publish(());
                    
                     (10000); // Send a heartbeat every 10 seconds
                 } catch (InterruptedException e) {
                     ().interrupt();
                 }
             }
         });
         ();
     }

     private void startHeartbeatListener() {
         // Listen to the Leader's heartbeat
         (HEARTBEAT_CHANNEL)
                .addListener(, (channel, heartbeatTime) -> {
                    ("Received Leader Heartbeat: " + heartbeatTime);
                    // The last heartbeat time can be updated here
                });
     }

     @PreDestroy
     public void shutdown() {
         if (isLeader && ()) {
             ();
             isLeader = false;
             if (heartbeatThread != null) {
                 ();
             }
         }
     }

     public boolean isLeader() {
         return isLeader;
     }
 }

3. Health check enhancement

@Service
 public class HealthCheckService {
    
     @Autowired
     private RedissonClient redisson;
    
     private volatile long lastHeartbeatTime = 0;
    
     @PostConstruct
     public void init() {
         // Check the Leader status regularly
         ()
                 .scheduleAtFixedRate(this::checkLeaderStatus, 0, 5, );
     }
    
     private void checkLeaderStatus() {
         Long currentTime = ("scheduler:leader:heartbeat").get();
         if (currentTime != null) {
             lastHeartbeatTime = currentTime;
         }
        
         // No heartbeat received for more than 30 seconds. I think the Leader is invalid.
         if (() - lastHeartbeatTime > 30000) {
             ("Leader may have been down, triggering a re-election");
             // Active lock-up logic can be triggered here
         }
     }
 }

3. Key optimization points

1. Multi-level fault detection

Test method Trigger condition Resuming action
Redisson watchdog timeout Lock renewal failed (default 30 seconds) Automatically release the lock, other nodes can compete
Active heartbeat timeout Custom threshold (such as 30 seconds) Forced release of locks and re-election
Redis connection disconnected Pause elections until connections resume

2. Optimized configuration of election performance

#
 redisson:
   lock:
     watchdog-timeout: 30000 # Watchdog timeout (ms)
   threads: 16 # Number of event processing threads
   netty-threads: 32 # Netty worker thread count

3. Schizobra protection plan

// Use Redisson's MultiLock to implement multi-Redis node lock
 RLock lock1 = (LEADER_LOCK_KEY);
 RLock lock2 = (LEADER_LOCK_KEY);
 RLock multiLock = (lock1, lock2);

 boolean acquired = (0, 30, );

4. Production environment deployment suggestions

1. Redis architecture selection

Deployment Mode Applicable scenarios Recommended configuration
Sentinel mode High availability requirements 3 Sentry + 3Redis instance
Cluster mode Large data volume + high performance At least 6 nodes (3 masters and 3 slaves)
Single node Develop tests only Not recommended for production use

2. Monitoring indicators

// Expose Redisson metrics (with Spring Boot Actuator)
 @Bean
 public RedissonMetricsBinder redissonMetrics(RedissonClient redisson) {
     return new RedissonMetricsBinder(redisson);
 }

Monitoring key indicators:

  • .active_threads: Number of active threads
  • : Number of subscriptions
  • : Active connections

3. Disaster recovery plan

  • Double Live Data Center:passRedissonClientConfigure multi-region endpoints
    Config config = new Config();
    ()
        .addNodeAddress("redis://dc1-node1:6379")
        .addNodeAddress("redis://dc2-node1:6379");
    
  • Downgrade strategy: The last known state of local cache
    @Bean
    @Primary
    public LeaderService fallbackLeaderService() {
        return new FallbackLeaderService(redisLeaderService, localCache);
    }
    

5. Integrate with Spring Cloud

1. Scheduling task control

@Scheduled(fixedRate = 5000)
 public void scheduledTask() {
     if (()) {
         // Only the logic executed by Leader
         processBatchData();
     }
 }

2. Dynamic configuration update

@RefreshScope
 @RestController
 @RequestMapping("/leader")
 public class LeaderController {
    
     @Value("${:30000}")
     private long electionTimeout;
    
     @Autowired
     private LeaderElectionService electionService;
    
     @PostMapping("/timeout")
     public void updateTimeout(@RequestParam long timeout) {
         // Dynamic adjustment of election timeout
         (timeout);
     }
 }

6. Summary of the advantages of the plan

  1. Sub-second fault detection: Real-time notifications through Redis Pub/Sub
  2. Automatic failover: Redisson watchdog mechanism guarantees lock release
  3. Elastic expansion: Support dynamic addition and decrease of microservice instances
  4. Minimum dependency: Redis cluster only, no additional components required
  5. Seamless integration with Spring Eco: Perfect cooperation@ScheduledComponents