Outline
1. Basic zk data operations based on Curator
2. Implement cluster metadata management based on Curator
3. Implement HA master/standby automatic failover based on Curator
4. Implement Leader election based on Curator
5. Implement distributed Barrier based on Curator
6. Implement distributed counters based on Curator
7. Implement zk node and child node listening based on Curator
8. Source code analysis of creating client instances based on Curator
9. How Curator establishes a connection with zk at startup
10. Source code analysis of creating, reading, updating and deleting nodes based on Curator
11. Source code of the node listening callback mechanism based on Curator
12. Source code of the Leader election mechanism based on Curator
1. Basic zk data operations based on Curator
Curator is to ZooKeeper what Guava is to Java. Add the following Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>
</dependencies>
The following example shows how Curator creates, reads, updates and deletes znodes; CuratorFramework represents a client instance. Note: creatingParentsIfNeeded() creates any missing parent nodes along the specified path (cascading creation).
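import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CrudDemo {
    public static void main(String[] args) throws Exception {
        //Retry policy: base sleep of 1000 ms between retries, at most 3 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();//Start the client and establish a connection
        System.out.println("Curator client has been started");
        client.create()
            .creatingParentsIfNeeded()//Create missing parent nodes (cascading creation)
            .withMode(CreateMode.PERSISTENT)//Specify the node type
            .forPath("/my/path", "10".getBytes());//Create
        byte[] dataBytes = client.getData().forPath("/my/path");//Read
        System.out.println(new String(dataBytes));
        client.setData().forPath("/my/path", "11".getBytes());//Update
        dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));
        List<String> children = client.getChildren().forPath("/my");//List child nodes
        System.out.println(children);
        client.delete().forPath("/my/path");//Delete
        Thread.sleep(Integer.MAX_VALUE);
    }
}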
2. Implement cluster metadata management based on Curator
Curator can be used to operate on zk. For example, if you are building your own distributed system, similar to Kafka or Canal, and want to keep all the core metadata of the running cluster in zk, you can create znodes through Curator and write the corresponding values into them.
It is recommended to write the values as JSON, as Kafka does when it writes data to zk. Other clients can then read the cluster metadata from zk whenever they need it.
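Following the JSON recommendation above, the sketch below shows one way to turn a metadata record into the byte[] that forPath(path, data) expects. The BrokerInfo class and its fields are hypothetical, and the JSON is assembled by hand only to keep the example dependency-free; a real system would use a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical cluster-metadata record; the field names are illustrative only.
final class BrokerInfo {
    final int id;
    final String host;
    final int port;

    BrokerInfo(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    // Hand-rolled JSON to stay dependency-free.
    // Assumes host contains no characters that need JSON escaping.
    String toJson() {
        return String.format(Locale.ROOT, "{\"id\":%d,\"host\":\"%s\",\"port\":%d}", id, host, port);
    }

    // The bytes that would be written as the znode's data.
    byte[] toZnodeData() {
        return toJson().getBytes(StandardCharsets.UTF_8);
    }
}
```

With a started client, writing the record could then look like client.create().creatingParentsIfNeeded().forPath("/cluster/brokers/1", info.toZnodeData()), where the znode path is likewise hypothetical.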
3. Implement HA master/standby automatic failover based on Curator
HDFS, Kafka and Canal all use zk for Leader election, so automatic master/standby failover for HA can be built on Curator.
The HDFS NameNode can be deployed in an HA architecture with an active and a standby machine. If the active machine goes down, the standby detects this and is elected Leader, so it can serve clients as the new NameNode.
The controller in Kafka coordinates the whole cluster. Any Broker in Kafka can become the controller, a role similar to that of a Leader.
Canal is also deployed as a master/standby pair: if the master goes down, the standby takes over.
4. Implement Leader election based on Curator
(1) LeaderLatch: the first way Curator implements Leader election
(2) LeaderSelector: the second way Curator implements Leader election
(1) LeaderLatch: the first way Curator implements Leader election
Implement Leader election through Curator's LeaderLatch:
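import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();
        //Each participant creates an ephemeral sequential znode under "/leader/latch"
        LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
        leaderLatch.start();
        leaderLatch.await();//Blocks until this instance becomes the leader; the code below runs only after that
        //Similar to HDFS: of two machines, one becomes the leader and starts working
        //The other blocks in await() until the leader dies, then becomes the leader and continues the work
        boolean hasLeadership = leaderLatch.hasLeadership();//Check whether this instance is the leader
        System.out.println("Whether to become a leader: " + hasLeadership);
        Thread.sleep(Integer.MAX_VALUE);
    }
}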
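LeaderLatch builds on ephemeral sequential znodes: the participant holding the lowest sequence number is the leader, and await() returns once that is true for the caller. The sketch below models only that ordering rule in plain Java, with no ZooKeeper calls; it assumes node names end with ZooKeeper's 10-digit sequence suffix, and SequentialElection with its methods is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified, ZooKeeper-free model of the rule LeaderLatch builds on:
// every participant owns one sequential child node, and the lowest sequence wins.
final class SequentialElection {
    // ZooKeeper appends a 10-digit, zero-padded counter to sequential node names.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.length() - 10));
    }

    static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(SequentialElection::sequenceOf));
        return sorted;
    }

    // The participant whose node has the lowest sequence number is the leader.
    static boolean isLeader(String ourNode, List<String> children) {
        return sortBySequence(children).get(0).equals(ourNode);
    }

    // A non-leader watches only the node immediately before its own, so one
    // leader failure wakes up one participant instead of all of them.
    // Returns null when ourNode is the leader or is not among the children.
    static String nodeToWatch(String ourNode, List<String> children) {
        List<String> sorted = sortBySequence(children);
        int index = sorted.indexOf(ourNode);
        return index <= 0 ? null : sorted.get(index - 1);
    }
}
```

Because the nodes are ephemeral, a crashed leader's node disappears when its session expires, which fires the watch held by the next participant in line; that is how the blocked standby machine in the demo gets out of await().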
(2) LeaderSelector: the second way Curator implements Leader election
Leader election is implemented through Curator's LeaderSelector as follows. The LeaderSelectorListener provides two callbacks: takeLeadership(), called when this instance becomes the Leader, and stateChanged(), which watches the connection state.
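import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderSelectorDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();
        LeaderSelector leaderSelector = new LeaderSelector(
            client,
            "/leader/election",
            new LeaderSelectorListener() {
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("You have become the Leader...");
                    //Do all of the Leader's work here; leadership is released as soon as this method returns
                    Thread.sleep(Integer.MAX_VALUE);
                }
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    System.out.println("The connection state has changed to " + connectionState);
                    //A suspended or lost connection means this instance can no longer be sure it is the leader
                    if (connectionState == ConnectionState.SUSPENDED || connectionState == ConnectionState.LOST) {
                        throw new CancelLeadershipException();
                    }
                }
            }
        );
        leaderSelector.start();//Compete with other nodes on "/leader/election" to become the leader
        Thread.sleep(Integer.MAX_VALUE);
    }
}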
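The comment in takeLeadership() states the key contract of LeaderSelector: an instance is the Leader only while that method is running. The in-process sketch below models this, assuming a fair ReentrantLock as a stand-in for the distributed lock that LeaderSelector coordinates through "/leader/election"; LocalLeaderSelector and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// In-process model of LeaderSelector's contract. A fair ReentrantLock stands in
// for the distributed lock: a participant is leader exactly while its
// takeLeadership() callback is running.
final class LocalLeaderSelector {
    interface Listener {
        void takeLeadership(String participant) throws Exception;
    }

    private final ReentrantLock mutex = new ReentrantLock(true);

    void runOnce(String participant, Listener listener) throws Exception {
        mutex.lock(); // blocks until this participant is "elected"
        try {
            listener.takeLeadership(participant);
        } finally {
            mutex.unlock(); // returning from takeLeadership() gives up leadership
        }
    }

    // Runs several participants concurrently; returns {leadership terms, max concurrent leaders}.
    static int[] simulate(int participants) {
        LocalLeaderSelector selector = new LocalLeaderSelector();
        AtomicInteger current = new AtomicInteger();
        AtomicInteger maxConcurrent = new AtomicInteger();
        AtomicInteger terms = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < participants; i++) {
            String name = "node-" + i;
            Thread t = new Thread(() -> {
                try {
                    selector.runOnce(name, p -> {
                        maxConcurrent.accumulateAndGet(current.incrementAndGet(), Math::max);
                        terms.incrementAndGet();
                        Thread.sleep(5); // the leader's work
                        current.decrementAndGet();
                    });
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {terms.get(), maxConcurrent.get()};
    }
}
```

In the real recipe, an instance whose takeLeadership() has returned does not compete again unless autoRequeue() was called before start(); in this sketch each participant simply leads once.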
5. Implement distributed Barrier based on Curator
(1) Distributed Barrier
(2) Distributed Double Barrier
(1) Distributed Barrier
Many machines can wait on the same Barrier and stay blocked until a condition is met. With DistributedBarrier, setBarrier() creates the barrier node, waitOnBarrier() blocks for as long as that node exists, and removeBarrier() deletes it, which releases every waiting machine.
//DistributedBarrier
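import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedBarrierDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();
        DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
        barrier.waitOnBarrier();//Blocks while the "/barrier" node exists, i.e. until some machine calls removeBarrier()
    }
}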
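The set/wait/remove protocol described above can be modeled in a single JVM with a monitor. In this sketch, which is a stand-in and not Curator's implementation, the boolean flag plays the role of the "/barrier" znode and notifyAll() plays the role of the node-deleted watch event; LocalBarrier is a hypothetical class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// In-process model of DistributedBarrier: "set" plays the role of the barrier znode existing.
final class LocalBarrier {
    private boolean set;

    synchronized void setBarrier() {
        set = true;
    }

    synchronized void removeBarrier() {
        set = false;
        notifyAll(); // like the node-deleted event waking every watcher
    }

    synchronized void waitOnBarrier() throws InterruptedException {
        while (set) { // the loop guards against spurious wakeups
            wait();
        }
    }

    // Returns true if no waiter passed before removeBarrier() and all passed after it.
    static boolean demo(int waiters) {
        LocalBarrier barrier = new LocalBarrier();
        barrier.setBarrier();
        CountDownLatch passed = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                try {
                    barrier.waitOnBarrier();
                    passed.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
        }
        try {
            boolean early = passed.await(100, TimeUnit.MILLISECONDS); // expected to time out
            barrier.removeBarrier();
            boolean released = passed.await(2, TimeUnit.SECONDS);
            return !early && released;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```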
(2) Distributed Double Barrier
//DistributedDoubleBarrier
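import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedDoubleBarrierDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();
        DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(client, "/barrier/double", 10);
        doubleBarrier.enter();//Every machine blocks here in enter()
        //Only after 10 machines have called enter() do they all continue past it
        //This is where the machines can run their computation tasks
        doubleBarrier.leave();//Every machine blocks in leave() until 10 machines have called leave()
        //Then execution continues
    }
}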
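DistributedDoubleBarrier synchronizes twice: once on entry and once on exit. The sketch below reproduces that contract in one process, with two CountDownLatch objects standing in for the per-participant znodes that, as I understand the recipe, it tracks under "/barrier/double"; LocalDoubleBarrier is hypothetical and, unlike the recipe, can be used only once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Single-use, in-process model of DistributedDoubleBarrier's enter()/leave() contract.
final class LocalDoubleBarrier {
    private final CountDownLatch entered;
    private final CountDownLatch left;

    LocalDoubleBarrier(int memberQty) {
        entered = new CountDownLatch(memberQty);
        left = new CountDownLatch(memberQty);
    }

    void enter() throws InterruptedException {
        entered.countDown();
        entered.await(); // blocks until memberQty participants have entered
    }

    void leave() throws InterruptedException {
        left.countDown();
        left.await(); // blocks until memberQty participants have left
    }

    // Returns true if nobody got past enter() before all members had arrived.
    static boolean demo(int members) {
        LocalDoubleBarrier barrier = new LocalDoubleBarrier(members);
        AtomicInteger arrived = new AtomicInteger();
        AtomicBoolean ok = new AtomicBoolean(true);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < members; i++) {
            Thread t = new Thread(() -> {
                try {
                    arrived.incrementAndGet();
                    barrier.enter();
                    if (arrived.get() != members) ok.set(false); // computation would run here
                    barrier.leave();
                } catch (InterruptedException e) {
                    ok.set(false);
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return ok.get();
    }
}
```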
6. Implement distributed counters based on Curator
If you really need a distributed counter, Redis is usually the better choice: it supports higher concurrency, performs better, offers richer features, and can embed Lua scripts to implement complex business logic. However, Redis replicates data asynchronously, so if a machine goes down, recently written data can be lost or replicas can fall out of sync. zk, in contrast, synchronizes data under the ZAB protocol, where a write is committed only after a quorum of servers has acknowledged it, so this kind of inconsistency does not occur.
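import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

//SharedCount: the counter is stored as the value of a single znode
public class SharedCounterDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();
        SharedCount sharedCount = new SharedCount(client, "/shared/count", 0);
        sharedCount.start();
        sharedCount.addListener(new SharedCountListener() {
            public void countHasChanged(SharedCountReader sharedCountReader, int i) throws Exception {
                System.out.println("Distributed counter has changed to " + i);
            }
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("The connection state has changed...");
            }
        });
        boolean result = sharedCount.trySetCount(1);//Returns false if another client changed the count first
        System.out.println("Set succeeded: " + result + ", count: " + sharedCount.getCount());
    }
}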
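trySetCount() only succeeds if the count has not been changed by another client since it was last read, which is the same optimistic check zk offers through a znode's data version. The sketch below models that compare-and-set, plus a retry-until-success increment, with a synchronized in-memory value; VersionedCounter is hypothetical and stands in for the "/shared/count" znode.

```java
import java.util.ArrayList;
import java.util.List;

// In-process model of a counter stored as a znode's data plus its version.
final class VersionedCounter {
    private int value;
    private int version;

    // Returns {value, version} read atomically, like getData() with a Stat.
    synchronized int[] read() {
        return new int[] {value, version};
    }

    // Succeeds only if nobody has written since expectedVersion was read,
    // in the spirit of setData() with an expected version.
    synchronized boolean trySet(int expectedVersion, int newValue) {
        if (version != expectedVersion) {
            return false;
        }
        value = newValue;
        version++;
        return true;
    }

    // Optimistic increment: re-read and retry whenever the version check fails.
    int increment() {
        while (true) {
            int[] snapshot = read();
            if (trySet(snapshot[1], snapshot[0] + 1)) {
                return snapshot[0] + 1;
            }
        }
    }

    // Runs concurrent increments and returns the final value.
    static int demo(int threads, int incrementsPerThread) {
        VersionedCounter counter = new VersionedCounter();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.read()[0];
    }
}
```

Curator also offers a version-aware form, trySetCount(VersionedValue<Integer>, int), together with getVersionedValue(); which forms are available depends on the Curator version in use.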
7. Implement zk node and child node listening based on Curator
(1) Implementing zk child node listening based on Curator
(2) Implementing zk node data listening based on Curator
We use zk mainly for:
1. Creating, reading, updating and deleting metadata, and listening for changes to it
2. Leader election
There are three kinds of caches for listening to nodes:
1. PathCache (PathChildrenCache): listens to the child nodes of a node
2. NodeCache: listens to a single node
3. TreeCache: listens to the whole subtree below a node
(1) Implementing zk child node listening based on Curator
Here is an example of child node listening implemented with PathCache:
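import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class PathCacheDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();
        //true: cache the data of each child node as well, not just the child names
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
        //The cache keeps a client-side copy of the data in zk
        //A listener on the cache observes changes to that data in zk
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                //Event types include CHILD_ADDED, CHILD_UPDATED and CHILD_REMOVED
                System.out.println("Child event: " + pathChildrenCacheEvent.getType());
            }
        });
        pathChildrenCache.start();
    }
}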
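A child cache keeps a local copy of the children under "/cluster" and reports differences as events. The sketch below shows the idea by comparing two snapshots (child name to data) and emitting event names that match PathChildrenCacheEvent.Type. It is a simplified model: the real PathChildrenCache works incrementally from watch notifications instead of diffing whole snapshots, and ChildDiff is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Simplified model of how a child cache can turn two snapshots
// (child name -> data) into add/update/remove events.
final class ChildDiff {
    static List<String> diff(Map<String, String> cached, Map<String, String> current) {
        List<String> events = new ArrayList<>();
        // TreeMap gives a deterministic (sorted) event order.
        for (Map.Entry<String, String> e : new TreeMap<>(current).entrySet()) {
            if (!cached.containsKey(e.getKey())) {
                events.add("CHILD_ADDED " + e.getKey());
            } else if (!Objects.equals(cached.get(e.getKey()), e.getValue())) {
                events.add("CHILD_UPDATED " + e.getKey());
            }
        }
        for (String name : new TreeMap<>(cached).keySet()) {
            if (!current.containsKey(name)) {
                events.add("CHILD_REMOVED " + name);
            }
        }
        return events;
    }
}
```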
(2) Implementing zk node data listening based on Curator
The following is an example of node listening implemented with NodeCache:
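import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;

public class NodeCacheDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();
        final NodeCache nodeCache = new NodeCache(client, "/cluster");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/cluster");
                if (stat == null) {
                    //The node has been deleted
                } else {
                    //The node exists: read its latest cached data
                    System.out.println(new String(nodeCache.getCurrentData().getData()));
                }
            }
        });
        nodeCache.start();
    }
}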
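NodeCache keeps the latest data of "/cluster" locally and calls nodeChanged() when that data changes. The sketch below models the compare-then-notify step, assuming a byte-wise comparison of the data is enough (the real cache compares its cached ChildData, which also carries the node's Stat); LocalNodeCache and its Runnable listeners are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of a node cache: keep the last known data and
// notify listeners only when newly read data differs from it.
final class LocalNodeCache {
    private final AtomicReference<byte[]> data = new AtomicReference<>();
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    // Called with the latest data read from zk; null means the node does not exist.
    void setNewData(byte[] newData) {
        byte[] previous = data.getAndSet(newData);
        if (!Arrays.equals(previous, newData)) {
            listeners.forEach(Runnable::run);
        }
    }

    byte[] getCurrentData() {
        return data.get();
    }
}
```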
8. Source code analysis of creating client instances based on Curator
(1) Creating a CuratorFramework instance uses the builder pattern
(2) Creating a CuratorFramework instance initializes a CuratorZookeeperClient instance
(1) Creating a CuratorFramework instance uses the builder pattern
CuratorFrameworkFactory.newClient() uses the builder pattern. It first creates a Builder instance through the builder() method, then sets every parameter as a property of that Builder, and finally build() passes the Builder into the constructor of the target class, CuratorFrameworkImpl.
public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", //zk's address
            5000, //Session timeout: if zk receives no heartbeat from the client within this time, the session expires
            3000, //Connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client has been started");
    }
}
public class CuratorFrameworkFactory {
    //Creating a CuratorFramework instance uses the builder pattern
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }
    ...
    public static Builder builder() {
        return new Builder();
    }
    public static class Builder {
        ...
        private EnsembleProvider ensembleProvider;
        private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
        private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
        private RetryPolicy retryPolicy;
        ...
        public Builder connectString(String connectString) {
            ensembleProvider = new FixedEnsembleProvider(connectString);
            return this;
        }
        public Builder sessionTimeoutMs(int sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
            return this;
        }
        public Builder connectionTimeoutMs(int connectionTimeoutMs) {
            this.connectionTimeoutMs = connectionTimeoutMs;
            return this;
        }
        public Builder retryPolicy(RetryPolicy retryPolicy) {
            this.retryPolicy = retryPolicy;
            return this;
        }
        ...
        public CuratorFramework build() {
            return new CuratorFrameworkImpl(this);
        }
    }
    ...
}
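public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        this.client = new CuratorZookeeperClient(
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            new Watcher() {//A Watcher for zk events is registered here
                @Override
                public void process(WatchedEvent watchedEvent) {
                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                    processEvent(event);
                }
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly(),
            builder.getConnectionHandlingPolicy()
        );
        ...
    }
    ...
}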
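The same builder pattern can be reduced to a few lines of plain Java. The sketch below mirrors the shape of CuratorFrameworkFactory.Builder: fields start at defaults, each setter returns this so calls can be chained, and build() hands the builder to a private constructor. ClientConfig and its default values are hypothetical, and the null check in build() is an addition of this sketch.

```java
// Hypothetical, minimal builder in the style of CuratorFrameworkFactory.Builder.
final class ClientConfig {
    // Assumed defaults for illustration only.
    static final int DEFAULT_SESSION_TIMEOUT_MS = 60_000;
    static final int DEFAULT_CONNECTION_TIMEOUT_MS = 15_000;

    final String connectString;
    final int sessionTimeoutMs;
    final int connectionTimeoutMs;

    // Private: instances can only be created through the Builder.
    private ClientConfig(Builder builder) {
        this.connectString = builder.connectString;
        this.sessionTimeoutMs = builder.sessionTimeoutMs;
        this.connectionTimeoutMs = builder.connectionTimeoutMs;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String connectString;
        private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
        private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;

        Builder connectString(String connectString) {
            this.connectString = connectString;
            return this; // returning this enables method chaining
        }

        Builder sessionTimeoutMs(int sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
            return this;
        }

        Builder connectionTimeoutMs(int connectionTimeoutMs) {
            this.connectionTimeoutMs = connectionTimeoutMs;
            return this;
        }

        ClientConfig build() {
            if (connectString == null) {
                throw new IllegalStateException("connectString is required");
            }
            return new ClientConfig(this);
        }
    }
}
```

Usage mirrors newClient(): ClientConfig.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(5000).build() sets only what the caller cares about and leaves the connection timeout at its default.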
(2) Creating a CuratorFramework instance initializes a CuratorZookeeperClient instance
A CuratorFramework instance represents a zk client. When CuratorFramework is initialized, a CuratorZookeeperClient instance is initialized as well.
CuratorZookeeperClient is Curator's wrapper around the native ZooKeeper client.
A Watcher is passed in when CuratorZookeeperClient is initialized.
So the main work of CuratorFrameworkFactory's newClient() method is: initialize CuratorFramework -> initialize CuratorZookeeperClient -> initialize ZookeeperFactory + register a Watcher.
The actual connection to zk, including registering that Watcher with the native zk client, is triggered by CuratorFramework's start() method.
9. How Curator establishes a connection with zk at startup
The start() method of ConnectionStateManager starts a thread that processes eventQueue. eventQueue holds network connection change events with zk; whenever it receives one, the thread notifies every registered ConnectionStateListener.
The start() method of CuratorZookeeperClient initializes the native zk client, which establishes a long-lived TCP connection with the zk server. It also registers a Watcher of type ConnectionState (Curator's ConnectionState class implements Watcher) to receive notifications sent by the zk server.
public class CuratorFrameworkImpl implements CuratorFramework {
    private final CuratorZookeeperClient client;
    private final ConnectionStateManager connectionStateManager;
    private volatile ExecutorService executorService;
    ...
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ...
        this.client = new CuratorZookeeperClient(...);
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(),
            builder.getConnectionStateListenerDecorator()
        );
        ...
    }
    ...
    @Override
    public void start() {
        log.info("Starting");
        if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        ...
        //1. Start a thread that handles zk network connection change events
        connectionStateManager.start();
        //2. Add a listener for zk network connection changes
        final ConnectionStateListener listener = new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState) {
                    logAsErrorConnectionErrors.set(true);
                }
            }
            @Override
            public boolean doNotDecorate() {
                return true;
            }
        };
        this.getConnectionStateListenable().addListener(listener);
        //3. Create the native zk client
        client.start();
        //4. Create a thread pool that runs background operations
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                backgroundOperationsLoop();
                return null;
            }
        });
        if (ensembleTracker != null) {
            ensembleTracker.start();
        }
        log.info(schemaSet.toDocumentation());
    }
    ...
}
public class ConnectionStateManager implements Closeable {
    private final ExecutorService service;
    private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    ...
    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator) {
        ...
        service = Executors.newSingleThreadExecutor(threadFactory);
        ...
    }
    ...
    public void start() {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        //Start a thread
        service.submit(
            new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    processEvents();
                    return null;
                }
            }
        );
    }
    private void processEvents() {
        while (state.get() == State.STARTED) {
            int useSessionTimeoutMs = getUseSessionTimeoutMs();
            long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
            long pollMaxMs = useSessionTimeoutMs - elapsedMs;
            final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
            if (newState != null) {
                if (listeners.size() == 0) {
                    log.warn("There are no ConnectionStateListeners registered.");
                }
                listeners.forEach(listener -> listener.stateChanged(client, newState));
            } else if (sessionExpirationPercent > 0) {
                synchronized(this) {
                    checkSessionExpiration();
                }
            }
        }
    }
    ...
}
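The queue-plus-single-thread design of ConnectionStateManager can be sketched on its own. In the model below, callers post states without blocking, and one background thread delivers them to listeners in order. The queue size of 25 and the drop-the-oldest policy when the queue is full are assumptions; LocalConnectionStateManager is a hypothetical class, and its State enum only borrows the names of Curator's connection states.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of ConnectionStateManager: producers post states into a bounded
// queue, and a single background thread delivers them to listeners in order.
final class LocalConnectionStateManager {
    enum State { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    interface Listener {
        void stateChanged(State newState);
    }

    private final BlockingQueue<State> eventQueue = new ArrayBlockingQueue<>(25); // size is an assumption
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "connection-state");
        t.setDaemon(true);
        return t;
    });

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Never blocks the caller: if the queue is full, the oldest event is dropped.
    void postState(State state) {
        while (!eventQueue.offer(state)) {
            eventQueue.poll();
        }
    }

    void start() {
        service.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    State newState = eventQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (newState != null) {
                        listeners.forEach(l -> l.stateChanged(newState));
                    }
                } catch (InterruptedException e) {
                    return; // close() interrupts the thread
                }
            }
        });
    }

    void close() {
        service.shutdownNow();
    }

    // Posts three states and returns them in the order the listener saw them.
    static List<State> demo() {
        LocalConnectionStateManager manager = new LocalConnectionStateManager();
        List<State> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        manager.addListener(s -> { seen.add(s); done.countDown(); });
        manager.start();
        manager.postState(State.CONNECTED);
        manager.postState(State.SUSPENDED);
        manager.postState(State.RECONNECTED);
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            manager.close();
        }
        return seen;
    }
}
```

In this sketch, delivering events on one dedicated thread keeps callbacks ordered and keeps slow listeners from blocking whichever thread posts the states.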
public class CuratorZookeeperClient implements Closeable {
    private final ConnectionState state;
    ...
    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
            int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
            RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
        ...
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
        ...
    }
    ...
    public void start() throws Exception {
        log.debug("Starting");
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Already started");
        }
        state.start();
    }
    ...
}
class ConnectionState implements Watcher, Closeable {
    private final HandleHolder zooKeeper;
    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
            int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher,
            AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if (parentWatcher != null) {
            parentWatchers.offer(parentWatcher);
        }
        //Register itself (this) as the Watcher held by HandleHolder
        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }
    ...
    void start() throws Exception {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }
    synchronized void reset() throws Exception {
        log.debug("reset");
        instanceIndex.incrementAndGet();
        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        //Create the connection between the client and zk
        zooKeeper.closeAndReset();
        zooKeeper.getZooKeeper();//Initiate the connection
    }
    ...
}
class HandleHolder {
    private final ZookeeperFactory zookeeperFactory;
    private final Watcher watcher;
    private final EnsembleProvider ensembleProvider;
    private final int sessionTimeout;
    private final boolean canBeReadOnly;
    private volatile Helper helper;
    ...
    HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly) {
        this.zookeeperFactory = zookeeperFactory;
        this.watcher = watcher;
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeout = sessionTimeout;
        this.canBeReadOnly = canBeReadOnly;
    }
    private interface Helper {
        ZooKeeper getZooKeeper() throws Exception;
        String getConnectionString();
        int getNegotiatedSessionTimeoutMs();
    }
    ZooKeeper getZooKeeper() throws Exception {
        return (helper != null) ? helper.getZooKeeper() : null;
    }
    void closeAndReset() throws Exception {
        internalClose(0);
        helper = new Helper() {
            private volatile ZooKeeper zooKeeperHandle = null;
            private volatile String connectionString = null;
            @Override
            public ZooKeeper getZooKeeper() throws Exception {
                synchronized(this) {
                    if (zooKeeperHandle == null) {
                        connectionString = ensembleProvider.getConnectionString();
                        //Create the connection with zk and initialize zooKeeperHandle
                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }
                    ...
                    return zooKeeperHandle;
                }
            }
            @Override
            public String getConnectionString() {
                return connectionString;
            }
            @Override
            public int getNegotiatedSessionTimeoutMs() {
                return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
            }
        };
    }
    ...
}
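HandleHolder defers creating the native ZooKeeper object until getZooKeeper() is first called, and closeAndReset() swaps in a fresh Helper so that the next call reconnects; ConnectionState.reset() relies on this by calling closeAndReset() and then getZooKeeper() to initiate the connection. The pattern reduces to the sketch below, in which a Supplier stands in for ZookeeperFactory; LazyHandle is hypothetical and, unlike HandleHolder, does not close the previous handle.

```java
import java.util.function.Supplier;

// Simplified model of HandleHolder: the expensive handle (a ZooKeeper connection in
// Curator) is created on the first get(), and closeAndReset() forces a fresh one next time.
final class LazyHandle<T> {
    private final Supplier<T> factory;
    private T handle; // guarded by "this"

    LazyHandle(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T get() {
        if (handle == null) {
            handle = factory.get(); // analogous to zookeeperFactory.newZooKeeper(...)
        }
        return handle;
    }

    synchronized void closeAndReset() {
        handle = null; // a real implementation would also close the old handle
    }
}
```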
//Create the connection between the client and zk
public class DefaultZookeeperFactory implements ZookeeperFactory {
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}
10. Source code analysis of creating, reading, updating and deleting nodes based on Curator
(1) Create a znode based on Curator
(2) Query a znode based on Curator
(3) Modify a znode based on Curator
(4) Delete a znode based on Curator
Curator's CRUD operations are all carried out by calling the native zk API.
(1) Create a znode based on Curator
Creating a node also uses the builder pattern: CuratorFramework's create() method first returns a CreateBuilder instance, methods such as withMode() then set the CreateBuilder's fields, and finally CreateBuilder's forPath() method creates the znode inside a retry loop.
When the node is created, CuratorFramework's getZooKeeper() method is called to obtain the native zk client instance, and the node is then created through the native zk client API.
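Each foreground operation analyzed in this section (create, getData, setData, delete) wraps its zk call in RetryLoop.callWithRetry(...). The sketch below models such a loop together with an exponential backoff delay. It assumes that the delay formula matches ExponentialBackoffRetry (which additionally caps the sleep at a maximum, omitted here) and that only errors marked retryable are retried; SimpleRetryLoop and RetryableException are hypothetical. With new ExponentialBackoffRetry(1000, 3) from the demos, baseSleepMs would be 1000 and maxRetries 3.

```java
import java.util.Random;
import java.util.concurrent.Callable;

// Simplified model of a retry loop driven by an exponential backoff policy.
final class SimpleRetryLoop {
    // Hypothetical marker for errors worth retrying (Curator decides this from
    // KeeperException codes such as connection loss).
    static final class RetryableException extends Exception {
        RetryableException(String message) {
            super(message);
        }
    }

    // Backoff: base * random factor in [1, 2^(retryCount+1) - 1].
    static long sleepMs(long baseSleepMs, int retryCount, Random random) {
        return baseSleepMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
    }

    static <T> T callWithRetry(Callable<T> operation, long baseSleepMs, int maxRetries, Random random) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return operation.call();
            } catch (RetryableException e) {
                if (retryCount >= maxRetries) {
                    throw e; // retries exhausted: surface the last error
                }
                Thread.sleep(sleepMs(baseSleepMs, retryCount, random));
                retryCount++;
            }
        }
    }

    // Fails the first `failures` attempts, then succeeds; returns the number of
    // attempts made, or -1 if the loop gave up.
    static int demoAttempts(int failures, int maxRetries) {
        int[] attempts = {0};
        try {
            Integer result = SimpleRetryLoop.<Integer>callWithRetry(() -> {
                attempts[0]++;
                if (attempts[0] <= failures) {
                    throw new RetryableException("simulated connection loss");
                }
                return attempts[0];
            }, 1, maxRetries, new Random(42));
            return result;
        } catch (Exception e) {
            return -1;
        }
    }
}
```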
public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", //zk's address
            5000, //Session timeout: if zk receives no heartbeat from the client within this time, the session expires
            3000, //Connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client has been started");
        //Create a node
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/my/path", "100".getBytes());
    }
}
public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public CreateBuilder create() {
        checkState();
        return new CreateBuilderImpl(this);
    }
    ...
}
public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> {
    private final CuratorFrameworkImpl client;
    private CreateMode createMode;
    private Backgrounding backgrounding;
    private boolean createParentsIfNeeded;
    ...
    CreateBuilderImpl(CuratorFrameworkImpl client) {
        this.client = client;
        createMode = CreateMode.PERSISTENT;
        backgrounding = new Backgrounding();
        acling = new ACLing(client.getAclProvider());
        createParentsIfNeeded = false;
        createParentsAsContainers = false;
        compress = false;
        setDataIfExists = false;
        storingStat = null;
        ttl = -1;
    }
    @Override
    public String forPath(final String givenPath, byte[] data) throws Exception {
        if (compress) {
            data = client.getCompressionProvider().compress(givenPath, data);
        }
        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
        List<ACL> aclList = acling.getAclList(adjustedPath);
        client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);
        String returnPath = null;
        if (backgrounding.inBackground()) {
            pathInBackground(adjustedPath, data, givenPath);
        } else {
            //Create the node
            String path = protectedPathInForeground(adjustedPath, data, aclList);
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }
    private String protectedPathInForeground(String adjustedPath, byte[] data, List<ACL> aclList) throws Exception {
        return pathInForeground(adjustedPath, data, aclList);
    }
    private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Foreground");
        final AtomicBoolean firstTime = new AtomicBoolean(true);
        //Call with retries
        String returnPath = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<String>() {
                @Override
                public String call() throws Exception {
                    boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
                    protectedMode.checkSetSessionId(client, createMode);
                    String createdPath = null;
                    if (!localFirstTime && protectedMode.doProtected()) {
                        debugForceFindProtectedNode = false;
                        createdPath = findProtectedNodeInForeground(path);
                    }
                    if (createdPath == null) {
                        //To create the znode, first call client.getZooKeeper() to obtain the native zk client instance
                        //and then create the node through the native zk client API
                        try {
                            if (client.isZk34CompatibilityMode()) {
                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
                            } else {
                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
                            }
                        } catch (KeeperException.NoNodeException e) {
                            if (createParentsIfNeeded) {
                                //Cascading creation: create the missing parent nodes, then create the node again
                                ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
                                if (client.isZk34CompatibilityMode()) {
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                                } else {
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
                                }
                            } else {
                                throw e;
                            }
                        } catch (KeeperException.NodeExistsException e) {
                            if (setDataIfExists) {
                                Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
                                if (storingStat != null) {
                                    DataTree.copyStat(setStat, storingStat);
                                }
                                createdPath = path;
                            } else {
                                throw e;
                            }
                        }
                    }
                    if (failNextCreateForTesting) {
                        failNextCreateForTesting = false;
                        throw new KeeperException.ConnectionLossException();
                    }
                    return createdPath;
                }
            }
        );
        trace.setRequestBytesLength(data).setPath(path).commit();
        return returnPath;
    }
    ...
}
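public class CuratorFrameworkImpl implements CuratorFramework {
    private final CuratorZookeeperClient client;
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        this.client = new CuratorZookeeperClient(
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            new Watcher() {
                ...
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly(),
            builder.getConnectionHandlingPolicy()
        );
        ...
    }
    ...
    ZooKeeper getZooKeeper() throws Exception {
        return client.getZooKeeper();
    }
    ...
}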
public class CuratorZookeeperClient implements Closeable {
    private final ConnectionState state;
    ...
    public ZooKeeper getZooKeeper() throws Exception {
        Preconditions.checkState(started.get(), "Client is not started");
        return state.getZooKeeper();
    }
    ...
}
class ConnectionState implements Watcher, Closeable {
    private final HandleHolder zooKeeper;
    ...
    ZooKeeper getZooKeeper() throws Exception {
        if (SessionFailRetryLoop.sessionForThreadHasFailed()) {
            throw new SessionFailRetryLoop.SessionFailedException();
        }
        Exception exception = backgroundExceptions.poll();
        if (exception != null) {
            new EventTrace("background-exceptions", tracer.get()).commit();
            throw exception;
        }
        boolean localIsConnected = isConnected.get();
        if (!localIsConnected) {
            checkTimeouts();
        }
        //Get the ZooKeeper instance through HandleHolder
        return zooKeeper.getZooKeeper();
    }
    ...
}
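In pathInForeground() above, a NoNodeException combined with createParentsIfNeeded leads to ZKPaths.mkdirs(), which creates the missing parent nodes before the create is attempted again. The path-walking part can be modeled without ZooKeeper; in the sketch below a HashSet stands in for the znodes on the server, and CascadingCreate is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model of cascading creation: create every missing ancestor
// from the root downwards, then the target node itself.
final class CascadingCreate {
    // "/a/b/c" -> ["/a", "/a/b"]
    static List<String> parentPaths(String path) {
        List<String> parents = new ArrayList<>();
        int index = path.indexOf('/', 1);
        while (index > 0) {
            parents.add(path.substring(0, index));
            index = path.indexOf('/', index + 1);
        }
        return parents;
    }

    // "existing" stands in for the znodes already on the server.
    // Returns the paths created, in creation order.
    static List<String> createWithParents(Set<String> existing, String path) {
        List<String> created = new ArrayList<>();
        for (String parent : parentPaths(path)) {
            if (existing.add(parent)) {
                created.add(parent);
            }
        }
        if (existing.add(path)) {
            created.add(path);
        }
        return created;
    }
}
```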
(2) Query a znode based on Curator
Querying a node also uses the builder pattern: CuratorFramework's getData() method first returns a GetDataBuilder instance, and GetDataBuilder's forPath() method then queries the znode inside a retry loop.
public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", //zk's address
            5000, //Session timeout: if zk receives no heartbeat from the client within this time, the session expires
            3000, //Connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client has been started");
        //Query a node
        byte[] dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));
        //Query child nodes
        List<String> children = client.getChildren().forPath("/my");
        System.out.println(children);
    }
}
public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public GetDataBuilder getData() {
        checkState();
        return new GetDataBuilderImpl(this);
    }
    @Override
    public GetChildrenBuilder getChildren() {
        checkState();
        return new GetChildrenBuilderImpl(this);
    }
    ...
}
public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, ErrorListenerPathable<byte[]> {
    private final CuratorFrameworkImpl client;
    ...
    @Override
    public byte[] forPath(String path) throws Exception {
        client.getSchemaSet().getSchema(path).validateWatch(path, watching.isWatched() || watching.hasWatcher());
        path = client.fixForNamespace(path);
        byte[] responseData = null;
        if (backgrounding.inBackground()) {
            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
        } else {
            //Query the node
            responseData = pathInForeground(path);
        }
        return responseData;
    }
    private byte[] pathInForeground(final String path) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
        //Call with retries
        byte[] responseData = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    byte[] responseData;
                    //Get the native zk client instance through CuratorFramework, then call its getData() to read the node
                    if (watching.isWatched()) {
                        responseData = client.getZooKeeper().getData(path, true, responseStat);
                    } else {
                        responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat);
                        watching.commitWatcher(KeeperException.Code.OK.intValue(), false);
                    }
                    return responseData;
                }
            }
        );
        trace.setResponseBytesLength(responseData).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(responseStat).commit();
        return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
    }
    ...
}
(3) Modify a znode based on Curator
Modifying a node also uses the builder pattern: CuratorFramework's setData() method first returns a SetDataBuilder instance, and SetDataBuilder's forPath() method then modifies the znode inside a retry loop.
public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", //zk's address
            5000, //Session timeout: if zk receives no heartbeat from the client within this time, the session expires
            3000, //Connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client has been started");
        //Modify a node
        client.setData().forPath("/my/path", "110".getBytes());
        byte[] dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));
    }
}
public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public SetDataBuilder setData() {
        checkState();
        return new SetDataBuilderImpl(this);
    }
    ...
}
public class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<Stat> {
    private final CuratorFrameworkImpl client;
    ...
    @Override
    public Stat forPath(String path, byte[] data) throws Exception {
        client.getSchemaSet().getSchema(path).validateGeneral(path, data, null);
        if (compress) {
            data = client.getCompressionProvider().compress(path, data);
        }
        path = client.fixForNamespace(path);
        Stat resultStat = null;
        if (backgrounding.inBackground()) {
            client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null);
        } else {
            //Modify the node
            resultStat = pathInForeground(path, data);
        }
        return resultStat;
    }
    private Stat pathInForeground(final String path, final byte[] data) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");
        //Call with retries
        Stat resultStat = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<Stat>() {
                @Override
                public Stat call() throws Exception {
                    //Get the native zk client instance through CuratorFramework, then call its setData() to modify the node
                    return client.getZooKeeper().setData(path, data, version);
                }
            }
        );
        trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();
        return resultStat;
    }
    ...
}
(4) Delete a znode based on Curator
Deleting a node also uses the builder pattern: CuratorFramework's delete() method first returns a DeleteBuilder instance, and DeleteBuilder's forPath() method then deletes the znode inside a retry loop.
public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", //zk's address
            5000, //Session timeout: if zk receives no heartbeat from the client within this time, the session expires
            3000, //Connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client has been started");
        //Delete a node
        client.delete().forPath("/my/path");
    }
}
public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public DeleteBuilder delete() {
        checkState();
        return new DeleteBuilderImpl(this);
    }
    ...
}
public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, ErrorListenerPathable<Void> {
private final CuratorFrameworkImpl client;
...
@Override
public Void forPath(String path) throws Exception {
().getSchema(path).validateDelete(path);
final String unfixedPath = path;
path = (path);
if (()) {
<String> errorCallback = null;
if (guaranteed) {
errorCallback = new <String>() {
@Override
public void retriesExhausted(OperationAndData<String> operationAndData) {
().addFailedOperation(unfixedPath);
}
};
}
(new OperationAndData<String>(this, path, (), errorCallback, (), null), null);
} else {
//Delete node
pathInForeground(path, unfixedPath);
}
return null;
}
private void pathInForeground(final String path, String unfixedPath) throws Exception {
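OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("DeleteBuilderImpl-Foreground");
//Retry the call according to the client's RetryPolicy
RetryLoop.callWithRetry(
client.getZookeeperClient(),
new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
//Get the native zk client instance through CuratorFramework, and then call its delete() to delete the node
client.getZooKeeper().delete(path, version);
} catch (KeeperException.NoNodeException e) {
//quietly(): ignore a node that does not exist
if (!quietly) {
throw e;
}
} catch (KeeperException.NotEmptyException e) {
//deletingChildrenIfNeeded(): delete the children first, then the node itself
if (deletingChildrenIfNeeded) {
ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
} else {
throw e;
}
}
return null;
}
}
);
trace.setPath(path).commit();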
}
}
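In the foreground path, RetryLoop.callWithRetry() re-runs the Callable when a retriable, connection-related error occurs, and the RetryPolicy (ExponentialBackoffRetry(1000, 3) in the demos) decides whether another attempt is allowed and how long to sleep first. The stdlib-only sketch below models that control flow under simplifying assumptions: SketchRetryLoop and RetriableException are hypothetical names, and the sleep simply doubles each time, whereas Curator's ExponentialBackoffRetry also randomizes the delay.

```java
import java.util.concurrent.Callable;

// Hypothetical marker for errors worth retrying (in Curator: e.g. connection loss).
class RetriableException extends Exception {
    RetriableException(String message) {
        super(message);
    }
}

class SketchRetryLoop {
    // Runs the task; after a RetriableException, retries up to maxRetries times.
    // Sleeps baseSleepMs, 2*baseSleepMs, 4*baseSleepMs, ... between attempts.
    static <T> T callWithRetry(int maxRetries, long baseSleepMs, Callable<T> task) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return task.call();
            } catch (RetriableException e) {
                if (retryCount >= maxRetries) {
                    throw e; // retries exhausted: surface the last error
                }
                Thread.sleep(baseSleepMs << retryCount);
                retryCount++;
            }
        }
    }
}
```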
11. Implementation source code of the node listening callback mechanism based on Curator
(1) Implementation source code of PathCache child node monitoring mechanism
(2) Implementation source code of NodeCache node monitoring mechanism
(3) One-time watchers registered with getChildren() and background asynchronous callbacks
(4) PathCache automatically re-registers the listener
(5) NodeCache listens for node change events
(1) Implementation source code of PathCache child node monitoring mechanism
PathChildrenCache calls the getChildren() method of the native zk client and passes in a watcher named childrenWatcher. When an event occurs on a child node, childrenWatcher is notified, and this eventually leads to the Listeners registered with PathChildrenCache being called. Note: each time childrenWatcher fires, it triggers refresh(), which calls getChildren() again with childrenWatcher as the watcher, so the watcher is re-registered over and over.
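import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181", //zk's address
5000, //Session timeout (ms): if zk receives no heartbeat from the client within this time, the session expires
3000, //Connection timeout (ms) for connecting to zk
retryPolicy
);
client.start();
System.out.println("Curator client has been started");
//PathCache: listen for changes in the child nodes under /cluster
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
...
}
});
pathChildrenCache.start();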
}
}
public class PathChildrenCache implements Closeable {
private final WatcherRemoveCuratorFramework client;
private final String path;
private final boolean cacheData;
private final boolean dataIsCompressed;
private final CloseableExecutorService executorService;
private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
...
//initialization
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {
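this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
this.executorService = executorService;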
ensureContainers = new EnsureContainers(client, path);
}
//Get the ListenerContainer that stores the Listeners
public ListenerContainer<PathChildrenCacheListener> getListenable() {
return listeners;
}
//Start listening to child nodes
public void start() throws Exception {
start(StartMode.NORMAL);
}
private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
// Handle the changes in connection status
handleStateChange(newState);
}
};
public void start(StartMode mode) throws Exception {
...
//Add Listener to the established zk connection
client.getConnectionStateListenable().addListener(connectionStateListener);
...
//Wrap PathChildrenCache into a RefreshOperation
//Running this operation ends up calling PathChildrenCache's refresh() method
offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
...
}
//Submit a task to the thread pool for processing
void offerOperation(final Operation operation) {
if (operationsQuantizer.add(operation)) {
submitToExecutor(
new Runnable() {
@Override
public void run() {
...
operationsQuantizer.remove(operation);
//For a RefreshOperation, this calls PathChildrenCache's refresh() method
operation.invoke();
...
}
}
);
}
}
private synchronized void submitToExecutor(final Runnable command) {
if (state.get() == State.STARTED) {
//Submit a task to the thread pool for processing
executorService.submit(command);
}
}
...
}
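offerOperation() submits a task only when operationsQuantizer.add(operation) returns true, and the task removes the operation from that set just before invoking it. As a result, equal operations that pile up while one is still pending collapse into a single execution, which helps keep a burst of events from turning into a burst of identical refreshes. A stdlib-only sketch of the idea, assuming Runnable as a stand-in for Curator's Operation interface and a manually drained queue (hypothetical SketchOperationQueue) in place of the executor so the behavior is deterministic:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SketchOperationQueue {
    // Pending operations; an operation equal to one already pending is rejected.
    private final Set<Runnable> pending = ConcurrentHashMap.newKeySet();
    // Stand-in for the executor: tasks wait here until drain() runs them.
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    void offerOperation(Runnable operation) {
        if (pending.add(operation)) { // false if an equal operation is still pending
            tasks.add(() -> {
                pending.remove(operation); // allow the next equal operation to be queued
                operation.run();
            });
        }
    }

    // Runs all queued tasks, as an executor thread would.
    void drain() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }
}
```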
class RefreshOperation implements Operation {
private final PathChildrenCache cache;
private final PathChildrenCache.RefreshMode mode;
RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {
this.cache = cache;
this.mode = mode;
}
@Override
public void invoke() throws Exception {
//Call PathChildrenCache's refresh() method, which (re)starts listening to the child nodes
cache.refresh(mode);
}
...
}
public class PathChildrenCache implements Closeable {
...
private volatile Watcher childrenWatcher = new Watcher() {
//Re-registering watcher: process() leads to refresh(), which passes childrenWatcher to getChildren() again
//Called when a child node event occurs
@Override
public void process(WatchedEvent event) {
//Queue a RefreshOperation, which ends up calling PathChildrenCache's refresh() method
offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
}
};
void refresh(final RefreshMode mode) throws Exception {
ensurePath();
//Create a callback that is invoked when the background getChildren() call below completes
final BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (reRemoveWatchersOnBackgroundClosed()) {
return;
}
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
//Process the child node data
processChildren(event.getChildren(), mode);
} else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
if (mode == RefreshMode.NO_NODE_EXCEPTION) {
log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path);
ensureContainers.reset();
} else {
log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path);
ensureContainers.reset();
offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION));
}
}
}
};
//The following code finally calls the getChildren() method of the native zk client to listen to the child nodes,
//registering childrenWatcher as the watcher and callback as the background asynchronous callback
client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
}
...
}
// When a child node event occurs, EventOperation's invoke() method is eventually executed
class EventOperation implements Operation {
private final PathChildrenCache cache;
private final PathChildrenCacheEvent event;
EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {
this.cache = cache;
this.event = event;
}
@Override
public void invoke() {
//Call the Listeners registered with PathChildrenCache
cache.callListeners(event);
}
...
}
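The callback in refresh() hands the fetched child list to processChildren(), whose body this excerpt does not show. Conceptually, it compares the new list with the children the cache already knows about and turns the difference into events that EventOperation later delivers to the listeners. The stdlib-only sketch below models only that diffing step, with hypothetical names. In Curator, new children are additionally fetched with getData() (with a data watcher), and CHILD_ADDED/CHILD_UPDATED events are raised from that follow-up, which the sketch skips.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class SketchChildrenDiff {
    // Child names the cache currently knows about (stand-in for currentData).
    private final Set<String> known = new TreeSet<>();

    // Compares the freshly fetched child list with the known one and returns events.
    List<String> processChildren(List<String> children) {
        List<String> events = new ArrayList<>();
        Set<String> removed = new TreeSet<>(known);
        removed.removeAll(children);
        for (String name : removed) {
            known.remove(name);
            events.add("CHILD_REMOVED " + name);
        }
        for (String name : children) {
            if (known.add(name)) { // true only for children not seen before
                events.add("CHILD_ADDED " + name);
            }
        }
        return events;
    }
}
```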
(2) Implementation source code of NodeCache node monitoring mechanism
NodeCache calls the exists() method of the native zk client and passes in a watcher. When an event occurs on the node itself (it is created, its data changes, or it is deleted), the watcher is notified, and this eventually leads to the Listeners registered with NodeCache being called. Note: each time the watcher fires, it calls reset(), which calls exists() again with the same watcher, so the watcher is re-registered over and over.
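import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181", //zk's address
5000, //Session timeout (ms): if zk receives no heartbeat from the client within this time, the session expires
3000, //Connection timeout (ms) for connecting to zk
retryPolicy
);
client.start();
System.out.println("Curator client has been started");
//NodeCache: listen for changes to the /cluster node itself
final NodeCache nodeCache = new NodeCache(client, "/cluster");
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
Stat stat = client.checkExists().forPath("/cluster");
if (stat == null) {
//The node has been deleted
} else {
//The node exists: read the latest data held by the cache
nodeCache.getCurrentData();
}
}
});
nodeCache.start();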
}
}
public class NodeCache implements Closeable {
private final WatcherRemoveCuratorFramework client;
private final String path;
private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
...
private ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
if (isConnected.compareAndSet(false, true)) {
reset();
}
} else {
isConnected.set(false);
}
}
};
//A Watcher that reset() passes to exists() as the listener
private Watcher watcher = new Watcher() {
//Re-registering watcher: every event calls reset(), which registers this watcher again via exists()
@Override
public void process(WatchedEvent event) {
reset();
}
};
//A background callback, invoked when the exists() (or follow-up getData()) call issued by reset() completes
private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
processBackgroundResult(event);
}
};
//Initialize NodeCache
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) {
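this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
this.dataIsCompressed = dataIsCompressed;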
}
//Get the ListenerContainer that stores Listener
public ListenerContainer<NodeCacheListener> getListenable() {
Preconditions.checkState(state.get() != State.CLOSED, "Closed");
return listeners;
}
//Start listening to node
public void start() throws Exception {
start(false);
}
public void start(boolean buildInitial) throws Exception {
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
//Add the Listener to the established zk connection
client.getConnectionStateListenable().addListener(connectionStateListener);
if (buildInitial) {
//Call the exists() method of the native zk client on the node
client.checkExists().creatingParentContainersIfNeeded().forPath(path);
internalRebuild();
}
reset();
}
private void reset() throws Exception {
if ((state.get() == State.STARTED) && isConnected.get()) {
//The following code finally calls the exists() method of the native zk client to listen to the node,
//registering watcher as the watcher and backgroundCallback as the background asynchronous callback
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
private void processBackgroundResult(CuratorEvent event) throws Exception {
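switch (event.getType()) {
case GET_DATA: {
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
ChildData childData = new ChildData(path, event.getStat(), event.getData());
setNewData(childData);
}
break;
}
case EXISTS: {
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
setNewData(null);
} else if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
if (dataIsCompressed) {
client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
} else {
client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);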
}
}
break;
}
}
}
...
}
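processBackgroundResult() drives NodeCache as a small two-step state machine: an EXISTS result of NONODE clears the cached data, an EXISTS result of OK triggers a getData() with the same watcher, and a GET_DATA result of OK stores the new data. setNewData(), whose body is not shown here, is where the NodeCacheListeners get notified; the sketch assumes it fires only when the cached value actually changes. This stdlib-only model of the decision logic uses a hypothetical SketchNodeCache and plain int constants in place of KeeperException.Code; it is not Curator's API.

```java
import java.util.Objects;

class SketchNodeCache {
    static final int OK = 0;        // stand-ins for ZooKeeper result codes
    static final int NONODE = -101;

    private String data;            // cached node data; null means "node does not exist"
    int listenerCalls = 0;          // counts nodeChanged() notifications
    int getDataCalls = 0;           // counts follow-up getData() requests

    // Result of the exists() call issued by reset().
    void onExists(int resultCode) {
        if (resultCode == NONODE) {
            setNewData(null);
        } else if (resultCode == OK) {
            getDataCalls++;         // NodeCache would now call getData() with the same watcher
        }
    }

    // Result of the getData() call issued after a successful exists().
    void onGetData(int resultCode, String newData) {
        if (resultCode == OK) {
            setNewData(newData);
        }
    }

    private void setNewData(String newData) {
        String previous = data;
        data = newData;
        if (!Objects.equals(previous, newData)) {
            listenerCalls++;        // notify listeners only on an actual change
        }
    }

    String currentData() {
        return data;
    }
}
```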
(3) One-time watchers registered with getChildren() and background asynchronous callbacks
A watcher registered through getChildren() fires only once, while the callback registered through inBackground() is an asynchronous callback that runs when the background getChildren() call completes.
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181", //zk's address
5000, //Session timeout (ms): if zk receives no heartbeat from the client within this time, the session expires
3000, //Connection timeout (ms) for connecting to zk
retryPolicy
);
client.start();
System.out.println("The Curator client has been started and the connection to zk has been completed");
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test", "10".getBytes());
System.out.println("Created node '/test'");
client.getChildren().usingWatcher(new CuratorWatcher() {
public void process(WatchedEvent event) throws Exception {
//Only the first change to the children of /test is delivered here:
//after this watcher has fired once, it is not triggered again
System.out.println("Received a notification from zk: " + event);
}
}).inBackground(new BackgroundCallback() {
//inBackground() means getChildren() is executed asynchronously in the background;
//as soon as that background getChildren() completes, this callback is notified
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("Received a background callback notification: " + event);
}
}).forPath("/test");
}
}
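The one-time behavior comes from ZooKeeper itself: a watch is removed as soon as it fires, so a watcher set by getChildren() sees at most one event. The stdlib-only sketch below models just that rule with a hypothetical in-memory SketchZNode; it is neither ZooKeeper nor Curator code and ignores sessions, event types, and ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical node whose child watches behave like ZooKeeper's one-shot watches.
class SketchZNode {
    private final List<String> children = new ArrayList<>();
    private final List<Consumer<String>> watchers = new ArrayList<>();

    // Like getChildren() with a watcher: returns a snapshot and arms a one-shot watch.
    List<String> getChildren(Consumer<String> watcher) {
        watchers.add(watcher);
        return new ArrayList<>(children);
    }

    void addChild(String name) {
        children.add(name);
        fire("NodeChildrenChanged");
    }

    private void fire(String event) {
        // Watches are cleared before delivery, so each one fires at most once.
        List<Consumer<String>> armed = new ArrayList<>(watchers);
        watchers.clear();
        for (Consumer<String> w : armed) {
            w.accept(event);
        }
    }
}
```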
(4) PathCache automatically re-registers the listener
Whenever a child node changes, childEvent() is called.
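import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181", //zk's address
5000, //Session timeout (ms): if zk receives no heartbeat from the client within this time, the session expires
3000, //Connection timeout (ms) for connecting to zk
retryPolicy
);
client.start();
System.out.println("The Curator client has been started and the connection to zk has been completed");
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test", true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
//Every change to a child node triggers childEvent() here, no matter how many changes occur
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("The listening child node changed, and an event notification was received: " + pathChildrenCacheEvent);
}
});
pathChildrenCache.start();
System.out.println("Child node listening has been started");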
}
}
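PathChildrenCache gets its every-change behavior by re-arming the watch from inside the watcher: childrenWatcher triggers refresh(), and refresh() calls getChildren().usingWatcher(childrenWatcher) again. The stdlib-only sketch below reproduces that re-arm-on-fire loop on a hypothetical in-memory node with one-shot watches; the class names are invented for illustration. With real ZooKeeper, re-reading the whole child list on each refresh is what lets a change made between the watch firing and being re-armed still appear in the next snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical node with ZooKeeper-style one-shot child watches.
class SketchWatchedNode {
    final List<String> children = new ArrayList<>();
    private final List<Consumer<String>> watchers = new ArrayList<>();

    List<String> getChildren(Consumer<String> watcher) {
        watchers.add(watcher);
        return new ArrayList<>(children);
    }

    void addChild(String name) {
        children.add(name);
        List<Consumer<String>> armed = new ArrayList<>(watchers);
        watchers.clear(); // one-shot: watches are consumed when they fire
        for (Consumer<String> w : armed) {
            w.accept(name);
        }
    }
}

// Re-arms its watch inside the callback, like childrenWatcher -> refresh().
class SketchChildrenCache {
    private final SketchWatchedNode node;
    final List<List<String>> snapshots = new ArrayList<>();

    SketchChildrenCache(SketchWatchedNode node) {
        this.node = node;
    }

    void start() {
        refresh();
    }

    private void refresh() {
        // Re-read the full child list and register onEvent as the watcher again.
        snapshots.add(node.getChildren(this::onEvent));
    }

    private void onEvent(String event) {
        refresh();
    }
}
```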
(5) NodeCache listens for node change events
Whenever the node changes, nodeChanged() is called.
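import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181", //zk's address
5000, //Session timeout (ms): if zk receives no heartbeat from the client within this time, the session expires
3000, //Connection timeout (ms) for connecting to zk
retryPolicy
);
client.start();
System.out.println("The Curator client has been started and the connection to zk has been completed");
final NodeCache nodeCache = new NodeCache(client, "/test/child/id");
nodeCache.getListenable().addListener(new NodeCacheListener() {
//Every change to the node triggers nodeChanged() here, no matter how many changes occur
public void nodeChanged() throws Exception {
Stat stat = client.checkExists().forPath("/test/child/id");
if (stat != null) {
byte[] dataBytes = client.getData().forPath("/test/child/id");
System.out.println("Node data has changed: " + new String(dataBytes));
} else {
System.out.println("Node was deleted");
}
}
});
nodeCache.start();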
}
}
12. Implementation source code of Leader election mechanism based on Curator
(1) Source code of LeaderLatch, the first Leader election mechanism
(2) Source code of the second Leader election mechanism LeaderSelector
Curator's CRUD operations plus its listening callback mechanism cover the scenarios in which most systems use zk. Note that when a listener (watcher) is registered on a node or its children through the native zk API, the client is notified only once: the first matching event is delivered, but later events are not. With the native API, the client therefore has to re-register the listener every time it receives an event notification, whereas Curator's PathCache and NodeCache re-register it automatically.
(1) Source code of LeaderLatch, the first Leader election mechanism
Curator clients compete for leadership by creating ephemeral sequential nodes, so LeaderLatch's election is implemented almost the same way as a distributed lock.
After each Curator client creates its ephemeral sequential node, it calls getChildren() on the /leader/latch directory to get all child nodes. The result of getChildren() is delivered through a backgroundCallback. The client then sorts the child nodes and checks whether its own node is the first one.
If its node is first, the client is the Leader. Otherwise, it adds a watcher to the node immediately before its own by calling getData() on that previous node. When the getData() call completes, its backgroundCallback is invoked; if the previous node has already disappeared (NONODE), the client calls reset() and starts over.
When the client that owns the previous node gives up its place (for example, it releases leadership or its session ends), the previous node disappears, and the watcher that the next client added via getData() is triggered.
So when that watcher fires, meaning the previous node no longer exists, the client calls getChildren() again to fetch the current child list and re-check whether it is the Leader.
Note: getData() is used instead of exists() because exists() leaves a watch behind even when the node does not exist (it then waits for the node to be created), whereas getData() on a missing node fails with NONODE and sets no watch. This avoids the resource leak caused by unneeded Watchers.
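The election steps above reduce to sorting the sequential node names and checking the client's own position. The stdlib-only sketch below shows only that decision. It assumes node names end in ZooKeeper's zero-padded 10-digit sequence number (with withProtection() the names look roughly like _c_<id>-latch-0000000001) and sorts on that suffix. SketchLatchCheck is hypothetical and stands in for Curator's LockInternals.getSortedChildren() plus the index check in checkLeadership(); it returns null when the client is Leader, otherwise the predecessor node to watch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SketchLatchCheck {
    // Sequential nodes end in a zero-padded 10-digit counter, e.g. "latch-0000000003".
    static String sequenceOf(String nodeName) {
        return nodeName.substring(nodeName.length() - 10);
    }

    // Returns null if ourNode is first (Leader); otherwise the node just before it.
    static String nodeToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(SketchLatchCheck::sequenceOf));
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            // Curator calls reset() here; the sketch just reports the problem.
            throw new IllegalStateException("Can't find our node: " + ourNode);
        }
        return ourIndex == 0 ? null : sorted.get(ourIndex - 1);
    }
}
```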
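import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181", //zk's address
5000, //Session timeout (ms): if zk receives no heartbeat from the client within this time, the session expires
3000, //Connection timeout (ms) for connecting to zk
retryPolicy
);
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
public void stateChanged(CuratorFramework client, ConnectionState newState) {
switch (newState) {
case LOST:
// When the Leader loses its connection to zk, it must stop acting as the Leader
break;
}
}
});
client.start();
System.out.println("The Curator client has been started and the connection to zk has been completed");
LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
leaderLatch.start();
leaderLatch.await();//Block and wait until the current client becomes the Leader
boolean hasLeadership = leaderLatch.hasLeadership();
System.out.println("Has this client become the Leader: " + hasLeadership);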
}
}
public class LeaderLatch implements Closeable {
private final WatcherRemoveCuratorFramework client;
private final ConnectionStateListener listener = new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
handleStateChange(newState);
}
};
...
//Add this instance to the leadership election and attempt to acquire leadership.
public void start() throws Exception {
...
//Add Listener to the established zk connection
client.getConnectionStateListenable().addListener(listener);
reset();
...
}
@VisibleForTesting
void reset() throws Exception {
setLeadership(false);
setNode(null);
//Invoked when the background creation of the ephemeral sequential node completes
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
...
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
setNode(event.getName());
if (state.get() == State.CLOSED) {
setNode(null);
} else {
//The ephemeral sequential node was created; fetch the child node list with getChildren()
getChildren();
}
} else {
log.error("getChildren() failed. rc = " + event.getResultCode());
}
}
};
//Create the ephemeral sequential node
client.create().creatingParentContainersIfNeeded().withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback)
.forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
//Get the list of child nodes
private void getChildren() throws Exception {
//Invoked when the background getChildren() call completes
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
checkLeadership(event.getChildren());
}
}
};
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}
//Check whether our node is the first one
private void checkLeadership(List<String> children) throws Exception {
if (debugCheckLeaderShipLatch != null) {
debugCheckLeaderShipLatch.await();
}
final String localOurPath = ourPath.get();
//Sort the obtained nodes by sequence number
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
if (ourIndex < 0) {
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
} else if (ourIndex == 0) {
//Our node is first: mark this client as the Leader
setLeadership(true);
} else {
//Not first: watch the node just before ours
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if ((state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null)) {
//The previous node was deleted: fetch the child node list again
getChildren();
}
}
};
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
//The previous node is already gone: start over
reset();
}
}
};
//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
...
//Block and wait until it becomes a leader
public void await() throws InterruptedException, EOFException {
synchronized(this) {
while ((state.get() == State.STARTED) && !hasLeadership.get()) {
wait();//Object.wait(): release this object's monitor and block until notifyAll() is called
}
}
if (state.get() != State.STARTED) {
throw new EOFException();
}
}
//Update the leadership flag, notify the LeaderLatchListeners, and call notifyAll() to wake up threads blocked in await()
private synchronized void setLeadership(boolean newValue) {
boolean oldValue = hasLeadership.getAndSet(newValue);
if (oldValue && !newValue) { // Lost leadership, was true, now false
listeners.forEach(new Function<LeaderLatchListener, Void>() {
@Override
public Void apply(LeaderLatchListener listener) {
listener.notLeader();
return null;
}
});
} else if (!oldValue && newValue) { // Gained leadership, was false, now true
listeners.forEach(new Function<LeaderLatchListener, Void>() {
@Override
public Void apply(LeaderLatchListener input) {
input.isLeader();
return null;
}
});
}
notifyAll();//Wake up the threads that called wait() in await()
}
}
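await() and setLeadership() form a classic guarded wait: the waiting thread loops on wait() while the condition is false, and whichever thread changes the condition calls notifyAll() while holding the same monitor. The stdlib-only sketch below reproduces that pattern with a hypothetical SketchLatch and no zk involved; in the usage check, a background thread plays the part of the election callback that eventually calls setLeadership(true). It omits the started/closed state check that makes LeaderLatch.await() throw EOFException.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SketchLatch {
    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);

    // Blocks until setLeadership(true) has been called.
    synchronized void await() throws InterruptedException {
        while (!hasLeadership.get()) { // the loop also guards against spurious wakeups
            wait();
        }
    }

    // Updates the flag and wakes every thread blocked in await().
    synchronized void setLeadership(boolean newValue) {
        hasLeadership.set(newValue);
        notifyAll();
    }

    boolean hasLeadership() {
        return hasLeadership.get();
    }
}
```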
(2) Source code of the second Leader election mechanism LeaderSelector
LeaderSelector decides leadership by whether a client has acquired a distributed lock: the client holding the lock is the Leader. Because leadership lasts only as long as the lock is held, the takeLeadership() method must not return while the client should remain Leader; once it returns, the lock is released, the other clients compete for the lock, and one of them becomes the new Leader.
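import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181", //zk's address
5000, //Session timeout (ms): if zk receives no heartbeat from the client within this time, the session expires
3000, //Connection timeout (ms) for connecting to zk
retryPolicy
);
client.start();
System.out.println("The Curator client has been started and the connection to zk has been completed");
LeaderSelector leaderSelector = new LeaderSelector(
client,
"/leader/election",
new LeaderSelectorListener() {
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("You have become the Leader...");
//Do all of the Leader's work here; this method must not return while the client should remain Leader
Thread.sleep(Integer.MAX_VALUE);
}
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
System.out.println("The connection state changed: " + connectionState);
if (!connectionState.isConnected()) {
//Connection suspended or lost: give up leadership
throw new CancelLeadershipException();
}
}
}
);
leaderSelector.start();//Try to compete with other clients on the node "/leader/election" to become the Leader
Thread.sleep(Integer.MAX_VALUE);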
}
}
public class LeaderSelector implements Closeable {
private final CuratorFramework client;
private final LeaderSelectorListener listener;
private final CloseableExecutorService executorService;
private final InterProcessMutex mutex;
...
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) {
Preconditions.checkNotNull(client, "client cannot be null");
PathUtils.validatePath(leaderPath);
Preconditions.checkNotNull(listener, "listener cannot be null");
this.client = client;
this.listener = new WrappedListener(this, listener);
hasLeadership = false;
this.executorService = executorService;
//Initialize a distributed lock on leaderPath
mutex = new InterProcessMutex(client, leaderPath) {
@Override
protected byte[] getLockNodeBytes() {
return (id.length() > 0) ? getIdBytes(id) : null;
}
};
}
public void start() {
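Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
Preconditions.checkState(!executorService.isShutdown(), "Already started");
Preconditions.checkState(!hasLeadership, "Already has leadership");
client.getConnectionStateListenable().addListener(listener);
requeue();
}
public boolean requeue() {
Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
return internalRequeue();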
}
private synchronized boolean internalRequeue() {
if (!isQueued && (state.get() == State.STARTED)) {
isQueued = true;
//Submit the election work to the thread pool as a task
Future<Void> task = executorService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
...
doWorkLoop();
...
return null;
}
});
ourTask.set(task);
return true;
}
return false;
}
private void doWorkLoop() throws Exception {
...
doWork();
...
}
@VisibleForTesting
void doWork() throws Exception {
hasLeadership = false;
try {
//Try to acquire the distributed lock; this blocks until the lock is acquired
mutex.acquire();
//Reaching this line means the distributed lock has been acquired
hasLeadership = true;
try {
if (debugLeadershipLatch != null) {
debugLeadershipLatch.countDown();
}
if (debugLeadershipWaitLatch != null) {
debugLeadershipWaitLatch.await();
}
//Call back the takeLeadership() method implemented by the user
listener.takeLeadership(client);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} catch (Throwable e) {
ThreadUtils.checkInterrupted(e);
} finally {
clearIsQueued();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} finally {
if (hasLeadership) {
hasLeadership = false;
boolean wasInterrupted = Thread.interrupted(); // clear any interrupted status so that mutex.release() works immediately
try {
//Release the lock
mutex.release();
} catch (Exception e) {
if (failedMutexReleaseCount != null) {
failedMutexReleaseCount.incrementAndGet();
}
ThreadUtils.checkInterrupted(e);
log.error("The leader threw an exception", e);
} finally {
if (wasInterrupted) {
Thread.currentThread().interrupt();
}
}
}
}
}
...
}
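doWork() boils down to four steps: block on the lock, mark leadership, run the user's takeLeadership(), and release the lock in a finally block so that the next waiting client can take over. The stdlib-only sketch below mirrors that sequence inside a single JVM, using a ReentrantLock as a stand-in for the InterProcessMutex; SketchLeaderSelector is hypothetical, and this illustrates the control flow, not a distributed lock. Unlike Curator's doWork(), which swallows non-interrupt exceptions thrown by takeLeadership(), the sketch lets them propagate after the lock has been released.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Sketch of LeaderSelector.doWork() inside one JVM; not a distributed lock.
class SketchLeaderSelector {
    private final ReentrantLock mutex; // stand-in for the InterProcessMutex on the leader path
    private final String id;
    private volatile boolean hasLeadership = false;

    SketchLeaderSelector(ReentrantLock mutex, String id) {
        this.mutex = mutex;
        this.id = id;
    }

    boolean hasLeadership() {
        return hasLeadership;
    }

    // Acquire the lock, act as leader while takeLeadership runs, always release.
    void doWork(Consumer<String> takeLeadership) throws InterruptedException {
        mutex.lockInterruptibly(); // blocks while another "client" holds the lock
        hasLeadership = true;
        try {
            takeLeadership.accept(id); // leadership lasts only while this call runs
        } finally {
            hasLeadership = false;
            mutex.unlock(); // the next waiting client can now become leader
        }
    }
}
```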