
The use and analysis of zk basics

2025-04-05 23:54:33

Outline

1. Basic zk data operations based on Curator

2. Implement cluster metadata management based on Curator

3. Implement HA master/standby automatic switching based on Curator

4. Implement Leader election based on Curator

5. Implement distributed Barrier based on Curator

6. Implement distributed counters based on Curator

7. Implementing zk's node and child node monitoring mechanism based on Curator

8. Source code analysis of creating client instances based on Curator

9. How Curator establishes a connection with zk at startup

10. Source code analysis of adding, deleting, modifying and checking nodes based on Curator

11. Source code analysis of the node listening callback mechanism based on Curator

12. Source code analysis of the Leader election mechanism based on Curator

 

1. Basic zk data operations based on Curator

Guava is to Java what Curator is to ZooKeeper. Add the following Maven dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>
</dependencies>

The following example shows create, read, update and delete (CRUD) operations on znodes with Curator. Here, CuratorFramework represents a client instance. Note: creatingParentsIfNeeded() creates any missing parent nodes of the given path (cascading creation).

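import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CrudDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();//Start the client and establish the connection
        System.out.println("Curator client has been started");

        client.create()
            .creatingParentsIfNeeded()//Create missing parent nodes
            .withMode(CreateMode.PERSISTENT)//Node type (PERSISTENT assumed here)
            .forPath("/my/path", "10".getBytes());//Create

        byte[] dataBytes = client.getData().forPath("/my/path");//Read
        System.out.println(new String(dataBytes));

        client.setData().forPath("/my/path", "11".getBytes());//Update
        dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));

        List<String> children = client.getChildren().forPath("/my");//List children
        System.out.println(children);

        client.delete().forPath("/my/path");//Delete
        Thread.sleep(Integer.MAX_VALUE);
    }
}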

 

2. Implement cluster metadata management based on Curator

Curator can read and write zk directly. Suppose you are building your own distributed system, similar to Kafka or Canal, and want to keep all of the cluster's core runtime metadata in zk. You can create znodes through Curator and write the corresponding values into them.

 

JSON is the recommended format for these values. Kafka, for example, writes JSON data into zk. Other clients can then read the cluster metadata from zk whenever they need it.
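The JDK-only helper below is a minimal sketch of what such a value might look like. It turns a flat metadata map into JSON bytes, which could then be passed to `client.create().forPath(path, bytes)` or `client.setData().forPath(path, bytes)`. The class name `MetadataJson` and the fields `brokerId`, `host` and `port` are hypothetical. A real system would more likely use a JSON library such as Jackson.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: serializes a flat metadata map into a JSON object string.
// Only String and Number values are handled, to keep the sketch short.
class MetadataJson {
    static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) {
                sb.append(v);
            } else {
                sb.append('"').append(escape(String.valueOf(v))).append('"');
            }
        }
        return sb.append('}').toString();
    }

    // Escapes backslashes first, then double quotes, so string literals stay valid JSON.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // The bytes that would be written into a znode such as /cluster/brokers/1 (hypothetical path).
    static byte[] toZnodeValue(Map<String, Object> fields) {
        return toJson(fields).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, Object> broker = new LinkedHashMap<>();
        broker.put("brokerId", 1);
        broker.put("host", "10.0.0.1");
        broker.put("port", 9092);
        System.out.println(toJson(broker));
    }
}
```

A `LinkedHashMap` keeps the field order stable. This makes the stored value easy to compare when debugging.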

 

3. Implement HA master/standby automatic switching based on Curator

HDFS, Kafka, and Canal all use zk for Leader election. Automatic HA master/standby switching can therefore be implemented on top of Curator.

 

The HDFS NameNode can be deployed in an HA architecture with an active machine and a standby machine. If the active machine goes down, the standby detects it and is elected Leader. It then serves clients as the new NameNode.

 

The controller in Kafka coordinates the whole cluster. Any Broker can become the controller, a role similar to Leader.

 

Canal is likewise deployed on an active machine and a standby machine. If the active machine fails, the standby takes over.

 

4. Implement Leader election based on Curator

(1) LeaderLatch: the first way Curator implements Leader election

(2) LeaderSelector: the second way Curator implements Leader election

 

(1) LeaderLatch: the first way Curator implements Leader election

Implement Leader election through Curator's LeaderLatch:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        //LeaderLatch creates an ephemeral sequential child node under "/leader/latch"
        LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
        leaderLatch.start();
        leaderLatch.await();//Blocks until this instance becomes the leader

        //As with HDFS: of two machines, the one that becomes leader starts working,
        //while the other blocks in await() until the leader dies, then takes over and continues
        boolean hasLeadership = leaderLatch.hasLeadership();//Check whether this instance is the leader
        System.out.println("Is leader: " + hasLeadership);

        Thread.sleep(Integer.MAX_VALUE);
    }
}
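LeaderLatch follows the classic zk election recipe. Every participant creates an ephemeral sequential child under the latch path, and the child with the smallest sequence number is the leader. Each other participant watches only the child immediately ahead of it, so a leader crash wakes a single successor rather than everyone. The JDK-only sketch below simulates that decision on a list of child names. It does not talk to zk, and the class name `SequentialElection` and the node names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simulates the decision a participant makes after listing the latch path's children.
class SequentialElection {
    // zk appends a 10-digit, zero-padded counter to sequential node names.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.length() - 10));
    }

    static List<String> sorted(List<String> children) {
        List<String> copy = new ArrayList<>(children);
        copy.sort(Comparator.comparingLong(SequentialElection::sequenceOf));
        return copy;
    }

    // Returns null if ourNode is the leader, otherwise the node it should watch.
    static String nodeToWatch(List<String> children, String ourNode) {
        List<String> order = sorted(children);
        int ourIndex = order.indexOf(ourNode);
        if (ourIndex < 0) {
            throw new IllegalStateException("Our node is gone; the participant must re-create it");
        }
        return ourIndex == 0 ? null : order.get(ourIndex - 1);
    }

    public static void main(String[] args) {
        List<String> children = List.of("latch-0000000007", "latch-0000000003", "latch-0000000005");
        System.out.println(nodeToWatch(children, "latch-0000000003")); // null, i.e. leader
        System.out.println(nodeToWatch(children, "latch-0000000007")); // latch-0000000005
    }
}
```

When the watched node is deleted, a participant would re-list the children and run the same check again. A participant becomes leader only once nothing sorts ahead of it.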

(2) LeaderSelector: the second way Curator implements Leader election

Leader election with Curator's LeaderSelector is shown below. The LeaderSelectorListener passed to it has two callbacks. takeLeadership() runs while this client is the leader, and stateChanged() watches the connection state.

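import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderSelectorDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        LeaderSelector leaderSelector = new LeaderSelector(
            client,
            "/leader/election",
            new LeaderSelectorListener() {
                @Override
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("You have become the Leader...");
                    //Do all Leader work here; leadership is released as soon as this method returns
                    Thread.sleep(Integer.MAX_VALUE);
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    System.out.println("Connection state changed: " + connectionState);
                    //When SUSPENDED or LOST, this client can no longer be sure it is the leader
                    if (connectionState == ConnectionState.SUSPENDED || connectionState == ConnectionState.LOST) {
                        throw new CancelLeadershipException();
                    }
                }
            }
        );
        leaderSelector.start();//Compete with other clients on "/leader/election" to become the leader
        Thread.sleep(Integer.MAX_VALUE);
    }
}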
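LeaderSelector is built on a distributed mutex on the election path. Whoever acquires the mutex has `takeLeadership()` called, and leadership is given up as soon as that method returns. That is why the demo sleeps forever inside it. The sketch below is only an in-process analogue: a fair `ReentrantLock` stands in for the zk lock. `LocalLeaderSelector`, `Listener` and `simulate()` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// In-process analogue of LeaderSelector: one shared lock plays the role of the zk mutex.
class LocalLeaderSelector {
    interface Listener {
        void takeLeadership(String participant) throws Exception;
    }

    private final ReentrantLock mutex = new ReentrantLock(true); // fair: first come, first served

    // Blocks until this participant holds leadership, runs the listener, then gives leadership up.
    void runOnce(String participant, Listener listener) throws Exception {
        mutex.lock();
        try {
            listener.takeLeadership(participant);
        } finally {
            mutex.unlock(); // returning from takeLeadership() means relinquishing leadership
        }
    }

    // Runs n participants concurrently and returns the order in which they led.
    static List<String> simulate(int n, AtomicInteger maxConcurrentLeaders) throws InterruptedException {
        LocalLeaderSelector selector = new LocalLeaderSelector();
        List<String> order = new CopyOnWriteArrayList<>();
        AtomicInteger current = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            String name = "node-" + i;
            threads[i] = new Thread(() -> {
                try {
                    selector.runOnce(name, p -> {
                        maxConcurrentLeaders.accumulateAndGet(current.incrementAndGet(), Math::max);
                        order.add(p);
                        Thread.sleep(10); // stands in for the leader's work
                        current.decrementAndGet();
                    });
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return order;
    }
}
```

`maxConcurrentLeaders` never exceeds 1, which is the property the real recipe guarantees across machines. The real recipe guarantees this only while sessions are healthy, which is why the demo cancels leadership on SUSPENDED or LOST.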

 

5. Implement distributed Barrier based on Curator

(1) Distributed Barrier

(2) Distributed Double Barrier

 

(1) Distributed Barrier

Many machines can wait on the same Barrier. setBarrier() creates the barrier node. Every machine that calls waitOnBarrier() then blocks until some client calls removeBarrier() to delete the node, and at that point all of them continue.

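import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

//DistributedBarrier
public class DistributedBarrierDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
        barrier.setBarrier();//Create the barrier node (typically done by one coordinating client)
        barrier.waitOnBarrier();//Block until some client calls barrier.removeBarrier()
    }
}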
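The in-process analogue below makes the set/wait/remove semantics concrete. A boolean flag stands in for the /barrier znode, and `notifyAll()` stands in for the watch that fires when that node is deleted. `LocalBarrier` is a hypothetical name, and unlike Curator it only coordinates threads in one JVM. The `maxWaitMs` variant mirrors the idea of a timed wait that returns false on timeout.

```java
// In-process analogue of DistributedBarrier: barrierExists plays the role of the /barrier znode.
class LocalBarrier {
    private boolean barrierExists = false;

    // Analogue of setBarrier(): "create" the barrier node.
    synchronized void setBarrier() {
        barrierExists = true;
    }

    // Analogue of removeBarrier(): "delete" the node and wake every waiter.
    synchronized void removeBarrier() {
        barrierExists = false;
        notifyAll();
    }

    // Returns true at once if no barrier is set, true when it is removed, false on timeout.
    synchronized boolean waitOnBarrier(long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (barrierExists) { // loop also guards against spurious wakeups
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            wait(remaining);
        }
        return true;
    }
}
```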

(2) Distributed Double Barrier

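import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

//DistributedDoubleBarrier
public class DistributedDoubleBarrierDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(client, "/barrier/double", 10);
        doubleBarrier.enter();//Each machine blocks in enter() until 10 machines have called enter()
        //All 10 then proceed together and can run their computation tasks here

        doubleBarrier.leave();//Each machine blocks in leave() until all 10 machines have called leave()
        //After that, every machine continues
    }
}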
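An in-process analogue of the double barrier can be built from `java.util.concurrent.CyclicBarrier`, awaited once for enter() and once for leave(). This only mimics the blocking behaviour for threads in one JVM. The distributed recipe coordinates through child znodes under the barrier path instead. `LocalDoubleBarrier` and `simulate()` are hypothetical names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// In-process analogue of DistributedDoubleBarrier with memberQty participants.
class LocalDoubleBarrier {
    private final CyclicBarrier barrier;

    LocalDoubleBarrier(int memberQty) {
        this.barrier = new CyclicBarrier(memberQty); // reusable: one generation for enter, one for leave
    }

    void enter() throws InterruptedException, BrokenBarrierException {
        barrier.await();
    }

    void leave() throws InterruptedException, BrokenBarrierException {
        barrier.await();
    }

    // Runs memberQty workers; returns the smallest "entered" count any worker saw after enter().
    static int simulate(int memberQty) throws InterruptedException {
        LocalDoubleBarrier db = new LocalDoubleBarrier(memberQty);
        AtomicInteger entered = new AtomicInteger();
        AtomicInteger minSeenAfterEnter = new AtomicInteger(Integer.MAX_VALUE);
        Thread[] workers = new Thread[memberQty];
        for (int i = 0; i < memberQty; i++) {
            workers[i] = new Thread(() -> {
                try {
                    entered.incrementAndGet();
                    db.enter(); // nobody passes until all members have arrived
                    minSeenAfterEnter.accumulateAndGet(entered.get(), Math::min);
                    // ... the computation task would run here ...
                    db.leave(); // nobody passes until all members have finished
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return minSeenAfterEnter.get();
    }
}
```

`simulate(n)` returning `n` shows that no worker got past enter() before every member had arrived.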

 

6. Implement distributed counters based on Curator

For a production distributed counter, Redis is usually the better choice. It handles more concurrency, performs better, offers richer features, and can embed Lua scripts for complex business logic. However, Redis replicates asynchronously. If the master crashes, recent writes may not have reached the replicas, so data can be lost or become inconsistent. zk's ZAB protocol commits a write only after a quorum of servers has acknowledged it. A committed value is therefore not lost when a single machine goes down.

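import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

//SharedCount: the counter is stored as the value of a single znode
public class SharedCounterDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        SharedCount sharedCount = new SharedCount(client, "/shared/count", 0);//0 is the seed value
        sharedCount.start();

        sharedCount.addListener(new SharedCountListener() {
            @Override
            public void countHasChanged(SharedCountReader sharedCountReader, int newCount) throws Exception {
                System.out.println("Distributed counter has changed to " + newCount);
            }

            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("The connection state has changed: " + connectionState);
            }
        });

        //Succeeds only if nobody changed the counter since getVersionedValue() was read
        boolean result = sharedCount.trySetCount(sharedCount.getVersionedValue(), 1);
        System.out.println(result + ", count = " + sharedCount.getCount());
    }
}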
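Because SharedCount stores the counter as the data of one znode, a "try" set is a compare-and-set on that znode's version. This mirrors zk's conditional `setData(path, data, expectedVersion)`, so an increment becomes a read / conditional-write / retry loop. The sketch below is in-memory only. `VersionedCell` is a hypothetical stand-in for the znode, and an `AtomicReference` replaces the server-side version check.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the /shared/count znode: a value plus the version bumped on every write.
class VersionedCell {
    static final class Versioned {
        final int version;
        final int value;

        Versioned(int version, int value) {
            this.version = version;
            this.value = value;
        }
    }

    private final AtomicReference<Versioned> node = new AtomicReference<>(new Versioned(0, 0));

    Versioned get() {
        return node.get();
    }

    // Like setData(path, data, expectedVersion): fails if another writer got in first.
    // Reference comparison works because every successful write installs a new object.
    boolean trySet(Versioned expected, int newValue) {
        return node.compareAndSet(expected, new Versioned(expected.version + 1, newValue));
    }

    // An increment is a read / conditional-write / retry loop.
    int increment() {
        while (true) {
            Versioned current = get();
            if (trySet(current, current.value + 1)) {
                return current.value + 1;
            }
        }
    }
}
```

A `false` from `trySetCount` is therefore not an error. It means another client won the race, and the caller should re-read and try again.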

 

7. Implementing zk's node and child node monitoring mechanism based on Curator

(1) Child node monitoring mechanism based on Curator to implement zk

(2) Node data monitoring mechanism based on Curator to implement zk

 

We use zk mainly for:

1. Creating, reading, updating and deleting metadata, and listening for changes to it

2. Leader election

 

Three kinds of listening are available:

1. PathCache (PathChildrenCache): listens to the child nodes of a node

2. NodeCache: listens to a single node

3. TreeCache: listens to the entire subtree below a node

 

(1) Child node monitoring mechanism based on Curator to implement zk

Here is an example of child node listening with PathCache (the PathChildrenCache class):

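import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class PathCacheDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        //cacheData = true: cache each child's data as well as its name
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
        //The cache mirrors the children of /cluster on the client;
        //a listener on the cache observes changes to that data in zk
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                //e.g. CHILD_ADDED, CHILD_UPDATED, CHILD_REMOVED, or a connection-related event
                System.out.println("Child event: " + pathChildrenCacheEvent.getType());
            }
        });
        pathChildrenCache.start();
        Thread.sleep(Integer.MAX_VALUE);//Keep the process alive to receive events
    }
}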
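Conceptually, a children cache keeps a local copy of the children and re-reads them when zk notifies it of a change. It then turns the difference into CHILD_ADDED, CHILD_UPDATED or CHILD_REMOVED events. The real PathChildrenCache tracks Stat and data per child, using separate watches. The sketch below simplifies that to a diff between two child-name-to-data maps using plain JDK types. `ChildrenDiff.diff` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

// Simplified model of how a children cache derives events from two snapshots.
class ChildrenDiff {
    // Event strings reuse the PathChildrenCacheEvent.Type names for readability.
    static List<String> diff(Map<String, String> cached, Map<String, String> current) {
        List<String> events = new ArrayList<>();
        TreeSet<String> names = new TreeSet<>(cached.keySet()); // sorted for deterministic output
        names.addAll(current.keySet());
        for (String name : names) {
            boolean before = cached.containsKey(name);
            boolean after = current.containsKey(name);
            if (!before) {
                events.add("CHILD_ADDED " + name);
            } else if (!after) {
                events.add("CHILD_REMOVED " + name);
            } else if (!Objects.equals(cached.get(name), current.get(name))) {
                events.add("CHILD_UPDATED " + name);
            }
        }
        return events;
    }
}
```

For example, going from `{a=1, b=2}` to `{b=3, c=4}` yields a removal of `a`, an update of `b` and an addition of `c`.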

(2) Node data monitoring mechanism based on Curator to implement zk

The following is an example of node listening with NodeCache:

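import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;

public class NodeCacheDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        final NodeCache nodeCache = new NodeCache(client, "/cluster");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/cluster");
                if (stat == null) {
                    System.out.println("/cluster was deleted");
                } else {
                    ChildData data = nodeCache.getCurrentData();//Latest cached data of /cluster
                    System.out.println("/cluster changed: " + (data == null ? null : new String(data.getData())));
                }
            }
        });
        nodeCache.start();
        Thread.sleep(Integer.MAX_VALUE);//Keep the process alive to receive events
    }
}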

 

8. Source code analysis of creating client instances based on Curator

(1) Creating a CuratorFramework instance uses the Builder pattern

(2) Creating a CuratorFramework instance initializes a CuratorZookeeperClient instance

 

(1) Creating a CuratorFramework instance uses the Builder pattern

CuratorFrameworkFactory.newClient() uses the Builder pattern. First, builder() creates a Builder instance. All parameters are then set as properties of that Builder. Finally, build() passes the Builder into the constructor of the target class, CuratorFrameworkImpl.

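import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", //zk's address
            5000, //Session timeout: if zk receives no heartbeat from the client within this time, the session expires
            3000, //Timeout for establishing the connection to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client has been started");
    }
}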

 public class CuratorFrameworkFactory {
     //Creating CuratorFramework instances uses the Builder pattern
     public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
         return builder().
             connectString(connectString).
             sessionTimeoutMs(sessionTimeoutMs).
             connectionTimeoutMs(connectionTimeoutMs).
             retryPolicy(retryPolicy).
             build();
     }
     ...
     public static Builder builder() {
         return new Builder();
     }
    
     public static class Builder {
         ...
         private EnsembleProvider ensembleProvider;
         private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
         private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
         private RetryPolicy retryPolicy;
         ...
         public Builder connectString(String connectString) {
             ensembleProvider = new FixedEnsembleProvider(connectString);
             return this;
         }
        
         public Builder sessionTimeoutMs(int sessionTimeoutMs) {
             this.sessionTimeoutMs = sessionTimeoutMs;
             return this;
         }
        
         public Builder connectionTimeoutMs(int connectionTimeoutMs) {
             this.connectionTimeoutMs = connectionTimeoutMs;
             return this;
         }
        
         public Builder retryPolicy(RetryPolicy retryPolicy) {
             this.retryPolicy = retryPolicy;
             return this;
         }
         ...
         public CuratorFramework build() {
             return new CuratorFrameworkImpl(this);
         }
     }
     ...
 }

 public class CuratorFrameworkImpl implements CuratorFramework {
     ...
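     public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
         ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
         this.client = new CuratorZookeeperClient(
             localZookeeperFactory,
             builder.getEnsembleProvider(),
             builder.getSessionTimeoutMs(),
             builder.getConnectionTimeoutMs(),
             builder.getWaitForShutdownTimeoutMs(),
             new Watcher() {//A Watcher for zk events is registered here
                 @Override
                 public void process(WatchedEvent watchedEvent) {
                     CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                     processEvent(event);
                 }
             },
             builder.getRetryPolicy(),
             builder.canBeReadOnly(),
             builder.getConnectionHandlingPolicy()
         );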
         ...
     }
     ...
 }

(2) Creating a CuratorFramework instance initializes a CuratorZookeeperClient instance

A CuratorFramework instance represents a zk client. Initializing CuratorFramework also initializes a CuratorZookeeperClient instance.

 

CuratorZookeeperClient is Curator's wrapper around the native ZooKeeper client.

 

A Watcher is passed in when CuratorZookeeperClient is initialized.

 

So newClient() of CuratorFrameworkFactory mainly does the following: initialize CuratorFramework -> initialize CuratorZookeeperClient -> initialize the ZookeeperFactory and register a Watcher.

 

newClient() does not connect yet. The connection to zk is initiated, and the Watcher registered with it, only when CuratorFramework's start() method is called.

 

9. How Curator establishes a connection with zk at startup

The start() method of ConnectionStateManager starts a thread that processes eventQueue. eventQueue holds change events for the network connection to zk. When an event arrives, the thread notifies every registered ConnectionStateListener.

 

The start() method of CuratorZookeeperClient initializes the native zk client, which establishes a long-lived TCP connection with the zk server. The ConnectionState object, which itself implements Watcher, is registered as the watcher that receives the notification events sent by the zk server.

public class CuratorFrameworkImpl implements CuratorFramework {
     private final CuratorZookeeperClient client;
     private final ConnectionStateManager connectionStateManager;
     private volatile ExecutorService executorService;
     ...
     public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
         ...
         this.client = new CuratorZookeeperClient(...);
         connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(),
             builder.getSessionTimeoutMs(),
             builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(),
             builder.getConnectionStateListenerDecorator()
         );
         ...
     }
     ...
     @Override
     public void start() {
         log.info("Starting");
         if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {
             throw new IllegalStateException("Cannot be started more than once");
         }
         ...
         //1. Start a thread that processes zk connection-state change events
         connectionStateManager.start();
         //2. Add a listener for zk connection-state changes
         final ConnectionStateListener listener = new ConnectionStateListener() {
             @Override
             public void stateChanged(CuratorFramework client, ConnectionState newState) {
                 if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState) {
                     logAsErrorConnectionErrors.set(true);
                 }
             }
             @Override
             public boolean doNotDecorate() {
                 return true;
             }
         };
         this.getConnectionStateListenable().addListener(listener);
         //3. Create the native zk client
         client.start();
         //4. Create a thread pool that runs background operations
         executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
         executorService.submit(new Callable<Object>() {
             @Override
             public Object call() throws Exception {
                 backgroundOperationsLoop();
                 return null;
             }
         });
         if (ensembleTracker != null) {
             ensembleTracker.start();
         }
         log.info(schemaSet.toDocumentation());
     }
     ...
 }

 public class ConnectionStateManager implements Closeable {
     private final ExecutorService service;
     private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     ...
     public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator) {
         ...
         service = Executors.newSingleThreadExecutor(threadFactory);
         ...
     }
     ...
     public void start() {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
         //Start a thread that runs processEvents()
         service.submit(
             new Callable<Object>() {
                 @Override
                 public Object call() throws Exception {
                     processEvents();
                     return null;
                 }
             }
         );
     }
    
     private void processEvents() {
         while (state.get() == State.STARTED) {
             int useSessionTimeoutMs = getUseSessionTimeoutMs();
             long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
             long pollMaxMs = useSessionTimeoutMs - elapsedMs;

             final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
             if (newState != null) {
                 if (listeners.size() == 0) {
                     log.warn("There are no ConnectionStateListeners registered.");
                 }
                 listeners.forEach(listener -> listener.stateChanged(client, newState));
             } else if (sessionExpirationPercent > 0) {
                 synchronized(this) {
                     checkSessionExpiration();
                 }
             }
         }
     }
     ...
 }

 public class CuratorZookeeperClient implements Closeable {
     private final ConnectionState state;
     ...
     public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
             int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
             RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
         ...
         state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
         ...
     }
     ...
     public void start() throws Exception {
         log.debug("Starting");
         if (!started.compareAndSet(false, true)) {
             throw new IllegalStateException("Already started");
         }
         state.start();
     }
     ...
 }

 class ConnectionState implements Watcher, Closeable {
     private final HandleHolder zooKeeper;
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
             int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher,
             AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
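         this.ensembleProvider = ensembleProvider;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.connectionTimeoutMs = connectionTimeoutMs;
         this.tracer = tracer;
         this.connectionHandlingPolicy = connectionHandlingPolicy;
         if (parentWatcher != null) {
             parentWatchers.offer(parentWatcher);
         }
         //Pass itself (ConnectionState implements Watcher) to HandleHolder as the Watcher
         zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
     }
     ...
     void start() throws Exception {
         log.debug("Starting");
         ensembleProvider.start();
         reset();
     }
    
     synchronized void reset() throws Exception {
         log.debug("reset");
         instanceIndex.incrementAndGet();
         isConnected.set(false);
         connectionStartMs = System.currentTimeMillis();
         //Discard any old handle and install a Helper that lazily creates the zk connection
         zooKeeper.closeAndReset();
         zooKeeper.getZooKeeper();//Initiate the connection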
     }
     ...
 }

 class HandleHolder {
     private final ZookeeperFactory zookeeperFactory;
     private final Watcher watcher;
     private final EnsembleProvider ensembleProvider;
     private final int sessionTimeout;
     private final boolean canBeReadOnly;
     private volatile Helper helper;
     ...
     HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly) {
         this.zookeeperFactory = zookeeperFactory;
         this.watcher = watcher;
         this.ensembleProvider = ensembleProvider;
         this.sessionTimeout = sessionTimeout;
         this.canBeReadOnly = canBeReadOnly;
     }
    
     private interface Helper {
         ZooKeeper getZooKeeper() throws Exception;
         String getConnectionString();
         int getNegotiatedSessionTimeoutMs();
     }
    
     ZooKeeper getZooKeeper() throws Exception {
         return (helper != null) ? helper.getZooKeeper() : null;
     }
    
     void closeAndReset() throws Exception {
         internalClose(0);
         helper = new Helper() {
             private volatile ZooKeeper zooKeeperHandle = null;
             private volatile String connectionString = null;
             @Override
             public ZooKeeper getZooKeeper() throws Exception {
                 synchronized(this) {
                     if (zooKeeperHandle == null) {
                         connectionString = ensembleProvider.getConnectionString();
                         //Create the connection to zk and initialize zooKeeperHandle
                         zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                     }
                     ...
                     return zooKeeperHandle;
                 }
             }
             @Override
             public String getConnectionString() {
                 return connectionString;
             }
             @Override
             public int getNegotiatedSessionTimeoutMs() {
                 return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
             }
         };
     }
     ...
 }

 //Create a connection between client and zk
 public class DefaultZookeeperFactory implements ZookeeperFactory {
     @Override
     public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
         return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
     }
 }
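Stripped of the session-expiration bookkeeping, the ConnectionStateManager shown above is a single consumer thread. It polls a bounded queue with a timeout and fans each state out to the registered listeners. The JDK-only sketch below reproduces that shape. `MiniStateManager` and the string states are hypothetical stand-ins for Curator's ConnectionStateManager, ConnectionState enum and ConnectionStateListener. The drop-oldest behaviour on a full queue is a design choice of this sketch.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Single-consumer event loop: the zk watcher thread posts states, one worker thread notifies listeners.
class MiniStateManager implements AutoCloseable {
    private final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(25);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Called by the producer; if the queue is full, the oldest event is dropped to make room.
    void post(String newState) {
        while (!eventQueue.offer(newState)) {
            eventQueue.poll();
        }
    }

    void start() {
        service.submit(() -> {
            while (running) {
                try {
                    // Bounded wait, so the loop could also run periodic checks between events.
                    String state = eventQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (state != null) {
                        listeners.forEach(l -> l.accept(state));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    @Override
    public void close() {
        running = false;
        service.shutdownNow(); // interrupts the worker if it is blocked in poll()
    }
}
```

Because only one thread delivers events, listeners see state changes in the order they were posted.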

 

10. Source code analysis of adding, deleting, modifying and checking nodes based on Curator

(1) Create znode node based on Curator

(2) Query znode node based on Curator

(3) Modify znode node based on Curator

(4) Delete znode node based on Curator

 

Curator's CRUD operations are ultimately carried out by calling the native zk API.

 

(1) Create znode node based on Curator

Creating a node also uses the Builder pattern. CuratorFramework.create() first returns a CreateBuilder instance. Methods such as withMode() then set the builder's fields. Finally, CreateBuilder.forPath() creates the znode, wrapped in a retry loop.
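The retry loop re-runs the operation while the failure is retryable (for example, a connection loss) and the RetryPolicy allows another attempt. For `ExponentialBackoffRetry`, Curator's source computes the sleep before retry `n` (0-based) as `baseSleepTimeMs * max(1, random.nextInt(1 << (n + 1)))`, capped at a maximum. The sketch below is a simplified, JDK-only model of that loop. `RetrySketch`, `Sleeper` and the `isRetryable` predicate are hypothetical, and the real loop also consults the connection state.

```java
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

class RetrySketch {
    // How the sketch waits between attempts; pass Thread::sleep for real use.
    interface Sleeper {
        void sleep(long ms) throws InterruptedException;
    }

    // Same formula as ExponentialBackoffRetry's sleep time, without the maxSleepMs cap.
    static long sleepTimeMs(int baseSleepTimeMs, int retryCount, Random random) {
        return (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
    }

    // Runs op; on a retryable failure waits and tries again, allowing at most maxRetries retries.
    static <T> T callWithRetry(Callable<T> op, int baseSleepTimeMs, int maxRetries,
                               Predicate<Exception> isRetryable, Sleeper sleeper) throws Exception {
        Random random = new Random();
        for (int retryCount = 0; ; retryCount++) {
            try {
                return op.call();
            } catch (Exception e) {
                if (!isRetryable.test(e) || retryCount >= maxRetries) {
                    throw e; // not retryable, or retries exhausted
                }
                sleeper.sleep(sleepTimeMs(baseSleepTimeMs, retryCount, random));
            }
        }
    }
}
```

With `ExponentialBackoffRetry(1000, 3)` as in the demos, the first retry always waits 1000 ms. The second waits 1000 to 3000 ms, and the third waits 1000 to 7000 ms.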

 

To create the node, CuratorFramework.getZooKeeper() is called to obtain the native zk client instance, and the node is then created through the native zk client API.

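import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", //zk's address
            5000, //Session timeout: if zk receives no heartbeat from the client within this time, the session expires
            3000, //Timeout for establishing the connection to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client has been started");
        //Create a node (PERSISTENT mode assumed here)
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/my/path", "100".getBytes());
    }
}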

 public class CuratorFrameworkImpl implements CuratorFramework {
     ...
     @Override
     public CreateBuilder create() {
         checkState();
         return new CreateBuilderImpl(this);
     }
     ...
 }

 public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> {
     private final CuratorFrameworkImpl client;
     private CreateMode createMode;
     private Backgrounding backgrounding;
     private boolean createParentsIfNeeded;
     ...
     CreateBuilderImpl(CuratorFrameworkImpl client) {
         this.client = client;
         createMode = CreateMode.PERSISTENT;
         backgrounding = new Backgrounding();
         acling = new ACLing(client.getAclProvider());
         createParentsIfNeeded = false;
         createParentsAsContainers = false;
         compress = false;
         setDataIfExists = false;
         storingStat = null;
         ttl = -1;
     }
    
     @Override
     public String forPath(final String givenPath, byte[] data) throws Exception {
         if (compress) {
             data = client.getCompressionProvider().compress(givenPath, data);
         }

         final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
         List<ACL> aclList = acling.getAclList(adjustedPath);
         client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

         String returnPath = null;
         if (backgrounding.inBackground()) {
             pathInBackground(adjustedPath, data, givenPath);
         } else {
             //Create the node synchronously
             String path = protectedPathInForeground(adjustedPath, data, aclList);
             returnPath = client.unfixForNamespace(path);
         }
         return returnPath;
     }
    
     private String protectedPathInForeground(String adjustedPath, byte[] data, List<ACL> aclList) throws Exception {
         return pathInForeground(adjustedPath, data, aclList);
     }
    
     private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception {
         OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Foreground");
         final AtomicBoolean firstTime = new AtomicBoolean(true);
         //Call with retries
         String returnPath = RetryLoop.callWithRetry(
             client.getZookeeperClient(),
             new Callable<String>() {
                 @Override
                 public String call() throws Exception {
                     boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
                     protectedMode.checkSetSessionId(client, createMode);
                     String createdPath = null;
                     if (!localFirstTime && protectedMode.doProtected()) {
                         debugForceFindProtectedNode = false;
                         createdPath = findProtectedNodeInForeground(path);
                     }
                     if (createdPath == null) {
                         //To create a znode, first call client.getZooKeeper() to obtain the native zk client instance,
                         //then create the node through the native zk client API
                         try {
                             if (client.isZk34CompatibilityMode()) {
                                 createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
                             } else {
                                 createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
                             }
                         } catch (KeeperException.NoNodeException e) {
                             if (createParentsIfNeeded) {
                                 //Cascading creation: create the missing parent nodes, then create the node again
                                 ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
                                 if (client.isZk34CompatibilityMode()) {
                                     createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                                 } else {
                                     createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
                                 }
                             } else {
                                 throw e;
                             }
                         } catch (KeeperException.NodeExistsException e) {
                             if (setDataIfExists) {
                                 Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
                                 if (storingStat != null) {
                                     DataTree.copyStat(setStat, storingStat);
                                 }
                                 createdPath = path;
                             } else {
                                 throw e;
                             }
                         }
                     }
                     if (failNextCreateForTesting) {
                         failNextCreateForTesting = false;
                         throw new KeeperException.ConnectionLossException();
                     }
                     return createdPath;
                 }
             }
         );
         trace.setRequestBytesLength(data).setPath(path).commit();
         return returnPath;
     }
     ...
 }

 public class CuratorFrameworkImpl implements CuratorFramework {
     private final CuratorZookeeperClient client;
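     public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
         ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
         this.client = new CuratorZookeeperClient(
             localZookeeperFactory,
             builder.getEnsembleProvider(),
             builder.getSessionTimeoutMs(),
             builder.getConnectionTimeoutMs(),
             builder.getWaitForShutdownTimeoutMs(),
             new Watcher() {
                 ...
             },
             builder.getRetryPolicy(),
             builder.canBeReadOnly(),
             builder.getConnectionHandlingPolicy()
         );
         ...
     }
     ...
     ZooKeeper getZooKeeper() throws Exception {
         return client.getZooKeeper();
     }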
     ...
 }

 public class CuratorZookeeperClient implements Closeable {
     private final ConnectionState state;
     ...
     public ZooKeeper getZooKeeper() throws Exception {
         Preconditions.checkState(started.get(), "Client is not started");
         return state.getZooKeeper();
     }
     ...
 }

 class ConnectionState implements Watcher, Closeable {
     private final HandleHolder zooKeeper;
     ...
     ZooKeeper getZooKeeper() throws Exception {
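         if (SessionFailRetryLoop.sessionForThreadHasFailed()) {
             throw new SessionFailRetryLoop.SessionFailedException();
         }
         Exception exception = backgroundExceptions.poll();
         if (exception != null) {
             new EventTrace("background-exceptions", tracer.get()).commit();
             throw exception;
         }
         boolean localIsConnected = isConnected.get();
         if (!localIsConnected) {
             checkTimeouts();
         }
         //Get the ZooKeeper instance through HandleHolder
         return zooKeeper.getZooKeeper();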
     }
     ...
 }
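The two exception fallbacks in the retried create() call can be tried out without a zk server. The following is a minimal sketch under stated assumptions: FakeZNodeStore, its methods and its unchecked exceptions are hypothetical stand-ins for a zk namespace, not Curator or ZooKeeper APIs, and ACLs, create modes, retries and protected mode are left out. It shows NONODE being answered by creating the missing parents (the creatingParentsIfNeeded() case) and NODEEXISTS being answered by setData() (the setDataIfExists case).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a zk namespace; not a Curator or ZooKeeper API.
// Exceptions are unchecked here only to keep the sketch short.
class FakeZNodeStore {
    static class NoNodeException extends RuntimeException {
        NoNodeException(String path) { super("NoNode: " + path); }
    }

    static class NodeExistsException extends RuntimeException {
        NodeExistsException(String path) { super("NodeExists: " + path); }
    }

    private final Map<String, byte[]> nodes = new TreeMap<>();

    FakeZNodeStore() {
        nodes.put("/", new byte[0]); // the root always exists
    }

    static String parentOf(String path) {
        int idx = path.lastIndexOf('/');
        return idx <= 0 ? "/" : path.substring(0, idx);
    }

    // Like ZooKeeper.create(): the parent must exist and the node itself must not.
    String create(String path, byte[] data) {
        if (nodes.containsKey(path)) {
            throw new NodeExistsException(path);
        }
        if (!nodes.containsKey(parentOf(path))) {
            throw new NoNodeException(parentOf(path));
        }
        nodes.put(path, data);
        return path;
    }

    void setData(String path, byte[] data) {
        if (!nodes.containsKey(path)) {
            throw new NoNodeException(path);
        }
        nodes.put(path, data);
    }

    byte[] getData(String path) {
        return nodes.get(path);
    }

    // Creates every missing ancestor with empty data.
    void mkdirs(String path) {
        StringBuilder current = new StringBuilder();
        for (String part : path.split("/")) {
            if (part.isEmpty()) {
                continue;
            }
            current.append('/').append(part);
            nodes.putIfAbsent(current.toString(), new byte[0]);
        }
    }

    // Mirrors the fallback structure of the retried create() call.
    String createWithFallback(String path, byte[] data,
                              boolean createParentsIfNeeded, boolean setDataIfExists) {
        try {
            return create(path, data);
        } catch (NoNodeException e) {
            if (!createParentsIfNeeded) {
                throw e;
            }
            mkdirs(parentOf(path));
            return create(path, data);
        } catch (NodeExistsException e) {
            if (!setDataIfExists) {
                throw e;
            }
            setData(path, data);
            return path;
        }
    }
}
```

For example, createWithFallback("/a/b/c", data, true, false) on an empty store creates /a and /a/b before creating /a/b/c.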

(2) Query znode node based on Curator

Querying a node also uses the builder pattern: the getData() method of CuratorFramework first creates a GetDataBuilder instance, and the forPath() method of GetDataBuilder then queries the znode, executing the actual zk call inside a retry loop. Child nodes are queried the same way through getChildren() and GetChildrenBuilder.

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("Curator client has been started");

         //Query node
         byte[] dataBytes = client.getData().forPath("/my/path");
         System.out.println(new String(dataBytes));
         //Query child nodes
         List<String> children = client.getChildren().forPath("/my");
         System.out.println(children);
     }
 }

 public class CuratorFrameworkImpl implements CuratorFramework {
     ...
     @Override
     public GetDataBuilder getData() {
         checkState();
         return new GetDataBuilderImpl(this);
     }
    
     @Override
     public GetChildrenBuilder getChildren() {
         checkState();
         return new GetChildrenBuilderImpl(this);
     }
     ...
 }

 public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, ErrorListenerPathable<byte[]> {
     private final CuratorFrameworkImpl client;
     ...
     @Override
     public byte[] forPath(String path) throws Exception {
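         client.getSchemaSet().getSchema(path).validateWatch(path, watching.isWatched() || watching.hasWatcher());
         path = client.fixForNamespace(path);
         byte[] responseData = null;
         if (backgrounding.inBackground()) {
             client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);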
         } else {
             //Query node
             responseData = pathInForeground(path);
         }
         return responseData;
     }
    
     private byte[] pathInForeground(final String path) throws Exception {
         OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
         //Run the call inside a retry loop
         byte[] responseData = RetryLoop.callWithRetry(
             client.getZookeeperClient(),
             new Callable<byte[]>() {
                 @Override
                 public byte[] call() throws Exception {
                     byte[] responseData;
                     //Get the native zk client instance through CuratorFramework, then call its getData() to read the node
                     if (watching.isWatched()) {
                         responseData = client.getZooKeeper().getData(path, true, responseStat);
                     } else {
                         responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat);
                         watching.commitWatcher(KeeperException.Code.OK.intValue(), false);
                     }
                     return responseData;
                 }
             }
         );
         trace.setResponseBytesLength(responseData).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(responseStat).commit();
         return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
     }
     ...
 }
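Each forPath() above runs the real zk call through RetryLoop.callWithRetry(). The idea can be sketched with the standard library alone. This is a simplified model under assumptions: SimpleRetryLoop, RetryableException and Sleeper are hypothetical names; the backoff is a deterministic base × 2^n (Curator's ExponentialBackoffRetry randomizes the multiplier); and only the hypothetical RetryableException, standing in for connection-loss style errors, is retried.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified retry loop; SimpleRetryLoop and RetryableException are
// illustrative names, not Curator classes.
class SimpleRetryLoop {
    // Stands in for retryable zk errors such as connection loss.
    static class RetryableException extends RuntimeException {
        RetryableException(String message) { super(message); }
    }

    // Injected so the sketch can be tested without really sleeping.
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    private final long baseSleepMs;
    private final int maxRetries;
    private final Sleeper sleeper;

    SimpleRetryLoop(long baseSleepMs, int maxRetries, Sleeper sleeper) {
        this.baseSleepMs = baseSleepMs;
        this.maxRetries = maxRetries;
        this.sleeper = sleeper;
    }

    // Deterministic exponential backoff: base, 2*base, 4*base, ...
    long sleepMsFor(int retryCount) {
        return baseSleepMs * (1L << retryCount);
    }

    // Retries only RetryableException; any other exception propagates immediately.
    <T> T callWithRetry(Callable<T> operation) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return operation.call();
            } catch (RetryableException e) {
                if (retryCount >= maxRetries) {
                    throw e; // retries exhausted: surface the last error
                }
                sleeper.sleep(sleepMsFor(retryCount));
                retryCount++;
            }
        }
    }
}
```

With a real sleep, new SimpleRetryLoop(1000, 3, Thread::sleep) mirrors the ExponentialBackoffRetry(1000, 3) arguments used in the demos: at most three retries, waiting 1 s, 2 s and 4 s in this deterministic variant.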

(3) Modify znode node based on Curator

Modifying a node also uses the builder pattern: the setData() method of CuratorFramework first creates a SetDataBuilder instance, and the forPath() method of SetDataBuilder then modifies the znode, executing the actual zk call inside a retry loop.

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("Curator client has been started");

         //Modify node
         client.setData().forPath("/my/path", "110".getBytes());
         byte[] dataBytes = client.getData().forPath("/my/path");
         System.out.println(new String(dataBytes));
     }
 }

 public class CuratorFrameworkImpl implements CuratorFramework {
     ...
     @Override
     public SetDataBuilder setData() {
         checkState();
         return new SetDataBuilderImpl(this);
     }
     ...
 }

 public class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<Stat> {
     private final CuratorFrameworkImpl client;
     ...
     @Override
     public Stat forPath(String path, byte[] data) throws Exception {
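         client.getSchemaSet().getSchema(path).validateGeneral(path, data, null);
         if (compress) {
             data = client.getCompressionProvider().compress(path, data);
         }
         path = client.fixForNamespace(path);
         Stat resultStat = null;
         if (backgrounding.inBackground()) {
             client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null);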
         } else {
             //Modify node
             resultStat = pathInForeground(path, data);
         }
         return resultStat;
     }
    
     private Stat pathInForeground(final String path, final byte[] data) throws Exception {
         OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");
         //Run the call inside a retry loop
         Stat resultStat = RetryLoop.callWithRetry(
             client.getZookeeperClient(),
             new Callable<Stat>() {
                 @Override
                 public Stat call() throws Exception {
                     //Get the native zk client instance through CuratorFramework, then call its setData() to modify the node
                     return client.getZooKeeper().setData(path, data, version);
                 }
             }
         );
         trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();
         return resultStat;
     }
     ...
 }
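The version argument passed to setData() above is ZooKeeper's optimistic-concurrency check: the write succeeds only if it equals the node's current data version, and -1 means "any version". Curator's builder lets you supply a specific value through withVersion(). A minimal in-memory model of that rule follows; VersionedStore and its exceptions are hypothetical, and the Stat is reduced to a single version counter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of ZooKeeper's conditional setData(path, data, version).
// VersionedStore and its unchecked exceptions are illustrative names only.
class VersionedStore {
    static class NoNodeException extends RuntimeException {
        NoNodeException(String path) { super("NoNode: " + path); }
    }

    static class BadVersionException extends RuntimeException {
        BadVersionException(String path) { super("BadVersion: " + path); }
    }

    private static final class Node {
        byte[] data;
        int version; // starts at 0, like a freshly created znode's data version

        Node(byte[] data) { this.data = data; }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    void create(String path, byte[] data) {
        nodes.put(path, new Node(data));
    }

    // expectedVersion == -1 matches any version; otherwise it must equal the current one.
    // Returns the new data version.
    int setData(String path, byte[] data, int expectedVersion) {
        Node node = nodes.get(path);
        if (node == null) {
            throw new NoNodeException(path);
        }
        if (expectedVersion != -1 && expectedVersion != node.version) {
            throw new BadVersionException(path);
        }
        node.data = data;
        node.version++;
        return node.version;
    }

    byte[] getData(String path) {
        return nodes.get(path).data;
    }
}
```

A typical read-modify-write reads the current version, writes with that version, and repeats the whole cycle when BadVersion signals that another client got there first.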

(4) Delete znode node based on Curator

Deleting a node also uses the builder pattern: the delete() method of CuratorFramework first creates a DeleteBuilder instance, and the forPath() method of DeleteBuilder then deletes the znode, executing the actual zk call inside a retry loop.

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("Curator client has been started");

         //Delete node
         client.delete().forPath("/my/path");
     }
 }

 public class CuratorFrameworkImpl implements CuratorFramework {
     ...
     @Override
     public DeleteBuilder delete() {
         checkState();
         return new DeleteBuilderImpl(this);
     }
     ...
 }

 public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, ErrorListenerPathable<Void> {
     private final CuratorFrameworkImpl client;
     ...
     @Override
     public Void forPath(String path) throws Exception {
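         client.getSchemaSet().getSchema(path).validateDelete(path);
         final String unfixedPath = path;
         path = client.fixForNamespace(path);
         if (backgrounding.inBackground()) {
             OperationAndData.ErrorCallback<String> errorCallback = null;
             if (guaranteed) {
                 errorCallback = new OperationAndData.ErrorCallback<String>() {
                     @Override
                     public void retriesExhausted(OperationAndData<String> operationAndData) {
                         client.getFailedDeleteManager().addFailedOperation(unfixedPath);
                     }
                 };
             }
             client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext(), null), null);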
         } else {
             //Delete node
             pathInForeground(path, unfixedPath);
         }
         return null;
     }

     private void pathInForeground(final String path, String unfixedPath) throws Exception {
         OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("DeleteBuilderImpl-Foreground");
         //Run the call inside a retry loop
         RetryLoop.callWithRetry(
             client.getZookeeperClient(),
             new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
                     try {
                         //Get the native zk client instance through CuratorFramework, then call its delete() to delete the node
                         client.getZooKeeper().delete(path, version);
                     } catch (KeeperException.NoNodeException e) {
                         if (!quietly) {
                             throw e;
                         }
                     } catch (KeeperException.NotEmptyException e) {
                         if (deletingChildrenIfNeeded) {
                             ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
                         } else {
                             throw e;
                         }
                     }
                     return null;
                 }
             }
         );
         trace.setPath(path).commit();
     }
 }
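The two catch branches of the retried delete() call decide what happens when the node is missing (tolerated only with quietly()) or still has children (removed first only with deletingChildrenIfNeeded()). Below is a minimal in-memory sketch of that decision logic; DeleteSketch and its unchecked exceptions are hypothetical, and the guaranteed() path, which records failed deletes for a later retry, is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of the delete branches; not a Curator API.
class DeleteSketch {
    static class NoNodeException extends RuntimeException {
        NoNodeException(String path) { super("NoNode: " + path); }
    }

    static class NotEmptyException extends RuntimeException {
        NotEmptyException(String path) { super("NotEmpty: " + path); }
    }

    private final TreeMap<String, byte[]> nodes = new TreeMap<>();

    void put(String path) {
        nodes.put(path, new byte[0]);
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    // Direct children only: "/a/b" is a child of "/a", "/a/b/c" is not.
    List<String> childrenOf(String path) {
        String prefix = path + "/";
        List<String> result = new ArrayList<>();
        for (String p : nodes.keySet()) {
            if (p.startsWith(prefix) && p.indexOf('/', prefix.length()) < 0) {
                result.add(p);
            }
        }
        return result;
    }

    // Like ZooKeeper.delete(): fails if the node is missing or still has children.
    void rawDelete(String path) {
        if (!nodes.containsKey(path)) {
            throw new NoNodeException(path);
        }
        if (!childrenOf(path).isEmpty()) {
            throw new NotEmptyException(path);
        }
        nodes.remove(path);
    }

    // Depth-first: delete every descendant, then the node itself.
    void deleteRecursively(String path) {
        for (String child : childrenOf(path)) {
            deleteRecursively(child);
        }
        rawDelete(path);
    }

    // Mirrors the two catch branches of the retried delete() call.
    void delete(String path, boolean quietly, boolean deletingChildrenIfNeeded) {
        try {
            rawDelete(path);
        } catch (NoNodeException e) {
            if (!quietly) {
                throw e;
            }
        } catch (NotEmptyException e) {
            if (!deletingChildrenIfNeeded) {
                throw e;
            }
            deleteRecursively(path);
        }
    }
}
```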

 

11. Implementation source code of the node listening callback mechanism based on Curator

(1) Implementation source code of PathCache child node monitoring mechanism

(2) Implementation source code of NodeCache node monitoring mechanism

(3) One-time Watcher registration on child nodes, and asynchronous callbacks via inBackground()

(4) PathCache automatically re-registers the listener

(5) NodeCache keeps listening for node change events

 

(1) Implementation source code of PathCache child node monitoring mechanism

PathChildrenCache calls the getChildren() method of the native zk client and passes in a Watcher named childrenWatcher. When an event occurs on a child node, childrenWatcher is notified and triggers a refresh, and the refresh eventually calls the Listeners registered with PathChildrenCache. Note: because every refresh calls getChildren() again with the same childrenWatcher, the one-time Watcher is re-registered after each notification.
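The re-registration loop described above can be modeled in a few lines of plain Java. In this single-threaded sketch, OneShotChildStore and ReRegisteringChildCache are hypothetical names: the store drops a child Watcher as soon as it fires, as the native zk client does, and the cache passes the same watcher back in on every refresh, then diffs the children into CHILD_ADDED / CHILD_REMOVED events. Unlike PathChildrenCache, it runs refreshes synchronously instead of through an executor and does not cache node data.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical store whose child watches are one-time, as with the native zk client.
class OneShotChildStore {
    private final Set<String> children = new TreeSet<>();
    private final List<Runnable> childWatchers = new ArrayList<>();

    // Like getChildren(path, watcher): returns a snapshot and arms the watcher once.
    List<String> getChildren(Runnable watcher) {
        if (watcher != null) {
            childWatchers.add(watcher);
        }
        return new ArrayList<>(children);
    }

    void addChild(String name) {
        children.add(name);
        fire();
    }

    void removeChild(String name) {
        children.remove(name);
        fire();
    }

    // One-time semantics: watchers are disarmed before they run.
    private void fire() {
        List<Runnable> armed = new ArrayList<>(childWatchers);
        childWatchers.clear();
        armed.forEach(Runnable::run);
    }
}

// Hypothetical cache that re-arms its watcher on every refresh.
class ReRegisteringChildCache {
    private final OneShotChildStore store;
    private final Consumer<String> listener;
    private final Set<String> known = new TreeSet<>();
    // Plays the role of childrenWatcher: every notification triggers another refresh().
    private final Runnable childrenWatcher = this::refresh;

    ReRegisteringChildCache(OneShotChildStore store, Consumer<String> listener) {
        this.store = store;
        this.listener = listener;
    }

    void start() {
        refresh();
    }

    // Like refresh(): fetch the children again with the same watcher, then diff.
    void refresh() {
        List<String> current = store.getChildren(childrenWatcher);
        for (String child : current) {
            if (known.add(child)) {
                listener.accept("CHILD_ADDED " + child);
            }
        }
        for (Iterator<String> it = known.iterator(); it.hasNext(); ) {
            String child = it.next();
            if (!current.contains(child)) {
                it.remove();
                listener.accept("CHILD_REMOVED " + child);
            }
        }
    }
}
```

A watcher passed to OneShotChildStore.getChildren() directly fires for the first change only, while the cache keeps reporting every change because each refresh re-arms it.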

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("Curator client has been started");

         //PathCache, listen for changes in child nodes under /cluster
         PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
             public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                 ...
             }
         });
         pathChildrenCache.start();
     }
 }

 public class PathChildrenCache implements Closeable {
     private final WatcherRemoveCuratorFramework client;
     private final String path;
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final CloseableExecutorService executorService;
     private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
     ...
     //initialization
     public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {
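         this.client = client.newWatcherRemoveCuratorFramework();
         this.path = PathUtils.validatePath(path);
         this.cacheData = cacheData;
         this.dataIsCompressed = dataIsCompressed;
         this.executorService = executorService;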
         ensureContainers = new EnsureContainers(client, path);
     }
    
     //Get the container listeners used to store the Listener
     public ListenerContainer<PathChildrenCacheListener> getListenable() {
         return listeners;
     }
    
     //Start listening to child nodes
     public void start() throws Exception {
         start(StartMode.NORMAL);
     }
    
     private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
         @Override
         public void stateChanged(CuratorFramework client, ConnectionState newState) {
             // Handle the changes in connection status
             handleStateChange(newState);
         }
     };
    
     public void start(StartMode mode) throws Exception {
         ...
         //Add Listener to the established zk connection
         client.getConnectionStateListenable().addListener(connectionStateListener);
         ...
         //Wrap PathChildrenCache in a RefreshOperation and submit it
         //When the operation runs, it calls PathChildrenCache's refresh() method
         offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
         ...
     }
    
     //Submit a task to the thread pool for processing
     void offerOperation(final Operation operation) {
         if (operationsQuantizer.add(operation)) {
             submitToExecutor(
                 new Runnable() {
                     @Override
                     public void run() {
                         ...
                         operationsQuantizer.remove(operation);
                         //Calls invoke() on the operation, e.g. PathChildrenCache's refresh() for a RefreshOperation
                         operation.invoke();
                         ...
                     }
                 }
             );
         }
     }
    
     private synchronized void submitToExecutor(final Runnable command) {
         if (state.get() == State.STARTED) {
             //Submit a task to the thread pool for processing
             executorService.submit(command);
         }
     }
     ...
 }

 class RefreshOperation implements Operation {
     private final PathChildrenCache cache;
     private final PathChildrenCache.RefreshMode mode;
    
     RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {
         this.cache = cache;
         this.mode = mode;
     }
    
     @Override
     public void invoke() throws Exception {
         //Call PathChildrenCache's refresh() method, which starts listening to the child nodes
         cache.refresh(mode);
     }
     ...
 }

 public class PathChildrenCache implements Closeable {
     ...
     private volatile Watcher childrenWatcher = new Watcher() {
         //Re-registers itself: each refresh() passes this same watcher to getChildren() again
         //Called when a child node event occurs
         @Override
         public void process(WatchedEvent event) {
             //This eventually calls PathChildrenCache's refresh() method
             offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
         }
     };
    
     void refresh(final RefreshMode mode) throws Exception {
         ensurePath();
         //The callback runs when the background getChildren() call below completes
         final BackgroundCallback callback = new BackgroundCallback() {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                 if (reRemoveWatchersOnBackgroundClosed()) {
                     return;
                 }
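                 if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     //Process the child node data
                     processChildren(event.getChildren(), mode);
                 } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                     if (mode == RefreshMode.NO_NODE_EXCEPTION) {
                         log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path);
                         ensureContainers.reset();
                     } else {
                         log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path);
                         ensureContainers.reset();
                         offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION));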
                     }
                 }
             }
         };
         //The following call ends up invoking getChildren() on the native zk client to listen to the child nodes,
         //registering childrenWatcher as the Watcher and callback as the background asynchronous callback
         client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
     }
     ...
 }

 //When a child node event occurs, processing eventually runs EventOperation.invoke(), which calls the registered Listeners
 class EventOperation implements Operation {
     private final PathChildrenCache cache;
     private final PathChildrenCacheEvent event;
    
     EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {
         this.cache = cache;
         this.event = event;
     }
    
     @Override
     public void invoke() {
         //Call the Listeners registered with PathChildrenCache
         cache.callListeners(event);
     }
     ...
 }
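offerOperation() only queues an operation when operationsQuantizer.add(operation) succeeds, so a burst of watcher events can collapse into a single pending refresh. A minimal sketch of that de-duplication follows. QuantizedQueue and RefreshOp are hypothetical, the executor is replaced by a deque that runs only when asked, and treating two refreshes with the same mode as equal is an assumption about RefreshOperation, whose equals() is not shown above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of offerOperation()'s de-duplication; not Curator classes.
class QuantizedQueue {
    interface Operation {
        void invoke();
    }

    static final class RefreshOp implements Operation {
        private final String mode;
        private final List<String> log;

        RefreshOp(String mode, List<String> log) {
            this.mode = mode;
            this.log = log;
        }

        @Override
        public void invoke() {
            log.add("refresh " + mode);
        }

        // Assumption: two refreshes with the same mode count as the same pending work.
        @Override
        public boolean equals(Object o) {
            return o instanceof RefreshOp && ((RefreshOp) o).mode.equals(mode);
        }

        @Override
        public int hashCode() {
            return mode.hashCode();
        }
    }

    private final Set<Operation> pending = new HashSet<>();
    // Stands in for the executor service; tasks run only when runAll() is called.
    private final Deque<Runnable> executor = new ArrayDeque<>();

    void offerOperation(Operation operation) {
        if (pending.add(operation)) { // false if an equal operation is already queued
            executor.add(() -> {
                pending.remove(operation); // removed first, so a later event can queue again
                operation.invoke();
            });
        }
    }

    int queuedTasks() {
        return executor.size();
    }

    void runAll() {
        while (!executor.isEmpty()) {
            executor.poll().run();
        }
    }
}
```

Three equal refreshes offered before the pool gets to them result in one queued task and one refresh; once that task has started, a new event can queue another.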

(2) Implementation source code of NodeCache node monitoring mechanism

NodeCache calls the exists() method of the native zk client (through checkExists()) and passes in a Watcher named watcher. When an event occurs on the node, the watcher is notified, and the Listeners registered with NodeCache are eventually called. Note: because the watcher's process() calls reset(), which calls checkExists() again with the same watcher, the one-time Watcher is re-registered after each notification.

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("Curator client has been started");

         //NodeCache
         final NodeCache nodeCache = new NodeCache(client, "/cluster");
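         nodeCache.getListenable().addListener(new NodeCacheListener() {
             public void nodeChanged() throws Exception {
                 Stat stat = client.checkExists().forPath("/cluster");
                 if (stat == null) {
                     //The node /cluster has been deleted
                 } else {
                     //The node exists: read the latest cached data
                     nodeCache.getCurrentData();
                 }
             }
         });
         nodeCache.start();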
     }
 }

 public class NodeCache implements Closeable {
     private final WatcherRemoveCuratorFramework client;
     private final String path;
     private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
     ...
     private ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
         @Override
         public void stateChanged(CuratorFramework client, ConnectionState newState) {
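             if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
                 if (isConnected.compareAndSet(false, true)) {
                     try {
                         reset();
                     } catch (Exception e) {
                         ThreadUtils.checkInterrupted(e);
                         log.error("Trying to reset after reconnection", e);
                     }
                 }
             } else {
                 isConnected.set(false);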
             }
         }
     };
    
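     //A Watcher that reset() passes to checkExists() (and processBackgroundResult() passes to getData())
     private Watcher watcher = new Watcher() {
         //Re-registers itself: reset() issues a new checkExists() with this same watcher
         @Override
         public void process(WatchedEvent event) {
             try {
                 reset();
             } catch (Exception e) {
                 ThreadUtils.checkInterrupted(e);
                 handleException(e);
             }
         }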
     };
    
     //The callback that runs when a background checkExists() or getData() call issued by this NodeCache completes
     private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
         @Override
         public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
             processBackgroundResult(event);
         }
     };
    
     //Initialize NodeCache
     public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) {
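         this.client = client.newWatcherRemoveCuratorFramework();
         this.path = PathUtils.validatePath(path);
         this.dataIsCompressed = dataIsCompressed;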
     }
    
     //Get the ListenerContainer that stores Listener
     public ListenerContainer<NodeCacheListener> getListenable() {
         Preconditions.checkState(state.get() != State.CLOSED, "Closed");
         return listeners;
     }
    
     //Start listening to node
     public void start() throws Exception {
         start(false);
     }
    
     public void start(boolean buildInitial) throws Exception {
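         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
         //Add the Listener to the established zk connection
         client.getConnectionStateListenable().addListener(connectionStateListener);
         if (buildInitial) {
             //Check the node synchronously (creating parent containers if needed), then load the initial data
             client.checkExists().creatingParentContainersIfNeeded().forPath(path);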
             internalRebuild();
         }
         reset();
     }
    
     private void reset() throws Exception {
         if ((state.get() == State.STARTED) && isConnected.get()) {
             //The following call ends up invoking exists() on the native zk client to listen to the node,
             //registering watcher as the Watcher and backgroundCallback as the background asynchronous callback
             client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
         }
     }
    
     private void processBackgroundResult(CuratorEvent event) throws Exception {
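         switch (event.getType()) {
             case GET_DATA: {
                 if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     ChildData childData = new ChildData(path, event.getStat(), event.getData());
                     setNewData(childData);
                 }
                 break;
             }
             case EXISTS: {
                 if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                     setNewData(null);
                 } else if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     if (dataIsCompressed) {
                         client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                     } else {
                         client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);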
                     }
                 }
                 break;
             }
         }
     }
     ...
 }
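NodeCache's flow (checkExists() first, getData() only when the node exists, then setNewData()) can be modeled synchronously. In this sketch NodeCacheSketch is a hypothetical name and a Map stands in for zk. Notifying listeners only when the cached value differs from the previous one is an assumption about setNewData(), whose body is not shown above, and the real cache stores a ChildData (path, Stat, bytes) rather than a string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical synchronous model of NodeCache's exists -> getData -> setNewData flow.
class NodeCacheSketch {
    private final Map<String, String> store; // stands in for zk
    private final String path;
    private String current; // cached data; null while the node does not exist
    private final List<String> notifications = new ArrayList<>();

    NodeCacheSketch(Map<String, String> store, String path) {
        this.store = store;
        this.path = path;
    }

    // Runs at start() and every time the re-registered watcher fires.
    void reset() {
        if (!store.containsKey(path)) {
            setNewData(null);            // EXISTS returned NONODE
        } else {
            setNewData(store.get(path)); // EXISTS returned OK, then GET_DATA
        }
    }

    // Notify only when the cached value actually changes.
    private void setNewData(String newData) {
        String previous = current;
        current = newData;
        if (!Objects.equals(previous, newData)) {
            notifications.add(newData == null ? "deleted" : "data=" + newData);
        }
    }

    String getCurrentData() {
        return current;
    }

    List<String> notifications() {
        return notifications;
    }
}
```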

(3) One-time Watcher registration on child nodes, and asynchronous callbacks via inBackground()

A Watcher registered through getChildren() fires only once. The callback passed to inBackground() is asynchronous: the getChildren() call returns immediately, and the callback is invoked when the call completes in the background.

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("The Curator client has been started and the connection to zk has been completed");

         client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test", "10".getBytes());
         System.out.println("Created node /test");

         client.getChildren().usingWatcher(new CuratorWatcher() {
             public void process(WatchedEvent event) throws Exception {
                 //This Watcher is one-time: it is notified only of the first change to the children of /test
                 //Later changes are not delivered unless a Watcher is registered again
                 System.out.println("Received a notification from zk: " + event);
             }
         }).inBackground(new BackgroundCallback() {
             //inBackground() makes getChildren() run asynchronously in the background
             //This callback is invoked once that background call completes
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                 System.out.println("Received a background callback notification: " + event);
             }
         }).forPath("/test");
         //Keep the JVM alive so that the notifications can be observed
         Thread.sleep(Integer.MAX_VALUE);
     }
 }
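The inBackground() half of the demo above can be sketched with a plain ExecutorService: the call returns immediately and the callback runs later on another thread. BackgroundRunner is a hypothetical name; Curator's real BackgroundCallback receives a CuratorEvent carrying a result code rather than a (result, error) pair.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical sketch of the inBackground() idea; not a Curator API.
class BackgroundRunner implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Returns at once; the callback later receives (result, null) or (null, error).
    <T> void inBackground(Callable<T> operation, BiConsumer<T, Exception> callback) {
        executor.submit(() -> {
            T result;
            try {
                result = operation.call();
            } catch (Exception e) {
                callback.accept(null, e);
                return;
            }
            callback.accept(result, null);
        });
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

The caller must not assume the result is available when inBackground() returns; anything that depends on it belongs in the callback, which is why the demo prints from inside processResult().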

(4) PathCache automatically re-registers the listener

Every time a child node changes, childEvent() is called; the listener never has to be registered again.

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         final CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("The Curator client has been started and the connection to zk has been completed");

         final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test", true);
         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
             //Every change to a child node, however many there are, triggers childEvent() here
             public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                 System.out.println("The listening child node changed, and an event notification was received: " + pathChildrenCacheEvent);
             }
         });
         pathChildrenCache.start();
         System.out.println("Child node listening has been set up and started");
         //Keep the JVM alive so that the notifications can be observed
         Thread.sleep(Integer.MAX_VALUE);
 }

(5) NodeCache keeps listening for node change events

Every time the node changes, nodeChanged() is called.

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         final CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("The Curator client has been started and the connection to zk has been completed");

         final NodeCache nodeCache = new NodeCache(client, "/test/child/id");
         nodeCache.getListenable().addListener(new NodeCacheListener() {
             //Every change to the node, however many there are, triggers nodeChanged() here
             public void nodeChanged() throws Exception {
                 Stat stat = client.checkExists().forPath("/test/child/id");
                 if (stat != null) {
                     byte[] dataBytes = client.getData().forPath("/test/child/id");
                     System.out.println("Node data has changed: " + new String(dataBytes));
                 } else {
                     System.out.println("Node was deleted");
                 }
             }
         });
         nodeCache.start();
         //Keep the JVM alive so that the notifications can be observed
         Thread.sleep(Integer.MAX_VALUE);
     }
 }

 

12. Implementation source code of Leader election mechanism based on Curator

(1) Source code of LeaderLatch, the first Leader election mechanism

(2) Source code of LeaderSelector, the second Leader election mechanism

 

Curator's CRUD operations plus its listening callback mechanism cover the scenarios in which most systems use zk. Note that when a listener is registered on a node or its child nodes through the native zk API, the client is notified only once: the next event on that node or its children is not delivered. With the native API, the client therefore has to re-register the listener every time it receives an event notification, whereas Curator's PathCache and NodeCache re-register the listener automatically.

 

(1) Source code of LeaderLatch, the first Leader election mechanism

Curator clients compete for leadership by creating ephemeral sequential nodes. LeaderLatch's election is implemented almost the same way as a distributed lock.

 

After a Curator client creates its ephemeral sequential node, it calls getChildren() on the /leader/latch node to get all child nodes. The result of getChildren() is delivered through a background callback. The client then sorts the child nodes and checks whether its own node is the first one.

 

If the client's node is the first child node, the client is the Leader. Otherwise, the client adds a listener to the node immediately before its own by calling getData() on that previous node with a Watcher. When the getData() call completes, its background callback is invoked; if the previous node has already disappeared (NONODE), the client calls reset() and starts over.

 

When the previous node is deleted (for example, because its client released the Leader role or its session expired), the Watcher that the next client registered through getData() is notified.

 

So when the Watcher registered through getData() fires, meaning the previous node no longer exists, the client calls getChildren() again to re-fetch the child node list and re-check whether it is the Leader.

 

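Note: getData() is used instead of exists() to avoid leaking unneeded Watchers. exists() on a node that is already gone still leaves a Watcher waiting for that node to be created, whereas getData() on a missing node fails with NONODE and leaves no Watcher behind.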
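The decision made in checkLeadership() can be sketched without zk. Assumptions: LatchCheck is a hypothetical name, the node names in the test are made up, and sorting by the last 10 characters relies on zk appending a 10-digit zero-padded counter to sequential node names (LeaderLatch itself sorts through LockInternals.getSortedChildren() and its sorter).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the checkLeadership() decision; not a Curator API.
class LatchCheck {
    // zk appends a 10-digit, zero-padded counter to sequential node names,
    // so the last 10 characters sort in creation order.
    static String sequenceOf(String nodeName) {
        return nodeName.substring(nodeName.length() - 10);
    }

    // Empty result: ourNode is first, so this client is the Leader.
    // Otherwise: the node immediately before ours, the only one this client watches.
    static Optional<String> predecessorToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(LatchCheck::sequenceOf));
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            // LeaderLatch logs an error and calls reset() in this case
            throw new IllegalStateException("our node is missing: " + ourNode);
        }
        return ourIndex == 0 ? Optional.empty() : Optional.of(sorted.get(ourIndex - 1));
    }
}
```

Because each client watches only its immediate predecessor, deleting one node wakes exactly one waiting client rather than all of them, which avoids a herd effect.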

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         final CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout: if zk hears no heartbeat from the client within this time, the session expires
             3000, //Connection timeout when connecting to zk
             retryPolicy
         );
         client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
             public void stateChanged(CuratorFramework client, ConnectionState newState) {
                 switch (newState) {
                     case LOST:
                         //The session to zk is lost: if this client is the Leader, it must stop acting as the Leader
                         break;
                     default:
                         break;
                 }
             }
         });
         client.start();
         System.out.println("The Curator client has been started and the connection to zk has been completed");

         LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
         leaderLatch.start();
         leaderLatch.await();//Block until the current client becomes the Leader
         boolean hasLeaderShip = leaderLatch.hasLeadership();
         System.out.println("Whether to become a Leader: " + hasLeaderShip);
     }
 }

 public class LeaderLatch implements Closeable {
     private final WatcherRemoveCuratorFramework client;
     private final ConnectionStateListener listener = new ConnectionStateListener() {
         @Override
         public void stateChanged(CuratorFramework client, ConnectionState newState) {
             handleStateChange(newState);
         }
     };
     ...
     //Add this instance to the leadership election and attempt to acquire leadership.
     public void start() throws Exception {
         ...
         //Add Listener to the established zk connection
         client.getConnectionStateListenable().addListener(listener);
         reset();
         ...
     }
    
     @VisibleForTesting
     void reset() throws Exception {
         setLeadership(false);
         setNode(null);
         //This callback runs when the background create() of the ephemeral sequential node completes
         BackgroundCallback callback = new BackgroundCallback() {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                 ...
                 if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     setNode(event.getName());
                     if (state.get() == State.CLOSED) {
                         setNode(null);
                     } else {
                         //The ephemeral sequential node was created: fetch the child node list with getChildren()
                         getChildren();
                     }
                 } else {
                     log.error("getChildren() failed. rc = " + event.getResultCode());
                 }
             }
         };
         //Create the ephemeral sequential node in the background
         client.create().creatingParentContainersIfNeeded().withProtection()
             .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback)
             .forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
     }
    
     //Get the list of child nodes
     private void getChildren() throws Exception {
         //This callback runs after the child list has been fetched
         BackgroundCallback callback = new BackgroundCallback() {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                 if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     checkLeadership(event.getChildren());
                 }
             }
         };
         client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
     }
    
     //Check whether our node is the first (lowest-sequence) child
     private void checkLeadership(List<String> children) throws Exception {
         if (debugCheckLeaderShipLatch != null) {
             debugCheckLeaderShipLatch.await();
         }
         final String localOurPath = ourPath.get();
         //Sort the children by sequence number
         List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
         int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
         if (ourIndex < 0) {
             log.error("Can't find our node. Resetting. Index: " + ourIndex);
             reset();
         } else if (ourIndex == 0) {
             //Our node is first, so this client becomes the Leader
             setLeadership(true);
         } else {
             //Our node is not first, so watch the node immediately before it
             String watchPath = sortedChildren.get(ourIndex - 1);
             Watcher watcher = new Watcher() {
                 @Override
                 public void process(WatchedEvent event) {
                     if ((state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null)) {
                         try {
                             //The previous node is gone, so fetch the child list again
                             getChildren();
                         } catch (Exception ex) {
                             ThreadUtils.checkInterrupted(ex);
                             log.error("An error occurred checking the leadership.", ex);
                         }
                     }
                 }
             };
             BackgroundCallback callback = new BackgroundCallback() {
                 @Override
                 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                     if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                         //The previous node was already deleted, so start over
                         reset();
                     }
                 }
             };
             //Use getData() instead of exists(): exists() on a missing node still leaves a watcher behind (a kind of resource leak), getData() does not
             client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
         }
     }
     ...
     //Block until this client becomes the Leader or the latch is no longer started
     public void await() throws InterruptedException, EOFException {
         synchronized(this) {
             while ((state.get() == State.STARTED) && !hasLeadership.get()) {
                 wait();//Object.wait(): release the monitor and block until notified
             }
         }
         if (state.get() != State.STARTED) {
             throw new EOFException();
         }
     }
    
     //Update the leadership flag, notify the LeaderLatchListeners of any change, and call notifyAll() to wake threads blocked in await()
     private synchronized void setLeadership(boolean newValue) {
         boolean oldValue = hasLeadership.getAndSet(newValue);
         if (oldValue && !newValue) { // Lost leadership, was true, now false
             listeners.forEach(new Function<LeaderLatchListener, Void>() {
                 @Override
                 public Void apply(LeaderLatchListener listener) {
                     listener.notLeader();
                     return null;
                 }
             });
         } else if (!oldValue && newValue) { // Gained leadership, was false, now true
             listeners.forEach(new Function<LeaderLatchListener, Void>() {
                 @Override
                 public Void apply(LeaderLatchListener input) {
                     input.isLeader();
                     return null;
                 }
             });
         }
         notifyAll();//Wake up every thread waiting in await()
     }
 }
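The core of checkLeadership() is an ordering rule over the children of latchPath. The sketch below replays that rule on a plain list of node names. It assumes names of the form produced by withProtection() plus LOCK_NAME (for example `_c_<uuid>-latch-0000000005`). The class LatchOrderSketch and its methods are hypothetical, and its sort key imitates, rather than calls, the sorter that LeaderLatch passes to LockInternals.getSortedChildren().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: replays the decision of LeaderLatch.checkLeadership()
// on plain child names, without ZooKeeper.
final class LatchOrderSketch {
    static final String LOCK_NAME = "latch-";

    // Keep only the text after the last LOCK_NAME (the zero-padded sequence number),
    // so that the random protection prefix "_c_<uuid>-" does not decide the order.
    static String sequencePart(String child) {
        int idx = child.lastIndexOf(LOCK_NAME);
        return idx >= 0 ? child.substring(idx + LOCK_NAME.length()) : child;
    }

    static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(LatchOrderSketch::sequencePart));
        return sorted;
    }

    // Empty result: ourNode is first, so this client is the Leader.
    // Otherwise: the node immediately before ourNode, the only one this client watches.
    static Optional<String> nodeToWatch(List<String> children, String ourNode) {
        List<String> sorted = sortBySequence(children);
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            // LeaderLatch calls reset() at this point and creates a fresh node
            throw new IllegalStateException("our node is missing: " + ourNode);
        }
        return ourIndex == 0 ? Optional.empty() : Optional.of(sorted.get(ourIndex - 1));
    }
}
```

Each client watches only its immediate predecessor. When the Leader's node disappears, only the next client in line is notified, rather than every waiting client re-reading the child list at once (the "herd effect").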

(2) Source code of the second Leader election mechanism: LeaderSelector

LeaderSelector decides leadership by whether a client has acquired a distributed lock: the client holding the lock is the Leader. Leadership lasts only as long as the lock is held. The takeLeadership() method must therefore not return while the client should remain Leader. Once it returns, the lock is released, and the other clients compete for it to become the new Leader.
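The lock-based rule can be illustrated inside a single JVM. In this sketch, a java.util.concurrent.locks.ReentrantLock stands in for the InterProcessMutex that LeaderSelector actually uses across processes, and Thread.sleep() stands in for the user's takeLeadership() work. The class LockLeadershipSketch and its methods are hypothetical. The sketch checks two things: at most one contender is Leader at any moment, and each contender gets a turn once the previous one returns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-JVM analogy of LeaderSelector: ReentrantLock plays the role
// of InterProcessMutex, and holding the lock is what makes a contender the Leader.
class LockLeadershipSketch {
    private final ReentrantLock mutex = new ReentrantLock();
    private final AtomicInteger activeLeaders = new AtomicInteger();
    private final AtomicInteger maxActiveLeaders = new AtomicInteger();
    private final List<String> leaderHistory = Collections.synchronizedList(new ArrayList<>());

    // Same shape as doWork(): acquire the lock, run the leader work, always release.
    void doWork(String contender, long leaderMillis) throws InterruptedException {
        mutex.lockInterruptibly(); // blocks while another contender is Leader
        try {
            int leadersNow = activeLeaders.incrementAndGet();
            maxActiveLeaders.accumulateAndGet(leadersNow, Math::max);
            leaderHistory.add(contender);
            Thread.sleep(leaderMillis); // stand-in for takeLeadership(): Leader until it returns
        } finally {
            activeLeaders.decrementAndGet();
            mutex.unlock(); // returning gives up leadership; a waiting contender takes over
        }
    }

    // Start one thread per contender and wait for all of them to finish.
    static LockLeadershipSketch runElection(List<String> contenders, long leaderMillis)
            throws InterruptedException {
        LockLeadershipSketch election = new LockLeadershipSketch();
        List<Thread> threads = new ArrayList<>();
        for (String name : contenders) {
            Thread t = new Thread(() -> {
                try {
                    election.doWork(name, leaderMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, name);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return election;
    }

    int maxConcurrentLeaders() {
        return maxActiveLeaders.get();
    }

    List<String> leaderHistory() {
        return new ArrayList<>(leaderHistory);
    }
}
```

For example, `LockLeadershipSketch.runElection(Arrays.asList("A", "B", "C"), 20)` lets each contender lead in turn. The ordering depends on thread scheduling, but maxConcurrentLeaders() stays at 1.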

public class Demo {
     public static void main(String[] args) throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         final CuratorFramework client = CuratorFrameworkFactory.newClient(
             "127.0.0.1:2181", //zk's address
             5000, //Session timeout (ms): if zk receives no heartbeat within this time, the session expires
             3000, //Connection timeout (ms) when connecting to zk
             retryPolicy
         );
         client.start();
         System.out.println("The Curator client has been started");

         LeaderSelector leaderSelector = new LeaderSelector(
             client,
             "/leader/election",
             new LeaderSelectorListener() {
                 public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                     System.out.println("You have become the Leader...");
                     //Do all Leader work here; this method must not return while the client should remain Leader
                     Thread.sleep(Integer.MAX_VALUE);
                 }
                 public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                     System.out.println("The connection state changed: " + connectionState);
                     if (connectionState.equals(ConnectionState.LOST)) {
                         //The connection is lost, so give up leadership
                         throw new CancelLeadershipException();
                     }
                 }
             }
         );
         leaderSelector.start();//Compete with other clients on the node "/leader/election" to become the Leader
         Thread.sleep(Integer.MAX_VALUE);
     }
 }
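The demo relies on two pieces working together. First, takeLeadership() parks the Leader thread with Thread.sleep(Integer.MAX_VALUE). Second, stateChanged() throws CancelLeadershipException when the connection is LOST. Based on the Curator 2.x source, the internal wrapper listener (the WrappedListener created in the constructor below) catches that exception and cancels the election task with interruption. The sleep then ends with InterruptedException, and doWork() releases the lock; other versions may differ. The JDK-only sketch below reproduces just the interruption step with an ExecutorService and Future.cancel(true). The class CancelLeadershipSketch is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class: JDK-only sketch of how interrupting the election task ends
// a takeLeadership() that is parked in Thread.sleep().
class CancelLeadershipSketch {
    // Returns true if the "Leader" task saw the interrupt and ran its cleanup in time.
    static boolean leadThenCancel() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch becameLeader = new CountDownLatch(1);
        CountDownLatch leadershipReleased = new CountDownLatch(1);
        Future<?> task = executor.submit(() -> {
            try {
                becameLeader.countDown();
                Thread.sleep(Long.MAX_VALUE); // like the demo's takeLeadership(): never returns on its own
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status visible
            } finally {
                leadershipReleased.countDown(); // where doWork() would call mutex.release()
            }
        });
        becameLeader.await();
        task.cancel(true); // interrupts the thread running the task
        boolean released = leadershipReleased.await(5, TimeUnit.SECONDS);
        executor.shutdownNow();
        return released;
    }
}
```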

 public class LeaderSelector implements Closeable {
     private final CuratorFramework client;
     private final LeaderSelectorListener listener;
     private final CloseableExecutorService executorService;
     private final InterProcessMutex mutex;
     ...
     public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) {
         Preconditions.checkNotNull(client, "client cannot be null");
         PathUtils.validatePath(leaderPath);
         Preconditions.checkNotNull(listener, "listener cannot be null");

         this.client = client;
         this.listener = new WrappedListener(this, listener);
         hasLeadership = false;
         this.executorService = executorService;
         //Initialize a distributed lock on leaderPath
         mutex = new InterProcessMutex(client, leaderPath) {
             @Override
             protected byte[] getLockNodeBytes() {
                 return (id.length() > 0) ? getIdBytes(id) : null;
             }
         };
     }
    
     public void start() {
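         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
         Preconditions.checkState(!executorService.isShutdown(), "Already started");
         Preconditions.checkState(!hasLeadership, "Already has leadership");
         client.getConnectionStateListenable().addListener(listener);
         requeue();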
     }
    
     public boolean requeue() {
         Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
         return internalRequeue();
     }
    
     private synchronized boolean internalRequeue() {
         if (!isQueued && (state.get() == State.STARTED)) {
             isQueued = true;
             //Submit the election work to the thread pool as a task
             Future<Void> task = executorService.submit(new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
                     ...
                     doWorkLoop();
                     ...
                     return null;
                 }
             });
             ourTask.set(task);
             return true;
         }
         return false;
     }
    
     private void doWorkLoop() throws Exception {
         ...
         doWork();
         ...
     }
    
     @VisibleForTesting
     void doWork() throws Exception {
         hasLeadership = false;
         try {
             //Try to acquire the distributed lock; this blocks until the lock is acquired
             mutex.acquire();
             //Reaching this line means the distributed lock has been acquired
             hasLeadership = true;
             try {
                 if (debugLeadershipLatch != null) {
                     debugLeadershipLatch.countDown();
                 }
                 if (debugLeadershipWaitLatch != null) {
                     debugLeadershipWaitLatch.await();
                 }
                 //Call back the user's takeLeadership() implementation
                 listener.takeLeadership(client);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw e;
             } catch (Throwable e) {
                 ThreadUtils.checkInterrupted(e);
             } finally {
                 clearIsQueued();
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw e;
         } finally {
             if (hasLeadership) {
                 hasLeadership = false;
                 boolean wasInterrupted = Thread.interrupted(); // clear any interrupted status so that mutex.release() works immediately
                 try {
                     //Release the lock
                     mutex.release();
                 } catch (Exception e) {
                     if (failedMutexReleaseCount != null) {
                         failedMutexReleaseCount.incrementAndGet();
                     }
                     ThreadUtils.checkInterrupted(e);
                     log.error("The leader threw an exception", e);
                 } finally {
                     if (wasInterrupted) {
                         Thread.currentThread().interrupt();
                     }
                 }
             }
         }
     }
     ...
 }
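The finally block of doWork() uses a small but easy-to-miss pattern. Thread.interrupted() reads and clears the interrupt status before mutex.release(), so that a pending interrupt does not abort the release, and the status is restored afterwards. A generic version of that pattern is sketched below. InterruptSafeRelease and its Releasable interface are hypothetical names, and printing to System.err stands in for Curator's logging.

```java
// Hypothetical helper generalizing the finally block of LeaderSelector.doWork():
// clear the interrupt status, release, then restore the status.
final class InterruptSafeRelease {
    // Hypothetical stand-in for anything whose release() may block, e.g. a lock.
    interface Releasable {
        void release() throws Exception;
    }

    static void releaseIgnoringInterrupt(Releasable resource) {
        boolean wasInterrupted = Thread.interrupted(); // returns the status AND clears it
        try {
            resource.release(); // runs without a pending interrupt
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // same effect as ThreadUtils.checkInterrupted(e)
            }
            System.err.println("release failed: " + e); // doWork() logs the failure and moves on
        } finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt(); // restore the status for code further up the stack
            }
        }
    }
}
```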