Outline
1. Use scenarios
2. Which systems mainly use zk
3. Why zk clusters are needed in distributed system architecture
4. Characteristics of zk as a distributed system
5. The three roles of cluster machines
6. Long connections and sessions between the client and zk
7. The znode data model and node types
8. The core Watcher listening and callback mechanism
9. The ZAB protocol's master-slave synchronization and crash recovery mechanisms
10. ZAB protocol process: cluster startup, data synchronization, and crash recovery
11. ZAB message broadcast based on the 2PC (two-phase commit) idea
12. Is zk strongly consistent or eventually consistent
13. Two possible data inconsistency problems under the ZAB protocol
14. Data synchronization between the new Leader and Followers during crash recovery
15. How the ZAB protocol handles messages that need to be discarded
16. The role of the Observer node
17. Why zk suits small-cluster deployment and read-heavy, write-light scenarios
18. Summary of zk's characteristics
1. Use scenarios
(1) Distributed lock
(2) Cluster metadata management
(3) Distributed Coordination (HA)
(4) zk usage scenario summary
Kafka uses zk for metadata management, Master election, and distributed coordination. Canal also uses zk for metadata management and Master election (HA active-standby switching). HDFS HA is also based on zk.
(1) Distributed lock
Common distributed locks in distributed architectures include Redis distributed locks and zk distributed locks.
(2) Cluster metadata management
Kafka and Canal both run as distributed clusters and both need to centrally store and manage the cluster's core metadata, so they both choose to keep that core metadata in zk.
(3) Distributed Coordination (HA)
If a client changes a piece of data in zk, zk will notify the other clients that are listening on that data of the change.
Several classic distributed coordination scenarios (a leader-election sketch follows the list):
1. Kafka
Kafka has multiple Brokers, which compete to become the Controller. If the Broker acting as the Controller goes down, the node it registered in zk is deleted; the other Brokers are then notified by zk and compete again to become the new Controller.
2. HDFS
The NameNode HA architecture in HDFS deploys two NameNodes: only one becomes the Master through zk election, and the other serves as the standby.
3. Canal
Canal's HA architecture also uses zk: after a node goes down, the standby nodes are notified and take over, ensuring high availability.
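A minimal sketch of this style of coordination, using Apache Curator's LeaderLatch recipe (the connect string, latch path, and participant id are placeholder assumptions, not taken from any of the systems above):

// Minimal leader-election sketch with Apache Curator's LeaderLatch recipe.
// The connect string, latch path, and participant id are placeholders.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderElectionSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181",           // placeholder connect string
                new ExponentialBackoffRetry(1000, 3));   // retry policy
        client.start();

        // Each participant creates an ephemeral sequential node under the latch path;
        // the smallest one holds leadership, the rest are notified and take over on failure.
        LeaderLatch latch = new LeaderLatch(client, "/my-app/leader", "node-1");
        latch.start();
        latch.await();                                   // blocks until this process leads
        System.out.println("leadership acquired: " + latch.hasLeadership());

        latch.close();                                   // give up leadership
        client.close();
    }
}

Kafka's Controller election and Canal's active-standby switching follow the same pattern: register an ephemeral node, watch it, and re-elect when it disappears.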
(4) zk usage scenario summary
zk encapsulates the core, mainstream functional requirements of distributed architectures. Many distributed systems need distributed locks, centralized storage of cluster metadata, Master election, and distributed coordination and notification, and all of these can be implemented with zk.
2. Which systems mainly use zk
(1) Distributed Java Business System
(2) Open source distributed system
(3) Self-developed distributed system
(1) Distributed Java Business System
For example, distributed e-commerce platforms, Internet platforms developed in Java, and traditional architecture systems all belong to the category of distributed Java business systems.
Most of these systems rely on Dubbo or Spring Cloud to split the system into many services or subsystems, which then coordinate with each other to complete the overall function.
However, zk is used relatively little in this type of Java business system. Its most common use there is the distributed lock, and even for distributed locks, Redis distributed locks are often chosen instead.
(2) Open source distributed system
For example, Dubbo, HBase, HDFS, Kafka, Canal, Storm, and Solr mainly use zk for: centralized storage of distributed cluster metadata, Master election to implement an HA architecture, and distributed coordination and notification.
Dubbo: uses zk as a registry and to centrally store cluster metadata;
HBase: uses zk to centrally store cluster metadata;
HDFS: uses zk for Master election to implement its HA architecture;
Kafka: uses zk to centrally store cluster metadata and for distributed coordination and notification;
Canal: uses zk to centrally store cluster metadata and for Master election to implement HA;
Canal, Kafka, HDFS, and other technologies all use zk for metadata management and Master election.
(3) Self-developed distributed system
When developing a similar distributed system yourself, you may ask: does the cluster's metadata need to be stored centrally? Is Master election needed to implement an HA architecture? Is distributed coordination and notification required?
If such needs arise in a self-developed distributed system, you can consider introducing zk to meet them.
3. Why zk clusters are needed in distributed system architecture
(1) Develop a system similar to zk
(2) Use zk cluster
When a distributed cluster needs centralized metadata storage, Master election, and distributed coordination and notification, you can either develop a system similar to zk or use a zk cluster.
(1) Develop a system similar to zk
A stand-alone version is deployed on only one machine and provides a set of functions: it stores metadata, supports Master election, and supports distributed coordination and notification. But what happens if that single machine fails?
Therefore, cluster deployment is required: multiple machines ensure high availability, so the system keeps running even if one machine goes down.
Suppose there are now 3 machines that need to store metadata, and a piece of data has been written to machine 1. How should machine 1 synchronize that data to the other machines? In other words, how should a self-developed zk-like system guarantee data consistency once deployed as a cluster?
(2) Use zk cluster
zk is battle-tested, with few bugs and comprehensive functionality, and it has been used in many industrial-grade distributed systems, so it is usually enough to use a zk cluster directly.
4. Characteristics of zk as a distributed system
(1) Clustered deployment
(2) Data model of tree structure
(3) Write in sequence
(4) Data consistency
(5) High performance
(6) Highly available
(7) High concurrency
(1) Clustered deployment
A cluster is formed from 3 to 5 machines, each of which keeps all of zk's data in memory. The machines communicate with each other to synchronize data, and a client can connect to any machine.
(2) Data model of tree structure
The znode data structure is similar to a file system: a hierarchical, tree-shaped structure stored purely in memory, where each znode can be thought of as a node in the tree. For example, with the zkCli client:
create /usr/local/uid
create /usr/local/test_file
uid: a data value can be written to this znode, such as hello world
test_file: a data value can also be written to this znode
(3) Write in sequence
Only one machine in the cluster accepts writes, while all machines can serve reads. Every write request is assigned a globally unique, incrementing number called the ZXID within the zk cluster; the ZXID ensures that write requests initiated by clients are processed in order.
(4) Data consistency
After any zk machine receives a write request, the data is synchronized to the other machines to keep it consistent, so a client connecting to any zk machine sees consistent data.
(5) High performance
Every zk machine maintains its data in memory, so the zk cluster offers high concurrency and high performance. Deployed on high-spec machines, a 3-node zk cluster can withstand tens of thousands of requests per second.
(6) Highly available
As long as fewer than half of the machines in the cluster are down, the cluster remains available and no data is lost: a 3-node cluster tolerates 1 failure, and a 5-node cluster tolerates 2.
(7) High concurrency
High concurrency follows from high performance: since requests are processed against pure in-memory data structures, the concurrency capability is very high. On high-spec physical machines, for example 16-core 32 GB boxes, a single machine can support tens of thousands of QPS, and a 3-machine cluster can also support tens of thousands of QPS.
5. The three roles of cluster machines
Generally, there are three machine roles in a zk cluster: Leader, Follower, and Observer. After the cluster starts, a Leader is automatically elected. Only the Leader handles writes; Followers only synchronize data and serve reads. If the Leader goes down, the Followers elect a new Leader. Observers only serve reads and do not participate in elections.
6. Long connections and sessions between the client and zk
After the zk cluster starts, each node in the cluster takes on its role. The connection a client then establishes with the zk cluster is a long-lived TCP connection, which creates a session; heartbeats are used to sense whether the session is still alive.
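A minimal sketch of establishing such a session with the native ZooKeeper Java client (the connect string and session timeout are placeholder values):

// Minimal session sketch with the native ZooKeeper Java client.
// The connect string and session timeout below are placeholder values.
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class SessionSketch {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);

        // Creates a long-lived TCP connection; the client library keeps the session
        // alive by sending heartbeats within the session timeout.
        ZooKeeper zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 30_000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();               // session established
                }
            }
        });

        connected.await();
        System.out.println("session id: 0x" + Long.toHexString(zk.getSessionId()));
        zk.close();                                      // closing the client ends the session
    }
}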
7. The znode data model and node types
zk's core data model is the znode tree. Writing data to zk means creating znodes in this tree-shaped structure; values can be written into them and are stored in zk's memory.
There are two basic node types: persistent nodes and ephemeral (temporary) nodes. A persistent node continues to exist even after the client disconnects; an ephemeral node disappears as soon as the client that created it disconnects.
Sequential nodes get a globally incrementing number appended to their names when they are created.
Curator's zk distributed lock is based on ephemeral sequential nodes: when locking, an ephemeral sequential node is created, and zk automatically appends a globally incrementing number as a suffix. If the client disconnects, the lock it held is automatically released, and the other clients sense this and try to acquire the lock.
For metadata storage, persistent nodes are used; for distributed coordination and notification, ephemeral nodes are usually used; for distributed locks, ephemeral sequential nodes are usually used, as in the lock sketch below.
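A minimal sketch of acquiring such a lock with Curator's InterProcessMutex recipe, which builds on ephemeral sequential nodes (the connect string and lock path are placeholders):

// Minimal distributed-lock sketch with Curator's InterProcessMutex recipe,
// which creates ephemeral sequential nodes under the lock path.
// The connect string and lock path below are placeholders.
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LockSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex lock = new InterProcessMutex(client, "/locks/order-123");
        if (lock.acquire(10, TimeUnit.SECONDS)) {        // wait up to 10s for the lock
            try {
                // critical section: only one client holds /locks/order-123 at a time
                System.out.println("doing work under the lock");
            } finally {
                lock.release();                          // deletes this client's sequential node
            }
        }
        client.close();
    }
}

If the client crashes while holding the lock, its ephemeral node is removed when the session expires, so the lock is released automatically.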
Each znode also has a Stat structure that stores version numbers:
1. version (version of the znode's data)
2. cversion (version of the znode's children)
3. aversion (version of the znode's ACL permissions)
8. The core Watcher listening and callback mechanism
zk's most important mechanism is the Watcher: a client can register a watcher on a znode, and when that znode changes, zk calls back the client to notify it.
This is an extremely useful capability and is essential for coordinating distributed systems. If zk only supported reads and writes, it could implement metadata storage, Master election, and a few other functions, but not the coordination requirements of distributed systems: if System A is watching a piece of data and System B updates it, zk must be able to notify System A of the change.
With just two kinds of operations on zk's in-memory data model (and its different node types), namely reading/writing data and watching for data changes (with callback notification when the data is updated), you can implement cluster metadata storage, distributed locks, Master election, distributed coordination and monitoring, and other functions. A watcher sketch follows.
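A minimal sketch of registering a watch with the native ZooKeeper Java client (the connect string and znode path are placeholders; waiting for the connection to be established is done crudely for brevity, and note that native watches are one-shot and must be re-registered after they fire):

// Minimal watcher sketch with the native ZooKeeper Java client.
// The connect string and path are placeholders; native watches fire once
// and must be re-registered inside the callback to keep listening.
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class WatcherSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zk1:2181", 30_000, event -> { });
        Thread.sleep(1000);                              // crude wait for the session, for brevity

        // Create the demo znode if it does not exist yet (persistent node).
        if (zk.exists("/config-demo", false) == null) {
            zk.create("/config-demo", "v1".getBytes(),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        watchConfig(zk, "/config-demo");                 // "System A" starts listening

        // "System B" updates the data; version -1 means "any version".
        zk.setData("/config-demo", "v2".getBytes(), -1);

        Thread.sleep(1000);                              // give the callback time to fire
        zk.close();
    }

    // Read the znode and register a one-shot watch; re-register inside the callback.
    static void watchConfig(ZooKeeper zk, String path) throws Exception {
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDataChanged) {
                    System.out.println("znode changed: " + event.getPath());
                    try {
                        watchConfig(zk, path);           // native watches are one-shot
                    } catch (Exception ignored) { }
                }
            }
        };
        Stat stat = new Stat();
        byte[] data = zk.getData(path, watcher, stat);
        System.out.println("value=" + new String(data) + ", version=" + stat.getVersion());
    }
}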
9. The ZAB protocol's master-slave synchronization and crash recovery mechanisms
The ZAB protocol uses a master-slave architecture and divides the cluster into two roles: Leader and Follower. Both the Leader and the Followers can handle read requests, but only the Leader handles write requests.
After receiving a transaction request, the Leader writes the data to its local disk log file, converts it into a Proposal, and sends it to all Followers. After receiving the Leader's Proposal, each Follower also writes the data to its local disk log file.
When the Leader sees that more than half of the Followers have received the Proposal, it sends a Commit message to all Followers so that they commit the transaction and write the data to memory.
If the Leader crashes, the Followers re-elect a new Leader to keep the service running.
Therefore, the ZAB protocol involves role division, a 2PC (two-phase) style commit, and a majority-write (more than half) mechanism.
10. ZAB protocol process: cluster startup, data synchronization, and crash recovery
(1) When the zk cluster is started, it will enter data recovery mode
(2) After zk is started, it will enter message broadcast mode
(3) When the Leader goes down, it will enter data recovery mode
(1) When the zk cluster is started, it will enter data recovery mode
A Leader is elected when the cluster starts: as soon as more than half of the machines recognize one machine as the Leader, that machine is elected Leader.
After the Leader is elected, it waits for more than half of the Followers in the cluster to synchronize data with it. Once more than half of them have finished, the cluster exits recovery mode and starts serving external requests; Followers that have not finished keep synchronizing with the Leader.
(2) After zk is started, it will enter message broadcast mode
A client can connect to either the Leader or a Follower, but note that only the Leader handles write requests.
If a client sends a write request to a Follower, the Follower forwards it to the Leader. After receiving it, the Leader synchronizes the write to all Followers in the form of a Proposal; once more than half of the Followers have received the Proposal, the Leader sends a Commit message telling the Followers to commit the write.
(3) When the Leader goes down, it will enter data recovery mode
When the Leader goes down, the Followers re-elect a Leader; as soon as more than half of them acknowledge one Follower as the new Leader, the election is complete.
So in a zk cluster, as long as fewer than half of the machines are down, the cluster can still work normally, because more than half of the machines survive to hold a new election. After the new Leader is elected, it waits for more than half of the Followers to synchronize data with it; once that finishes, the cluster re-enters message broadcast mode.
(4) Summary
1. Data recovery mode when cluster starts
Leader election (majority election mechanism) + data synchronization with the remaining machines.
2. Message broadcast mode when message is written
The Leader uses a 2PC-style write mechanism to synchronize the Followers.
3. Data recovery mode when the Leader goes down
When the Leader goes down, a new Leader can be elected as long as more than half of the remaining machines survive. After the new Leader is elected, the Followers resynchronize data with it.
11. ZAB message broadcast based on the 2PC (two-phase commit) idea
(1) Before the Leader initiates a transaction Proposal
(2) After each Follower receives a transaction Proposal
(3) The leader itself will also perform Commit operations
Each message broadcast is implemented as a 2PC: the Leader first broadcasts a Proposal, each Follower returns an ACK, and after receiving ACKs from more than half of the Followers, the Leader broadcasts a Commit message telling the Followers to commit.
(1) Before the Leader initiates a transaction Proposal
The Leader assigns a globally unique, incrementing ZXID to strictly guarantee transaction order, and creates a FIFO queue for each Follower. Proposals sent to a Follower are placed into its queue in order, ensuring transactions are processed in sequence.
(2) After each Follower receives a transaction Proposal
The Follower immediately writes it to its local disk log; once that write succeeds, the data is guaranteed not to be lost. The Follower then returns an ACK to the Leader, and when more than half of the Followers have returned ACKs, the Leader sends a Commit message to all Followers.
(3) The leader itself will also perform Commit operations
After the Leader and the Followers commit, the data becomes readable. A simplified sketch of this broadcast flow follows.
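A simplified, single-process model of the 2PC-style broadcast, for illustration only (this is not ZooKeeper's real implementation; all class and method names are invented):

// Simplified, single-process model of ZAB's 2PC-style broadcast, for illustration only;
// this is not ZooKeeper's real implementation, and all names here are invented.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class Follower {
    final List<Long> diskLog = new ArrayList<>();    // transaction log on "disk"
    final List<Long> memory  = new ArrayList<>();    // committed data in "memory"

    boolean onProposal(long zxid) { diskLog.add(zxid); return true; }  // write log, return ACK
    void onCommit(long zxid)      { memory.add(zxid); }                // apply to the in-memory znode tree
}

class Leader {
    final AtomicLong zxid = new AtomicLong();
    final List<Follower> followers;
    Leader(List<Follower> followers) { this.followers = followers; }

    void broadcast() {
        long id = zxid.incrementAndGet();             // globally unique, incrementing ZXID
        int acks = 1;                                 // the Leader logs and acknowledges its own proposal
        for (Follower f : followers) {
            if (f.onProposal(id)) acks++;             // phase 1: Proposal + ACK
        }
        if (acks > (followers.size() + 1) / 2) {      // majority (more than half) reached
            for (Follower f : followers) f.onCommit(id);  // phase 2: Commit
            System.out.println("committed zxid " + id + " with " + acks + " acks");
        }
    }
}

public class ZabBroadcastSketch {
    public static void main(String[] args) {
        Leader leader = new Leader(List.of(new Follower(), new Follower()));  // 1 Leader + 2 Followers
        leader.broadcast();
    }
}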
12. Is zk strongly consistent or eventually consistent
(1) Strong consistency
(2) Eventual consistency
(3) Sequential consistency
(1) Strong consistency
Strong consistency would mean that as soon as a piece of data is written, it can immediately be read from any zk machine. A strongly consistent write would block and only return success after the Leader and all Followers have committed it; then, as long as the write succeeded, the data could be read from any zk machine. Clearly, zk under the ZAB protocol is not strongly consistent.
(2) Eventual consistency
With eventual consistency, a write returns success, but the data may not be immediately visible on other zk machines, so inconsistencies can exist for a short period. After a while, however, the other machines are guaranteed to synchronize the data, and the written data will eventually be readable.
In the ZAB protocol, once more than half of the Followers have returned ACKs for a Proposal, the Leader sends a Commit message to all Followers. As soon as a Follower or the Leader commits, that data can be read by clients connected to it.
So it is possible that some Followers have already committed while others have not: a client connected to one Follower can read the just-committed data, while a client connected to another Follower that has not yet committed cannot read it.
So zk is not strongly consistent: the Leader does not guarantee that a piece of data becomes readable only after all Followers have committed it. While the Commit is in flight, the data read from different Followers may be inconsistent, but once the Commit completes everywhere, clients eventually read consistent data.
(3) Sequential consistency
The guarantee zk itself defines is sequential consistency, which is a form of eventual consistency. zk's sequential consistency is slightly stronger than plain eventual consistency, because the Leader ensures that all Proposals are synchronized to the Followers in order.
If stronger read guarantees are required, zk's sync() operation can be called manually before reading, which forces the connected server to catch up with the Leader before the read.
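A minimal sketch of that sync-then-read pattern with the native ZooKeeper Java client (the connect string and path are placeholders; sync() is asynchronous, so a latch is used to wait for its callback):

// Minimal sync-then-read sketch with the native ZooKeeper Java client.
// The connect string and path are placeholders; sync() is asynchronous,
// so a latch is used to wait for its callback before reading.
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.ZooKeeper;

public class SyncReadSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zk1:2181", 30_000, event -> { });
        Thread.sleep(1000);                              // crude wait for the session, for brevity

        // Ask the server this client is connected to, to catch up with the Leader.
        CountDownLatch synced = new CountDownLatch(1);
        zk.sync("/config-demo", (rc, path, ctx) -> synced.countDown(), null);
        synced.await();

        // The read now reflects everything committed before the sync.
        byte[] data = zk.getData("/config-demo", false, null);
        System.out.println("fresh value: " + new String(data));
        zk.close();
    }
}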
13. Two possible data inconsistency problems under the ZAB protocol
(1) The Leader crashes before sending the Commit message
(2) The Leader crashes before the Proposal is broadcast
(1) The Leader crashes before sending the Commit message
The Leader received ACKs from more than half of the Followers and committed the transaction itself, but went down before it could send the Commit message to all Followers. At this point the Leader's data is inconsistent with the Followers', so it must be guaranteed that all Followers eventually complete the Commit.
Therefore, after the Leader crashes, the Follower with the largest ZXID is elected as the new Leader, and it checks its transaction log. If it finds a Proposal in its log that has not yet been committed, it knows the old Leader crashed before it could send the Commit message; as the new Leader, it then sends a Commit message for that Proposal to the Followers, ensuring that a transaction committed by the old Leader is eventually committed on all Followers.
Similarly, if after receiving ACKs from more than half of the Followers the Leader crashes, or the network between the Leader and some Followers fails while the Commit message is being sent, then as long as each Follower synchronizes with whatever Leader it can find (the newly elected Follower with the largest ZXID, or the original Leader), the new data is guaranteed to be committed on that Follower.
(2) The Leader crashes before the Proposal is broadcast
The Leader went down before it had time to send the Proposal to any Follower. In this case, the pending request on the old Leader should be discarded.
For this case: the old Leader's log contains a transaction Proposal; when it restarts and synchronizes with the new Leader, it finds that this Proposal should not actually exist, and the Proposal is discarded directly.
14. Data synchronization between the new Leader and Followers during crash recovery
(1) Send Proposal first and then send Commit message
(2) The Commit operation will write data to the znode in memory
(3) Judging availability through the synchronized-Follower list
(4) Why the Follower with the largest ZXID is chosen as the new Leader
(1) Send Proposal first and then send Commit message
After a new Leader is elected, the other Followers synchronize data with it. The Leader prepares a queue for each Follower, first sends all the Proposals to the Follower, and then sends the corresponding Commit messages.
(2) The Commit operation will write data to the znode in memory
The Commit operation adds the data to the in-memory znode tree so that clients can access it, and it also notifies any clients watching that znode. (When a Proposal is received, the data is first written to disk in the form of a transaction log.)
(3) Judging availability through the synchronized-Follower list
When a Follower completes data synchronization with the new Leader, it is added to the new Leader's synchronized-Follower list. Once more than half of the Followers are in that list, the new Leader can resume providing services to the outside world.
(4) Why the Follower with the largest ZXID is chosen as the new Leader
When electing a new Leader, the Follower with the largest ZXID is chosen. For example, take 5 machines: 1 Leader + 4 Followers. The Leader sends a Proposal to the 4 Followers; 3 of them (more than half) receive it and return ACKs, but the fourth does not receive it. The Leader then commits and crashes before it can send the Commit message to the Followers. Now 4 Followers remain, and any one of them can become Leader with 3 votes. If the three Followers that received the Proposal were to vote for the one that did not, that data would be permanently lost; therefore the Follower with the largest ZXID must be chosen as the new Leader.
15. How the ZAB protocol handles messages that need to be discarded
(1) The ZAB protocol discards messages based on the epoch in the ZXID
(2) A brief summary of the ZAB protocol
(1) The ZAB protocol discards messages based on the epoch in the ZXID
Each transaction's ZXID is 64 bits: the high 32 bits are the Leader's epoch (the election round), and the low 32 bits are an auto-incrementing counter.
If a Leader writes a Proposal to its local disk log but crashes before it can broadcast the Proposal to the Followers, then when a new Leader is elected the epoch is incremented. When the old Leader restarts, rejoins the cluster, and becomes a Follower, it finds that it has one extra Proposal compared with the new Leader, but that Proposal's epoch is lower than the new Leader's epoch, so the data is discarded. The ZXID bit layout is sketched below.
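A small sketch of splitting a ZXID into its epoch and counter parts (the example values are made up purely for illustration):

// Splitting a 64-bit ZXID into the epoch (high 32 bits) and counter (low 32 bits).
// The example values below are made up purely for illustration.
public class ZxidSketch {
    public static void main(String[] args) {
        long zxid = (3L << 32) | 17L;                // epoch 3, 17th transaction in that epoch

        long epoch   = zxid >>> 32;                  // high 32 bits: election round
        long counter = zxid & 0xFFFFFFFFL;           // low 32 bits: auto-incrementing counter

        System.out.println("zxid=0x" + Long.toHexString(zxid)
                + " epoch=" + epoch + " counter=" + counter);

        // A proposal whose epoch is lower than the current Leader's epoch is stale
        // and is discarded when the old Leader rejoins as a Follower.
        long currentLeaderEpoch = 4;
        System.out.println("stale? " + (epoch < currentLeaderEpoch));
    }
}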
(2) A brief summary of the ZAB protocol
1. At startup: majority-based Leader election + data synchronization
2. While serving requests: 2PC + majority-write mechanism, sequential consistency (a form of eventual consistency)
3. During crash recovery: re-election of the Leader + handling of the two data inconsistency cases.
16. The role of the Observer node
(1) For write requests
(2) For read requests
The Observer node does not participate in Leader election and does not participate in the majority ACK process when Proposals are broadcast; it only synchronizes data and serves client read requests.
(1) For write requests
The Observer only synchronizes data from the Leader and does not take part in the majority-write mechanism. Therefore, no matter how many machines a zk cluster has, writes go through a single Leader, which can handle at most tens of thousands of write QPS, and this cannot be scaled out.
Therefore, zk suits write-light scenarios. Redis can reach around 100,000 QPS, while zk only reaches tens of thousands. Both are memory-based and handle write requests on a single thread, but Redis has no majority-write mechanism, so its write performance is higher.
(2) For read requests
There are usually 2 or 4 Followers, which together can reach tens of thousands of read QPS. Introducing more Followers would mean more participants in the majority-write mechanism, which would hurt the Leader's write efficiency; this is why the Observer exists.
Therefore, to further improve the zk cluster's ability to handle read requests, Observer nodes can be introduced. Since they only synchronize data and provide read services, they can be scaled out almost without limit without affecting write efficiency.
17. Why zk suits small-cluster deployment and read-heavy, write-light scenarios
(1) zk clusters are usually deployed as small clusters of 3 or 5 machines
(2) zk clusters suit read-heavy, write-light scenarios
(1) zk clusters are usually deployed as small clusters of 3 or 5 machines
Suppose there are 1 Leader + 20 Followers, 21 machines in total. Since the Followers participate in the majority ACK of every ZAB write, a write request must wait for at least 10 Followers to return ACKs before the Commit message can be sent, and the Leader can only report success to the client after sending the Commit, so performance would be relatively poor. This is why the ZAB protocol pushes zk clusters toward small deployments such as 1 Leader + 2 Followers.
(2) zk clusters suit read-heavy, write-light scenarios
The zk cluster's write capacity cannot be scaled out; if read volume is large, Observer machines can be added. Therefore zk is only suitable for read-heavy, write-light scenarios, and it is mainly used to handle the coordination work of distributed systems.
18. Summary of zk's characteristics
(1) Cluster mode deployment
(2) Master-slave architecture
(3) Memory data model
(4) Sequential consistency
(5) High performance
(6) Highly available
(7) High concurrency
(1) Cluster mode deployment
Generally an odd number of nodes is deployed, because the cluster cannot tolerate losing half or more of its machines: a 5-node cluster tolerates 2 failures, while a 6-node cluster also tolerates only 2, so 5 and 6 give the same fault tolerance and the odd size saves a machine. Moreover, zk clusters are small-cluster deployments, suited to read-heavy, write-light scenarios. The quorum arithmetic is sketched below.
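A tiny sketch of that quorum arithmetic (the cluster sizes are arbitrary examples):

// Quorum arithmetic: a cluster of n nodes needs a majority of n/2 + 1 votes,
// so it tolerates n - (n/2 + 1) failures. The cluster sizes are arbitrary examples.
public class QuorumSketch {
    public static void main(String[] args) {
        for (int n : new int[] {3, 4, 5, 6, 7}) {
            int quorum    = n / 2 + 1;               // votes needed for election / majority write
            int tolerated = n - quorum;              // machines that may fail
            System.out.println(n + " nodes -> quorum " + quorum + ", tolerates " + tolerated);
        }
    }
}

The output shows that 5 and 6 nodes both tolerate 2 failures, which is why odd-sized clusters are preferred.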
(2) Master-slave architecture
Leader, Follower, and Observer.
(3) Memory data model
The znode tree, with multiple node types, supporting the Watcher mechanism for callback notifications.
(4) Sequential consistency
Messages are synchronized in order; consistency is eventual rather than strong. The ZXID's high 32 bits are the epoch and its low 32 bits are an auto-incrementing counter.
(5) High performance
A 2PC-style majority-write mechanism over a pure in-memory data structure, the znode tree. ZAB protocol: 2PC, majority ACK + writing the transaction log, then Commit + writing to memory.
(6) Highly available
Follower downtime has no effect. Leader downtime can cause data inconsistencies, which the newly elected Leader handles automatically so the cluster keeps running normally; however, during recovery mode zk may be unwritable for a short period. The client keeps a long-lived TCP connection with zk and maintains the session through the heartbeat mechanism.
(7) High concurrency
Writes go through the single Leader, while Observer nodes can linearly scale read QPS.