
Kafka Community KIP-500 Chinese translation (removing ZooKeeper)

2024-11-06 01:26:10

Link to original article:/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum

Translator's note: One of the biggest changes in the release is the removal of the dependency on ZooKeeper. The author of this article is Colin, who explains the whole process of removing ZK at a high level, with the focus on the overall design. Understanding it gives us a higher vantage point from which to get a better grasp of KRaft.

Background (Motivation)

Currently, Kafka uses ZooKeeper to store its metadata about partitions and brokers, and to elect a broker to be the Kafka Controller. We would like to remove this dependency on ZooKeeper. This will enable us to manage metadata in a more scalable and robust way, enabling support for more partitions. It will also simplify the deployment and configuration of Kafka.

Currently, Kafka uses ZooKeeper to store metadata about partitions and brokers, and the election of the Kafka Controller also relies on ZooKeeper. We would like to remove this dependency. That way we can manage metadata in a more scalable and robust way, support more partitions, and simplify Kafka deployment and configuration.

Metadata -> Event Log (Metadata as an Event Log)

We often talk about the benefits of managing state as a stream of events. A single number, the offset, describes a consumer's position in the stream. Multiple consumers can quickly catch up to the latest state simply by replaying all the events newer than their current offset. The log establishes a clear ordering between events, and ensures that the consumers always move along a single timeline.

We often talk about the benefits of managing state as a stream of events. An offset, which is simply a number, describes a consumer's position in the stream. Multiple consumers can catch up to the latest state quickly by replaying the events that are newer than their current offset. The log establishes a clear order between events and ensures that consumers always move along a single timeline.
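
To make the offset idea concrete, here is a minimal sketch in Python. It is not Kafka code: the `Consumer` class, the list-based `log`, and the topic names are hypothetical stand-ins chosen only to show how a single number lets any reader catch up by replay.

```python
from dataclasses import dataclass, field


@dataclass
class Consumer:
    """Tracks a position in the log and the state rebuilt from it."""
    offset: int = 0                            # index of the next event to apply
    state: dict = field(default_factory=dict)  # key -> latest value seen

    def catch_up(self, log):
        # Apply every event at or after the current offset, in log order.
        for key, value in log[self.offset:]:
            self.state[key] = value
        self.offset = len(log)


# Hypothetical events: (topic name, partition count).
log = [("topic-a", 3), ("topic-b", 1)]

fast, slow = Consumer(), Consumer()
fast.catch_up(log)           # fast is now at offset 2
log.append(("topic-a", 5))   # a new event arrives
fast.catch_up(log)           # replays only the one new event
slow.catch_up(log)           # replays all three events from offset 0
```

Both consumers end up with the same state because they applied the same events in the same order; the only difference is how much of the log each had to replay.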

 

However, although our users enjoy these benefits, Kafka itself has been left out. We treat changes to metadata as isolated changes with no relationship to each other. When the controller pushes out state change notifications (such as LeaderAndIsrRequest) to other brokers in the cluster, it is possible for brokers to get some of the changes, but not all. Although the controller retries several times, it eventually gives up. This can leave brokers in a divergent state.

However, while Kafka users enjoy these benefits, Kafka itself does not. We treat metadata changes as isolated changes with no relationship to each other. When the controller pushes a state change notification (e.g. LeaderAndIsrRequest) to the other brokers in the cluster, a broker may receive some of the changes, but not all. Although the controller retries several times, it eventually gives up. This can leave brokers in a divergent state.

 

Worse still, although ZooKeeper is the store of record, the state in ZooKeeper often doesn't match the state that is held in memory in the controller. For example, when a partition leader changes its ISR in ZK, the controller will typically not learn about these changes for many seconds. There is no generic way for the controller to follow the ZooKeeper event log. Although the controller can set one-shot watches, the number of watches is limited for performance reasons. When a watch triggers, it doesn't tell the controller the current state-- only that the state has changed. By the time the controller re-reads the znode and sets up a new watch, the state may have changed from what it was when the watch originally fired. If there is no watch set, the controller may not learn about the change at all. In some cases, restarting the controller is the only way to resolve the discrepancy.

To make matters worse, although ZooKeeper is the store of record, the state in ZooKeeper often does not match the state held in the controller's memory. For example, when a partition leader changes its ISR in ZooKeeper, the controller usually learns about the change only after many seconds. (Translator's note: the partition leader writes the ISR directly to ZooKeeper rather than going through the controller, which is why the metadata held by the controller can be out of date.) The controller has no generic way to follow ZooKeeper's event log. Although the controller can set one-shot watches, the number of watches is limited for performance reasons. When a watch fires, it does not tell the controller the current state, only that the state has changed. By the time the controller re-reads the znode and sets a new watch, the state may have changed again since the watch originally fired. If no watch is set, the controller may not learn about the change at all. In some cases, restarting the controller is the only way to resolve the discrepancy. (Translator's note: this whole paragraph is about the inconsistencies that ZooKeeper introduces.)
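
The race described above can be illustrated with a toy model. The `Znode` class below is a hypothetical stand-in, not the ZooKeeper client API; it only mimics two properties mentioned in the text: a watch fires once, and the notification carries no data.

```python
class Znode:
    """Toy stand-in for a znode that supports one-shot watches."""

    def __init__(self, value):
        self.value = value
        self._watchers = []

    def watch(self, callback):
        self._watchers.append(callback)

    def set(self, value):
        self.value = value
        # One-shot: every registered watch fires once and is then discarded.
        watchers, self._watchers = self._watchers, []
        for callback in watchers:
            callback()  # no payload, only "something changed"


notifications = []
isr = Znode([1, 2, 3])
isr.watch(lambda: notifications.append("changed"))

isr.set([1, 2])       # fires the watch
isr.set([1])          # no watch is registered at this point, so nobody is told
observed = isr.value  # the controller re-reads and sees [1]; it never saw [1, 2]
```

In this sketch the controller receives one notification for two changes, and the intermediate ISR `[1, 2]` is never observed. A log that is consumed by offset would have delivered both changes in order.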

 

Rather than being stored in a separate system, metadata should be stored in Kafka itself. This will avoid all the problems associated with discrepancies between the controller state and the Zookeeper state. Rather than pushing out notifications to brokers, brokers should simply consume metadata events from the event log. This ensures that metadata changes will always arrive in the same order. Brokers will be able to store metadata locally in a file. When they start up, they will only need to read what has changed from the controller, not the full state. This will let us support more partitions with less CPU consumption.

Rather than being stored in another system, metadata should be managed by Kafka itself. This avoids all the problems caused by discrepancies between the controller state and the ZooKeeper state. Brokers should consume metadata change events from the event log rather than having the controller push notifications to them. This ensures that the metadata changes a broker receives always arrive in the same order. Brokers can also store the metadata in a local file, so that when they start up they only need to read what has changed from the controller, not the full state. This lets us support more partitions with less CPU consumption.

Simplified Deployment and Configuration (Simpler Deployment and Configuration)

ZooKeeper is a separate system, with its own configuration file syntax, management tools, and deployment patterns. This means that system administrators need to learn how to manage and deploy two separate distributed systems in order to deploy Kafka. This can be a daunting task for administrators, especially if they are not very familiar with deploying Java services. Unifying the system would greatly improve the "day one" experience of running Kafka, and help broaden its adoption.

ZooKeeper is a standalone system with its own configuration file syntax, management tools, and deployment patterns. This means that system administrators have to learn to manage and deploy two separate distributed systems in order to deploy Kafka, which can be a daunting task, especially for administrators who are not familiar with deploying Java services. Unifying the system would greatly improve the "day one" experience of running Kafka and help broaden its adoption. (Translator's note: "day one" is an English idiom; here it means the experience of using Kafka for the first time.)

 

Because the Kafka and ZooKeeper configurations are separate, it is easy to make mistakes. For example, administrators may set up SASL on Kafka, and incorrectly think that they have secured all of the data travelling over the network. In fact, it is also necessary to configure security in the separate, external ZooKeeper system in order to do this. Unifying the two systems would give a uniform security configuration model.

Because Kafka and ZooKeeper are configured separately, it is easy to make mistakes. For example, administrators might set up SASL on Kafka and incorrectly assume that they have secured all data travelling over the network. In fact, to achieve this, security must also be configured in the separate, external ZooKeeper system. Unifying the two systems would give a uniform security configuration model.

 

Finally, in the future we may want to support a single-node Kafka mode. This would be useful for people who wanted to quickly test out Kafka without starting multiple daemons. Removing the ZooKeeper dependency makes this possible.

Finally, in the future we may want to support a single-node Kafka mode. This would be useful for people who want to try out Kafka quickly without starting multiple daemons. Removing the dependency on ZooKeeper makes this possible.

Architecture (Architecture)

Introduction (Introduction)

This KIP presents an overall vision for a scalable post-ZooKeeper Kafka. In order to present the big picture, I have mostly left out details like RPC formats, on-disk formats, and so on. We will want to have follow-on KIPs to describe each step in greater detail. This is similar to KIP-4, which presented an overall vision which subsequent KIPs enlarged upon.

This KIP presents an overall vision for a scalable Kafka without ZooKeeper. To keep the focus on the big picture, I have largely omitted details such as RPC formats and on-disk formats. We will want follow-on KIPs to describe each step in more detail. This is similar to KIP-4, which presented an overall vision that subsequent KIPs elaborated on.

Overview (Overview)

Currently, a Kafka cluster contains several broker nodes, and an external quorum of ZooKeeper nodes. We have pictured 4 broker nodes and 3 ZooKeeper nodes in this diagram. This is a typical size for a small cluster. The controller (depicted in orange) loads its state from the ZooKeeper quorum after it is elected. The lines extending from the controller to the other nodes in the broker represent the updates which the controller pushes, such as LeaderAndIsr and UpdateMetadata messages.

Currently, a Kafka cluster contains several broker nodes and an external quorum of ZooKeeper nodes. Four broker nodes and three ZooKeeper nodes are depicted in the figure. This is a typical size for a small cluster. The controller (shown in orange) loads its state, such as metadata, from the ZooKeeper quorum after it is elected. The lines from the controller to the other brokers represent the updates pushed by the controller, such as LeaderAndIsr and UpdateMetadata messages. (Translator's note: note that in ZooKeeper mode, it is the controller that pushes metadata to the other brokers.)

 

Note that this diagram is slightly misleading. Other brokers besides the controller can and do communicate with ZooKeeper. So really, a line should be drawn from each broker to ZK. However, drawing that many lines would make the diagram difficult to read. Another issue which this diagram leaves out is that external command line tools and utilities can modify the state in ZooKeeper, without the involvement of the controller. As discussed earlier, these issues make it difficult to know whether the state in memory on the controller truly reflects the persistent state in ZooKeeper.

Note that this diagram is slightly misleading. Brokers other than the controller also communicate with ZooKeeper, so a line should really be drawn from each broker to ZK. However, drawing that many lines would make the diagram difficult to read. Another issue that the diagram leaves out is that external command line tools and utilities can modify state in ZooKeeper without involving the controller. As mentioned earlier, these issues make it difficult to know whether the state in memory on the controller truly reflects the persistent state in ZooKeeper.

 

In the proposed architecture, three controller nodes substitute for the three ZooKeeper nodes. The controller nodes and the broker nodes run in separate JVMs. The controller nodes elect a single leader for the metadata partition, shown in orange. Instead of the controller pushing out updates to the brokers, the brokers pull metadata updates from this leader. That is why the arrows point towards the controller rather than away.

In the proposed architecture, three controller nodes take the place of the three ZooKeeper nodes. The controller nodes and the broker nodes run in separate JVMs. The controller nodes elect a single leader for the metadata partition, which becomes the actual cluster controller, shown in orange. (Translator's note: the metadata partition is partition 0 of the internal topic '__cluster_metadata', which has only that one partition.) Brokers pull metadata updates from this leader, rather than the controller pushing updates to the brokers. That is why the arrows point towards the controller instead of away from it.

Note that although the controller processes are logically separate from the broker processes, they need not be physically separate. In some cases, it may make sense to deploy some or all of the controller processes on the same node as the broker processes. This is similar to how ZooKeeper processes may be deployed on the same nodes as Kafka brokers today in smaller clusters. As per usual, all sorts of deployment options are possible, including running in the same JVM.

Note that although the controller processes are logically separate from the broker processes, they do not need to be physically separate. In some cases, it may make sense to deploy some or all of the controller processes on the same nodes as the broker processes. This is similar to how, in smaller clusters today, ZooKeeper processes may be deployed on the same nodes as Kafka brokers. As usual, all sorts of deployment options are possible, including running in the same JVM.

Controller Quorum (The Controller Quorum)

The controller nodes comprise a Raft quorum which manages the metadata log. This log contains information about each change to the cluster metadata. Everything that is currently stored in ZooKeeper, such as topics, partitions, ISRs, configurations, and so on, will be stored in this log.

The controller nodes form a Raft quorum that manages the metadata log. This log contains information about every change to the cluster metadata. Everything that was originally stored in ZooKeeper, such as topics, partitions, ISRs, and configurations, is now stored in this log.

 

Using the Raft algorithm, the controller nodes will elect a leader from amongst themselves, without relying on any external system. The leader of the metadata log is called the active controller. The active controller handles all RPCs made from the brokers. The follower controllers replicate the data which is written to the active controller, and serve as hot standbys if the active controller should fail. Because the controllers will now all track the latest state, controller failover will not require a lengthy reloading period where we transfer all the state to the new controller.

Using the Raft consensus algorithm, the controller nodes elect a leader among themselves without relying on any external system. The leader of the metadata log is called the active controller, and it handles all RPC requests from the brokers. The follower controllers replicate the data written to the active controller and act as hot standbys, ready to take over if the active controller fails. Because the controllers all track the latest state, failover does not require a lengthy reload period in which all the state is transferred to the new controller.

 

Just like ZooKeeper, Raft requires a majority of nodes to be running in order to continue running. Therefore, a three-node controller cluster can survive one failure. A five-node controller cluster can survive two failures, and so on.

Just like ZooKeeper, Raft requires a majority of nodes to be running in order to keep providing service. So a 3-node controller cluster can tolerate 1 node failure, a 5-node cluster can tolerate 2 node failures, and so on.
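
The majority rule above reduces to simple integer arithmetic. The two helper functions below are illustrative names of my own, not part of Kafka; they only restate the rule that a quorum of n nodes needs more than half of them alive.

```python
def majority(n: int) -> int:
    """Smallest number of nodes that is more than half of n."""
    return n // 2 + 1


def tolerated_failures(n: int) -> int:
    """How many nodes can fail while a majority is still running."""
    return n - majority(n)


# Quorum sizes from the text, plus an even size for comparison.
sizes = {n: tolerated_failures(n) for n in (3, 4, 5)}
```

Note that under this rule a 4-node quorum tolerates no more failures than a 3-node one, which is why odd sizes are the usual choice.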

 

Periodically, the controllers will write out a snapshot of the metadata to disk. While this is conceptually similar to compaction, the code path will be a bit different because we can simply read the state from memory rather than re-reading the log from disk.

Periodically, the controllers write a snapshot of the metadata to local disk. While this is conceptually similar to compaction, the code path is slightly different, because we can read the state directly from memory rather than re-reading the log from disk. (Translator's note: this is a slightly roundabout way of saying that the controller compacts the metadata held in memory and then stores it on disk; when the program actually uses the data, it still reads it directly from memory. What does compaction mean here? It corresponds to Kafka's compacted topics, and the concept is borrowed from there. For example, suppose a topic started with 3 partitions, later grew to 5, and eventually to 10. The log then holds 3 entries for that topic, but after compaction only the most recent entry is kept.)
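
The translator's example can be written out as a small sketch. This is not Kafka's log cleaner; `compact` is a hypothetical helper over a list of `(key, value)` records that keeps only the newest record per key, and the topic names are made up.

```python
def compact(log):
    """Keep only the newest record for each key, preserving log order."""
    last_offset = {key: offset for offset, (key, _value) in enumerate(log)}
    return [
        record
        for offset, record in enumerate(log)
        if last_offset[record[0]] == offset
    ]


# A topic's partition count goes 3 -> 5 -> 10, with another topic in between.
log = [("orders", 3), ("orders", 5), ("payments", 2), ("orders", 10)]
compacted = compact(log)

# The in-memory state built by applying the log holds the same information,
# which is why a snapshot can be written from memory without re-reading the log.
state = {}
for key, value in log:
    state[key] = value
```

Here `compacted` contains two records instead of four, and turning it into a dictionary gives exactly the in-memory `state`.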

Broker Metadata Management (Broker Metadata Management)

Instead of the controller pushing out updates to the other brokers, those brokers will fetch updates from the active controller via the new MetadataFetch API.

Brokers will use the newly defined MetadataFetch API to pull metadata from the active controller, instead of having the controller push data to the brokers as in the ZooKeeper era.

 

A MetadataFetch is similar to a fetch request. Just like with a fetch request, the broker will track the offset of the last updates it fetched, and only request newer updates from the active controller.

A MetadataFetch is much like a fetch request. As with a fetch request, the broker tracks the offset of the last update it fetched and requests only newer updates from the active controller. (Translator's note: the fetch request referred to here is the API a consumer calls to consume messages; followers also use it to pull messages from the leader.)

 

The broker will persist the metadata it fetched to disk. This will allow the broker to start up very quickly, even if there are hundreds of thousands or even millions of partitions. (Note that since this persistence is an optimization, we can leave it out of the first version, if it makes development easier.)

The broker will persist the metadata it pulls to disk. This allows the broker to start up very quickly, even if there are hundreds of thousands or even millions of partitions. (Note, however, that this persistence is an optimization, so we can leave it out of the first release if that makes development easier.)

 

Most of the time, the broker should only need to fetch the deltas, not the full state. However, if the broker is too far behind the active controller, or if the broker has no cached metadata at all, the controller will send a full metadata image rather than a series of deltas.

In most cases, the broker only needs to pull the incremental updates (deltas), not the full state. However, if the broker's metadata is too far behind the active controller, or if the broker has no cached metadata at all, the active controller will send a full metadata image instead of a series of deltas.
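
The choice between deltas and a full image can be sketched as follows. The KIP deliberately leaves the RPC format open, so everything here is an assumption made for illustration: the `ControllerSketch` class, its `fetch` return shape, and the idea that "too far behind" means "older than the oldest record the controller still retains".

```python
class ControllerSketch:
    """Hypothetical active controller that answers metadata fetches."""

    def __init__(self):
        self.image = {}        # full current metadata, held in memory
        self.records = []      # recent (key, value) changes still retained
        self.start_offset = 0  # log offset of self.records[0]

    @property
    def end_offset(self):
        return self.start_offset + len(self.records)

    def append(self, key, value):
        self.image[key] = value
        self.records.append((key, value))

    def truncate_before(self, offset):
        """Drop retained records older than `offset`, e.g. after a snapshot."""
        drop = max(0, min(offset, self.end_offset) - self.start_offset)
        self.records = self.records[drop:]
        self.start_offset += drop

    def fetch(self, broker_offset):
        # No cached metadata, or too far behind: send the full image.
        if broker_offset is None or broker_offset < self.start_offset:
            return "image", dict(self.image), self.end_offset
        # Otherwise send only the changes the broker has not seen yet.
        deltas = self.records[broker_offset - self.start_offset:]
        return "deltas", deltas, self.end_offset


controller = ControllerSketch()
controller.append("topic-a", 3)
controller.append("topic-b", 1)
controller.append("topic-a", 5)

fresh_broker = controller.fetch(None)  # nothing cached: full image
caught_up = controller.fetch(2)        # one record behind: a single delta
controller.truncate_before(2)
lagging = controller.fetch(1)          # offset 1 is no longer retained: full image
```

The third element of each response is the offset the broker should store and send with its next fetch.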

 

The broker will periodically ask for metadata updates from the active controller. This request will double as a heartbeat, letting the controller know that the broker is alive.

The broker will periodically send a metadata fetch request to the active controller. This request doubles as a heartbeat, letting the controller know that the broker is still alive.

 

Note that while this KIP only discusses broker metadata management, client metadata management is important for scalability as well. Once the infrastructure for sending incremental metadata updates exists, we will want to use it for clients as well as for brokers. After all, there are typically a lot more clients than brokers. As the number of partitions grows, it will become more and more important to deliver metadata updates incrementally to clients that are interested in many partitions. We will discuss this further in follow-on KIPs.

Note that while this KIP only discusses broker metadata management, client metadata management is also very important for scalability. Once the infrastructure for sending incremental metadata updates exists, we will want to use it for clients as well; after all, there are usually many more clients than brokers. As the number of partitions grows, delivering metadata updates incrementally to clients that are interested in many partitions becomes more and more important. We will discuss this further in follow-on KIPs.

Broker State Machine (The Broker State Machine)

Currently, brokers register themselves with ZooKeeper right after they start up. This registration accomplishes two things: it lets the broker know whether it has been elected as the controller, and it lets other nodes know how to contact it.

Currently, as soon as a broker starts, it registers itself with ZooKeeper. This registration accomplishes two things: it lets the broker know whether it has been elected as the controller, and it lets other nodes know how to contact it.

 

In the post-ZooKeeper world, brokers will register themselves with the controller quorum, rather than with ZooKeeper.

In the post-ZooKeeper world, brokers will register directly with the controller quorum instead of with ZooKeeper. (Translator's note: the original author uses "post-ZooKeeper" several times; it literally means "after ZooKeeper", i.e. with ZooKeeper removed, which in practice is KRaft mode.)

 

Currently, if a broker loses its ZooKeeper session, the controller removes it from the cluster metadata. In the post-ZooKeeper world, the active controller removes a broker from the cluster metadata if it has not sent a MetadataFetch heartbeat in a long enough time.

Currently, if a broker loses its ZooKeeper session, the controller removes it from the cluster metadata. In the post-ZooKeeper world, the active controller removes a broker from the cluster metadata if the broker has not sent a MetadataFetch heartbeat for long enough.
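
A minimal sketch of heartbeat-based membership, under stated assumptions: the `MembershipTracker` class, its method names, and the 10-second timeout are all hypothetical, and time is passed in explicitly so the example is deterministic. The KIP does not specify the timeout value or the bookkeeping.

```python
class MembershipTracker:
    """Hypothetical controller-side record of when each broker last fetched."""

    def __init__(self, timeout_s: float):
        self.timeout_s = timeout_s
        self.last_seen = {}  # broker id -> time of the last MetadataFetch

    def on_metadata_fetch(self, broker_id: int, now: float) -> None:
        # Every metadata fetch doubles as a heartbeat.
        self.last_seen[broker_id] = now

    def expire(self, now: float) -> list:
        """Remove and return brokers whose last heartbeat is too old."""
        expired = [
            broker_id
            for broker_id, seen in self.last_seen.items()
            if now - seen > self.timeout_s
        ]
        for broker_id in expired:
            del self.last_seen[broker_id]
        return expired


tracker = MembershipTracker(timeout_s=10.0)
tracker.on_metadata_fetch(1, now=0.0)
tracker.on_metadata_fetch(2, now=0.0)
tracker.on_metadata_fetch(1, now=8.0)  # broker 1 keeps fetching
removed = tracker.expire(now=12.0)     # broker 2 has been silent for 12 seconds
```

Because membership and metadata delivery share one request, a broker that cannot fetch metadata also stops being counted as a member.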

 

In the current world, a broker which can contact ZooKeeper but which is partitioned from the controller will continue serving user requests, but will not receive any metadata updates. This can lead to some confusing and difficult situations. For example, a producer using acks=1 might continue to produce to a leader that actually was not the leader any more, but which failed to receive the controller's LeaderAndIsrRequest moving the leadership.

In the current ZooKeeper-based model, a broker that can reach ZooKeeper but is partitioned from the controller keeps serving user requests but no longer receives metadata updates. This can lead to confusing and difficult situations. For example, a producer using acks=1 might keep producing to a broker that is no longer the leader, because that broker never received the controller's LeaderAndIsrRequest that moved the leadership.

 

In the post-ZK world, cluster membership is integrated with metadata updates. Brokers cannot continue to be members of the cluster if they cannot receive metadata updates. While it is still possible for a broker to be partitioned from a particular client, the broker will be removed from the cluster if it is partitioned from the controller.

In the post-ZooKeeper world, cluster membership is integrated with metadata updates. A broker that cannot receive metadata updates cannot remain a member of the cluster. A broker may still be partitioned from a particular client, but if it is partitioned from the controller, it will be removed from the cluster.

Broker States

Offline (Offline)

When the broker process is in the Offline state, it is either not running at all, or in the process of performing single-node tasks needed for starting up, such as initializing the JVM or performing log recovery.

When a broker is in the Offline state, it is either not running at all, or it is just starting up and performing single-node startup tasks such as JVM initialization or log recovery.

Fenced (Fenced)

When the broker is in the Fenced state, it will not respond to RPCs from clients. The broker will be in the fenced state when starting up and attempting to fetch the newest metadata. It will re-enter the fenced state if it can't contact the active controller. Fenced brokers should be omitted from the metadata sent to clients.

When a broker is in the Fenced state, it will not respond to RPC requests from clients. A broker is fenced while it is starting up and trying to fetch the newest metadata. It re-enters the Fenced state if it cannot contact the active controller. Fenced brokers should be omitted from the metadata sent to clients.

Online (Online)

When a broker is online, it is ready to respond to requests from clients.

When a broker is online, it is ready to respond to requests sent by clients.

Stopping (Stopping)

Brokers enter the stopping state when they receive a SIGINT. This indicates that the system administrator wants to shut down the broker.

When a broker receives a SIGINT, it enters the Stopping state. This indicates that the system administrator wants to shut the broker down.

 

When a broker is stopping, it is still running, but we are trying to migrate the partition leaders off of the broker.

When a broker is stopping, it is still running, and we are trying to migrate the partition leaders off of it to other brokers.

 

Eventually, the active controller will ask the broker to finally go offline, by returning a special result code in the MetadataFetchResponse. Alternately, the broker will shut down if the leaders can't be moved in a predetermined amount of time.

Eventually, the active controller tells the broker that it can actually go offline by returning a special result code in the MetadataFetchResponse. Alternatively, if the leaders cannot be moved within a predetermined amount of time, the broker shuts down anyway.
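
The four states can be summarized as a small state machine. The transition table below is my reading of the descriptions in this section, not something the KIP specifies; in particular, allowing a fenced broker to move straight to Stopping on SIGINT is an assumption.

```python
from enum import Enum


class BrokerState(Enum):
    OFFLINE = "offline"
    FENCED = "fenced"
    ONLINE = "online"
    STOPPING = "stopping"


# Allowed moves, as inferred from the text (assumptions are marked).
TRANSITIONS = {
    # Startup: begin fetching the newest metadata.
    BrokerState.OFFLINE: {BrokerState.FENCED},
    # Caught up with the controller, or SIGINT while fenced (assumption).
    BrokerState.FENCED: {BrokerState.ONLINE, BrokerState.STOPPING},
    # Lost contact with the active controller, or received SIGINT.
    BrokerState.ONLINE: {BrokerState.FENCED, BrokerState.STOPPING},
    # Controller returned the shutdown result code, or the timeout elapsed.
    BrokerState.STOPPING: {BrokerState.OFFLINE},
}


def transition(current: BrokerState, target: BrokerState) -> BrokerState:
    """Return the new state, or raise ValueError if the move is not allowed."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"cannot go from {current.name} to {target.name}")
    return target


# A normal lifecycle: start, catch up, serve, shut down.
state = BrokerState.OFFLINE
for nxt in (BrokerState.FENCED, BrokerState.ONLINE,
            BrokerState.STOPPING, BrokerState.OFFLINE):
    state = transition(state, nxt)
```

The table makes one property easy to see: there is no path from Offline to Online that skips Fenced, so a broker never serves clients before it has fetched metadata.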

Forwarding Existing API Requests to the Controller (Transitioning some existing APIs to Controller-Only)

Many operations that were formerly performed by a direct write to ZooKeeper will become controller operations instead. For example, changing configurations, altering ACLs that are stored with the default Authorizer, and so on.

Many operations that previously required a direct write to ZooKeeper will become controller operations instead, for example changing configurations or altering ACLs that are stored with the default Authorizer.

 

New versions of the clients should send these operations directly to the active controller. This is a backwards compatible change: it will work with both old and new clusters. In order to preserve compatibility with old clients that sent these operations to a random broker, the brokers will forward these requests to the active controller.

New versions of the clients should send these requests directly to the active controller. This is a backwards compatible change: it works with both old and new clusters. Old clients used to send these requests to a random broker, so to preserve compatibility with them, the brokers will forward such requests to the active controller.
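
The forwarding path for old clients might look roughly like this. Both classes and the tuple-shaped request are hypothetical; the KIP leaves the actual RPCs to follow-on KIPs, so this only shows the routing idea.

```python
class ControllerStub:
    """Hypothetical active controller that owns configuration changes."""

    def __init__(self):
        self.configs = {}

    def handle(self, request):
        op, key, value = request
        if op != "alter_config":
            raise ValueError(f"unsupported operation: {op}")
        self.configs[key] = value
        return "ok"


class BrokerStub:
    """Hypothetical broker that forwards controller-only operations."""

    def __init__(self, broker_id, active_controller):
        self.broker_id = broker_id
        self.active_controller = active_controller

    def handle(self, request):
        # An old client may pick any broker; pass the request on unchanged.
        return self.active_controller.handle(request)


controller = ControllerStub()
broker = BrokerStub(broker_id=1, active_controller=controller)

# Old client: goes through a broker (one extra hop).
old_result = broker.handle(("alter_config", "retention.ms", "1000"))
# New client: talks to the active controller directly.
new_result = controller.handle(("alter_config", "segment.bytes", "1048576"))
```

Both paths end in the same place, which is what keeps the change compatible; the old path simply costs an extra hop.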

New Controller APIs (New Controller APIs)

In some cases, we will need to create a new API to replace an operation that was formerly done via ZooKeeper. One example of this is that when the leader of a partition wants to modify the in-sync replica set, it currently modifies ZooKeeper directly. In the post-ZK world, the leader will make an RPC to the active controller instead.

In some cases, we need to create new APIs to replace operations that were previously done through ZooKeeper. For example, when a partition leader needs to modify its ISR, it currently modifies ZooKeeper directly, whereas in the post-ZooKeeper world the leader will send an RPC request to the active controller.

Removing Direct ZooKeeper Access from Tools

Currently, some tools and scripts directly contact ZooKeeper. In a post-ZooKeeper world, these tools must use Kafka APIs instead. Fortunately, "KIP-4: Command line and centralized administrative operations" began the task of removing direct ZooKeeper access several years ago, and it is nearly complete.

Currently, some tools and scripts contact ZooKeeper directly. In the post-ZooKeeper world, these tools must use Kafka APIs instead. Fortunately, "KIP-4: Command line and centralized administrative operations" began the work of removing direct ZooKeeper access several years ago, and it is nearly complete.

Compatibility, Deprecation, and Migration Plan

Client Compatibility

We will preserve compatibility with the existing Kafka clients. In some cases, the existing clients will take a less efficient code path. For example, the brokers may need to forward their requests to the active controller.

We will remain compatible with existing versions of the Kafka client. In some scenarios, existing clients will take a less efficient code path, for example when a broker has to forward their requests to the active controller. (Translator's note: in this case the client does not need to change at all; the broker just does some forwarding on the back end.)

Bridge Release

The overall plan for compatibility is to create a "bridge release" of Kafka where the ZooKeeper dependency is well-isolated.

The overall plan for compatibility is to create a "bridge release" of Kafka in which the dependency on ZooKeeper is well isolated. (Translator's note: the bridge release supports both KRaft and ZooKeeper modes; once the migration through it is done, the dependency on ZooKeeper is removed completely.)

Rolling Upgrade

The rolling upgrade from the bridge release will take several steps.

The rolling upgrade from the bridge release takes several steps.

Upgrade to the Bridge Release

The cluster must be upgraded to the bridge release, if it isn't already.

If the cluster has not yet been upgraded to the bridge release, it must first be upgraded to it.

Start the Controller Quorum Nodes

We will configure the controller quorum nodes with the address of the ZooKeeper quorum. Once the controller quorum is established, the active controller will enter its node information into /brokers/ids and overwrite the /controller node with its ID. This will prevent any of the un-upgraded broker nodes from becoming the controller at any future point during the rolling upgrade.

We will configure the controller quorum nodes with the address of the ZooKeeper quorum. Once the controller quorum is established, i.e. an active controller has been elected, the active controller will write its node information under the /brokers/ids path in ZooKeeper and overwrite the contents of the /controller node with its own ID. This prevents any broker node that has not yet been upgraded from becoming the controller later in the rolling upgrade.

Once it has taken over the /controller node, the active controller will proceed to load the full state of ZooKeeper. It will write out this information to the quorum's metadata storage. After this point, the metadata quorum will be the metadata store of record, rather than the data in ZooKeeper.

Once it has taken over the /controller node, the active controller will load the full state of ZooKeeper and write it to the KRaft metadata store (Translator: it actually writes this metadata to the internal topic __cluster_metadata). From then on, the metadata quorum, not ZooKeeper, is the metadata store of record.
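The takeover and load described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `FakeZooKeeper`, `ActiveController`, the node ID 100, and the node values are all hypothetical stand-ins, not Kafka classes or real ZooKeeper payloads. Only the /controller and /brokers/ids paths come from the text.

```python
from dataclasses import dataclass, field


@dataclass
class FakeZooKeeper:
    """In-memory stand-in for a ZooKeeper tree: path -> value (hypothetical)."""
    nodes: dict = field(default_factory=dict)


@dataclass
class ActiveController:
    """Hypothetical model of the active controller during migration."""
    node_id: int
    metadata_log: list = field(default_factory=list)  # append-only records

    def take_over(self, zk: FakeZooKeeper) -> None:
        # Register under /brokers/ids and overwrite /controller so that
        # un-upgraded brokers cannot become the controller later.
        zk.nodes[f"/brokers/ids/{self.node_id}"] = "controller-quorum-node"
        zk.nodes["/controller"] = str(self.node_id)

    def load_from_zookeeper(self, zk: FakeZooKeeper) -> None:
        # Copy the full ZooKeeper state into the metadata log, one record
        # per path, each tagged with its offset in the log.
        for path in sorted(zk.nodes):
            offset = len(self.metadata_log)
            self.metadata_log.append((offset, path, zk.nodes[path]))


# Made-up initial state: broker 1 is a legacy broker and the old controller.
zk = FakeZooKeeper({
    "/controller": "1",
    "/brokers/ids/1": "legacy-broker",
    "/brokers/topics/orders": "partitions=3",
})
controller = ActiveController(node_id=100)
controller.take_over(zk)
controller.load_from_zookeeper(zk)
```

After these calls, the log holds one record per ZooKeeper path, and any reader can rebuild the same state by replaying the records in offset order.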

 

We do not need to worry about the ZooKeeper state getting concurrently modified during this loading process. In the bridge release, neither the tools nor the non-controller brokers will modify ZooKeeper.

We don't need to worry about the ZooKeeper state being concurrently modified by other brokers or clients during this loading process. In the bridge release, neither the tools nor the non-controller brokers modify ZooKeeper.

 

The new active controller will monitor ZooKeeper for legacy broker node registrations. It will know how to send the legacy "push" metadata requests to those nodes, during the transition period.

During this transition period, the new active controller will monitor ZooKeeper for legacy broker registrations, and it will know how to send the legacy "push" metadata requests to those nodes.
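A minimal sketch of that bookkeeping, assuming broker registrations appear as child paths of /brokers/ids. The function names and the shape of the "push" message are hypothetical and chosen only for illustration; they are not Kafka's actual request format.

```python
def legacy_brokers(zk_paths, upgraded_ids):
    """Return IDs registered under /brokers/ids that are not yet upgraded."""
    prefix = "/brokers/ids/"
    registered = {int(p[len(prefix):]) for p in zk_paths if p.startswith(prefix)}
    return sorted(registered - set(upgraded_ids))


def push_metadata(metadata_records, targets):
    """Build one legacy-style push per target broker (hypothetical shape)."""
    return [{"to": broker_id, "records": list(metadata_records)}
            for broker_id in targets]


# Brokers 1 and 3 are still registered in ZooKeeper; broker 2 was already rolled.
paths = ["/controller", "/brokers/ids/1", "/brokers/ids/2", "/brokers/ids/3"]
targets = legacy_brokers(paths, upgraded_ids=[2])
pushes = push_metadata(["topic orders created"], targets)
```

Upgraded brokers are left out of `targets` because they no longer rely on the legacy push path.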

Roll the Broker Nodes

We will roll the broker nodes as usual. The new broker nodes will not contact ZooKeeper. If the configuration for the zookeeper server addresses is left in the configuration, it will be ignored.

We can roll the broker nodes as usual. The new brokers will no longer connect to ZooKeeper; if the ZooKeeper server addresses are still present in their configuration, they are simply ignored.

Roll the Controller Quorum

Once the last broker node has been rolled, there will be no more need for ZooKeeper. We will remove it from the configuration of the controller quorum nodes, and then roll the controller quorum to fully remove it.

Once the last broker node has been rolled, nothing needs ZooKeeper any more. We remove it from the configuration of the controller quorum nodes and then roll the controller quorum; once that rolling upgrade completes, the whole cluster is free of the ZooKeeper dependency.
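The four steps above can be summarized as an ordered sequence of phases, with a note on which kinds of node still talk to ZooKeeper in each. The sketch below is one reading of the steps, not part of the KIP: the `Phase` names and the role labels are hypothetical.

```python
from enum import IntEnum


class Phase(IntEnum):
    """Rolling-upgrade phases, in the order described above (names are ours)."""
    BRIDGE_RELEASE = 1
    CONTROLLER_QUORUM_STARTED = 2
    BROKERS_ROLLED = 3
    CONTROLLER_QUORUM_ROLLED = 4


# Which node roles still contact ZooKeeper in each phase (our interpretation).
ZOOKEEPER_USERS = {
    Phase.BRIDGE_RELEASE: {"legacy controller", "legacy brokers"},
    Phase.CONTROLLER_QUORUM_STARTED: {"controller quorum", "legacy brokers"},
    Phase.BROKERS_ROLLED: {"controller quorum"},
    Phase.CONTROLLER_QUORUM_ROLLED: set(),
}


def zookeeper_still_needed(phase: Phase) -> bool:
    """True while at least one role still depends on ZooKeeper."""
    return bool(ZOOKEEPER_USERS[phase])


def next_phase(phase: Phase):
    """Return the following phase, or None once the upgrade is finished."""
    if phase is Phase.CONTROLLER_QUORUM_ROLLED:
        return None
    return Phase(phase + 1)
```

Walking the phases in order shows the set of ZooKeeper users shrinking until it is empty, which is the point at which ZooKeeper can be decommissioned.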

Rejected Alternatives (Rejected Alternatives)

Pluggable Consensus (Pluggable Consensus)

Rather than managing metadata ourselves, we could make the metadata storage layer pluggable so that it could work with systems other than ZooKeeper. For example, we could make it possible to store metadata in etcd, Consul, or similar systems.

Instead of managing the metadata ourselves, we could make the metadata storage layer pluggable so that it works with systems other than ZooKeeper. For example, we could store the metadata in etcd, Consul, or similar systems.

 

Unfortunately, this strategy would not address either of the two main goals of ZooKeeper removal. Because they have ZooKeeper-like APIs and design goals, these external systems would not let us treat metadata as an event log. Because they are still external systems that are not integrated with the project, deployment and configuration would still remain more complex than they needed to be.

Unfortunately, this approach would not address either of the two main goals of removing ZooKeeper. Because these external systems have ZooKeeper-like APIs and design goals, they would not let us treat metadata as an event log. And because they would still be external systems that are not integrated with Kafka, deployment and configuration would remain more complex than necessary.

 

Supporting multiple metadata storage options would inevitably decrease the amount of testing we could give to each configuration. Our system tests would have to either run with every possible configuration storage mechanism, which would greatly increase the resources needed, or choose to leave some user under-tested. Increasing the size of test matrix in this fashion would really hurt the project.

Supporting multiple metadata storage options would inevitably decrease the amount of testing each configuration receives. (Translator: "decrease" in the original is correct; with the same testing resources, more configurations mean less testing for each one.) Our system tests would either have to run against every possible metadata storage mechanism, which would greatly increase the resources required, or leave some users under-tested. Growing the test matrix in this way would really hurt the project.
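The trade-off is simple arithmetic. The sketch below uses made-up numbers and hypothetical function names; it only shows the two options the paragraph describes: split a fixed budget across backends, or multiply the budget to keep per-backend coverage constant.

```python
def runs_per_backend(total_runs: int, backends: int) -> int:
    """Split a fixed budget of system-test runs evenly across backends."""
    if backends < 1:
        raise ValueError("need at least one backend")
    return total_runs // backends


def runs_for_full_coverage(runs_per_config: int, backends: int) -> int:
    """Total runs needed if every backend gets the full test suite."""
    return runs_per_config * backends


# Illustrative budget of 1200 runs (a made-up figure).
single = runs_per_backend(1200, 1)        # one backend gets the whole budget
shared = runs_per_backend(1200, 3)        # three backends share the budget
full = runs_for_full_coverage(1200, 3)    # cost of not reducing coverage
```

With three backends, either each one gets a third of the testing or the total cost triples.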

Additionally, if we supported multiple metadata storage options, we would have to use "least common denominator" APIs. In other words, we could not use any API unless all possible metadata storage options supported it. In practice, this would make it difficult to optimize the system.

In addition, if we supported more than one metadata storage option, we would have to use "least common denominator" APIs; in other words, we could not use an API unless every possible metadata storage option supported it. In practice, this would make the system difficult to optimize.
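In set terms, the usable API surface is the intersection of what every backend offers. The backend names and feature names below are placeholders invented for this sketch; they do not describe the capabilities of any real system.

```python
# Hypothetical backends and the operations each one supports.
BACKEND_FEATURES = {
    "store_a": {"get", "put", "watch", "ordered_log"},
    "store_b": {"get", "put", "watch", "transactions"},
    "store_c": {"get", "put", "transactions"},
}


def usable_features(backends: dict) -> set:
    """Return the features supported by every backend (the common subset)."""
    feature_sets = [set(features) for features in backends.values()]
    if not feature_sets:
        return set()
    return set.intersection(*feature_sets)


common = usable_features(BACKEND_FEATURES)
```

In this made-up example only `get` and `put` survive, so an optimization that depends on `ordered_log` or `transactions` could not be used even though some backends support it.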

Follow-up (Follow-on Work)

This KIP expresses a vision of how we would like to evolve Kafka in the future. We will create follow-on KIPs to hash out the concrete details of each change.

This KIP expresses a vision of how we want to evolve Kafka in the future. We will create follow-on KIPs to work out the concrete details of each change.

  • KIP-455: Create an Administrative API for Replica Reassignment
  • KIP-497: Add inter-broker API to alter ISR
  • KIP-543: Expand ConfigCommand's non-ZK functionality
  • KIP-555: Deprecate Direct Zookeeper access in Kafka Administrative Tools
  • KIP-589: Add API to update Replica state in Controller
  • KIP-590: Redirect Zookeeper Mutation Protocols to The Controller
  • KIP-595: A Raft Protocol for the Metadata Quorum
  • KIP-631: The Quorum-based Kafka Controller

References

The Raft consensus algorithm

  • Ongaro, D., Ousterhout, J. In Search of an Understandable Consensus Algorithm

Handling Metadata via Write-Ahead Logging

  • Shvachko, K., Kuang, H., Radia, S., Chansler, R. The Hadoop Distributed Filesystem
  • Balakrishnan, M., Malkhi, D., Wobber, T. Tango: Distributed Data Structures over a Shared Log