I. Preface
Kafka provides high-performance reads and writes, and all of these reads and writes operate on a Topic. Creating a Topic is therefore particularly important: it involves the partition allocation strategy, state transitions, and more. Yet the command for creating a new Topic looks very simple:
```bash
bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic topicA \
  --partitions 10 \
  --replication-factor 2
# --bootstrap-server:    the broker endpoints to connect to
# --topic:               the name of the Topic to create
# --partitions:          number of partitions for the Topic
# --replication-factor:  how many replicas to create for each partition
```
So creating a Topic may not be as simple as it looks, and in this article we'll go through the details of how a new Topic is created.
The following discussion is based on Kafka version 2.8.2.
II. Overall process
Topic creation consists of two parts:
- The user calls the corresponding API; the Controller formulates the partition allocation scheme and persists it to ZooKeeper.
- The Controller's ZooKeeper listener callback picks up the metadata change, triggers the state machines, and actually carries out the replica assignment.
The user initiates a request to create a Topic; the Controller receives the request, formulates a partition allocation scheme, persists that scheme to ZooKeeper, and then returns the result to the user.
A dedicated thread in the Controller listens for ZooKeeper node changes (this thread is asynchronous with respect to the thread that creates the Topic). When it detects a change, it asynchronously triggers the state machines to perform the state transitions, after which the corresponding Brokers are made Leader or Follower.
III. Topic Partition Allocation Scheme
The first part consists of three main steps:
- The user sends a request to the Controller to create a new Topic
- The Controller receives the request and formulates a partition allocation scheme for that Topic
- The Controller persists the formulated scheme to ZooKeeper
Steps 1 and 3 are easy to understand, so we focus on step 2, the partition allocation strategy. The core logic of Kafka's partition assignment lives under scala/kafka/admin/ and comes in two flavors: rack-unaware and rack-aware. Let's look at the core of the rack-unaware strategy.
The rack-unaware strategy is further divided into Leader Replica and Follower Replica assignment.
3.1. Leader Partition
The allocation logic for both Leader and Follower lives in a single method, #assignReplicasToBrokersRackUnaware, which is only about 20 lines long, so let's take a look at it.
```scala
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,       // total number of partitions of the target topic
                                               replicationFactor: Int, // replication factor of the topic
                                               brokerList: Seq[Int],   // list of broker ids
                                               fixedStartIndex: Int,   // -1 by default
                                               startPartitionId: Int /* -1 by default */): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // start index into the broker list for the leader; chosen randomly by default
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // defaults to 0, i.e. start from partition 0
  var currentPartitionId = math.max(0, startPartitionId)
  // this value is mainly used when assigning follower partitions
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // iterate over all partitions
  for (_ <- 0 until nPartitions) {
    // this condition only affects follower partition placement
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    // the first replica of the current partition, i.e. the leader;
    // since startIndex is random, firstReplicaIndex is effectively a random pick from the broker list
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    // buffer holding all replicas of the current partition
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
```
As you can see, the allocation strategy for a Topic's Leader Replicas is fairly simple. Let's briefly summarize the process:
- Randomly pick a Broker from the Broker list as the Leader of Partition 0.
- Then walk through the Broker list in order, assigning the Leaders of Partition 1, Partition 2, Partition 3, and so on.
- When the traversal reaches the end of the Broker list, wrap around to the beginning and keep going.
Suppose we have 5 Brokers numbered from 1000, i.e. 1000, 1001, 1002, 1003, and 1004, and suppose the randomly chosen Broker for Partition 0's Leader is 1000. The final allocation is then as follows:
| Broker | 1000 | 1001 | 1002 | 1003 | 1004 |
| --- | --- | --- | --- | --- | --- |
| Leader Partition | 0, 5 | 1, 6 | 2, 7 | 3, 8 | 4, 9 |
If instead the randomly chosen Broker for Partition 0's Leader is 1002, the final allocation is as follows:
| Broker | 1000 | 1001 | 1002 | 1003 | 1004 |
| --- | --- | --- | --- | --- | --- |
| Leader Partition | 3, 8 | 4, 9 | 0, 5 | 1, 6 | 2, 7 |
The purpose of this is to spread the Partitions out as much as possible, assigning the Partition Leaders to different Brokers to avoid data hotspots.
However, this scheme is not perfect: it only spreads out the Partition Leaders of the Topic currently being created and does not take the Partitions of other Topics into account. Suppose we have already created 5 single-partition Topics and all of them happen to sit on Broker 1000; when we create the next single-partition Topic, its Partition 0 may again land on Broker 1000, creating a data hotspot. Still, because the starting Broker is chosen randomly, the distribution remains reasonably even once there are enough Topics.
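To make the round-robin concrete, here is a minimal, self-contained sketch. It is not Kafka's code; the broker ids, partition count, and the assumption that the random start index is 0 are chosen purely to reproduce the first table above.

```scala
// Leader round-robin sketch: 5 brokers numbered from 1000, 10 partitions.
object LeaderRoundRobinDemo extends App {
  val brokers = Array(1000, 1001, 1002, 1003, 1004)
  val nPartitions = 10
  val startIndex = 0 // Kafka picks this randomly; a value of 2 would reproduce the second table

  (0 until nPartitions).foreach { p =>
    val leader = brokers((p + startIndex) % brokers.length) // wrap around the broker list
    println(s"partition $p -> leader broker $leader")
  }
}
```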
3.2. Follower Partition
Now that the Leader Replicas have been determined, the next step is to work out the Follower allocation, which should satisfy at least the following two requirements:
- Followers should be scattered across different Brokers, mainly for high availability: when the Leader's Broker becomes unavailable, a Follower must be able to take over.
- The Follower allocation also can't be completely random: if replicas were assigned purely at random, one Broker could end up with noticeably more replicas than another, which we want to avoid.
Besides the #assignReplicasToBrokersRackUnaware method described above, the other important piece of the Follower Replica allocation logic is #replicaIndex.
```scala
private def replicaIndex(firstReplicaIndex: Int,  // index of the first replica, i.e. the leader's index
                         secondReplicaShift: Int, // random shift in [0, nBrokers); incremented by 1 every nBrokers partitions
                         replicaIndex: Int,       // index of the current follower replica, starting from 0
                         nBrokers: Int): Int = {  // number of brokers
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
```
This method is only two lines long, but they are rather obscure and not easy to understand, and there are no comments for it in version 2.8.2. I checked the community's latest version, 3.9.0-SNAPSHOT, and there are still no comments for this method. Nevertheless, it's worth the effort to understand it.
The first line:
```scala
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
```
This line computes a shift value from the random secondReplicaShift; its range is 1 <= shift <= nBrokers - 1, and as replicaIndex increases, shift increases with it. The purpose, of course, is to set the stage for the second line of code.
Note that shift depends only on secondReplicaShift and replicaIndex, not on the partition id itself.
The second line:
```scala
(firstReplicaIndex + shift) % nBrokers
```
Since 1 <= shift <= nBrokers - 1, the follower index it produces can never equal the Leader's index, and successive followers keep advancing forward through the broker list.
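As a quick sanity check with assumed numbers (evaluated in a Scala REPL): take nBrokers = 5, the Leader at index 1, and secondReplicaShift = 3. Neither follower lands on the Leader's index:

```scala
def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}

replicaIndex(1, 3, 0, 5) // shift = 1 + 3 % 4 = 4  ->  (1 + 4) % 5 = 0  (broker index 0)
replicaIndex(1, 3, 1, 5) // shift = 1 + 4 % 4 = 1  ->  (1 + 1) % 5 = 2  (broker index 2)
```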
To summarize the distribution rules:
- Randomly pick a starting position in the Broker list for the first Follower (controlled by the variable secondReplicaShift).
- Each subsequent Follower is placed one position after the previous one, starting from that position.
- A Follower's position is guaranteed not to collide with the Leader's; if it would, it is pushed back one place (ensured by `(firstReplicaIndex + shift) % nBrokers`).
- Not all partitions of the current Topic move in lockstep: whenever `PartitionNum % BrokerNum == 0`, secondReplicaShift is incremented by 1, which also shifts the starting position of the first Follower by 1 and spreads the replicas out even more.
Let's look at a specific case:
| Broker | 1000 | 1001 | 1002 | 1003 | 1004 |
| --- | --- | --- | --- | --- | --- |
| Leader | 0, 5 | 1, 6 | 2, 7 | 3, 8 | 4, 9 |
| Follower 1 | 1, 9 | 2, 5 | 3, 6 | 4, 7 | 0, 8 |
| Follower 2 | 4, 8 | 0, 9 | 1, 5 | 2, 6 | 3, 7 |
- Partition 1: the Leader is on 1001, and the two Followers are on 1000 and 1002. Clearly the Followers are placed starting from 1000 and moving forward, so they would have been 1000 and 1001; but 1001 happens to be the Leader, so the second Follower is pushed one position further, and the final Follower distribution is [1000, 1002].
- Note: why do the Followers start from 1000? This is determined by the shift value in the #replicaIndex method, which in turn depends on the random secondReplicaShift, so "starting from 1000" is just the outcome of this particular random run; running the assignment again may give a different result.
- Partition 3: the Leader is on 1003 and the Followers start from 1002; the next position is 1003, which is the Leader, so the second Follower is pushed to 1004, giving a Follower distribution of [1002, 1004].
- Partition 7: starting from Partition 5 the partition id has wrapped past the total number of Brokers, so secondReplicaShift is incremented, which moves the Follower starting index forward by 1 as well; the Follower distribution is therefore [1003, 1004].
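The whole table can be reproduced with a small self-contained sketch that mirrors the two methods above. This is not Kafka's code; the start index of 0 and the initial shift of 3 are assumptions chosen to match this particular random run.

```scala
object AssignmentDemo extends App {
  val brokers = Array(1000, 1001, 1002, 1003, 1004)
  val nPartitions = 10
  val replicationFactor = 3
  val startIndex = 0        // random in real Kafka; 0 matches the table above
  var nextReplicaShift = 3  // random in real Kafka; 3 matches the table above

  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  for (p <- 0 until nPartitions) {
    // every time the partition id wraps past the broker count, shift the followers by one more slot
    if (p > 0 && p % brokers.length == 0) nextReplicaShift += 1
    val firstReplicaIndex = (p + startIndex) % brokers.length
    val followers = (0 until replicationFactor - 1)
      .map(j => brokers(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokers.length)))
    println(s"partition $p -> leader ${brokers(firstReplicaIndex)}, followers ${followers.mkString("[", ", ", "]")}")
  }
}
```

Running it prints, for example, `partition 1 -> leader 1001, followers [1000, 1002]` and `partition 7 -> leader 1002, followers [1003, 1004]`, matching the table and the case analysis above.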
Why bother with such a complicated scheme? Couldn't we simply use the N Brokers right after the Leader's Broker as Followers? We could, but it may cause problems: if a Broker hosting many Leaders goes down, those Leader Partitions would all fail over to one or a few specific Brokers, creating potential hotspots, and the surviving Brokers would not share the traffic evenly.
3.3. Manually specifying an assignment
The above is the partition assignment that Kafka works out for us automatically; we can also specify the assignment manually:
```bash
bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic topicA \
  --replica-assignment 1000,1000,1000,1000,1000
```
Running the above command creates a Topic named "topicA" with 5 partitions, each with a single replica, all of which are placed on the Broker with ID 1000.
In addition, Kafka supports a rack-aware partition allocation scheme, which tries to spread a partition's replicas evenly across N racks, so that the loss of a single rack does not affect the partition's overall ability to serve traffic. This article will not expand on that case.
IV. State machines
Once the partition allocation scheme has been formulated, the Controller encodes it into a binary byte[] and persists it to ZooKeeper under /brokers/topics/topicXXX (where topicXXX is the Topic name), then returns a success message to the user. The real work of creating the Topic is not finished, however: the Controller asynchronously executes the follow-up operations. The source code here is fairly convoluted, but it boils down to two things:
- Update the metadata and notify all Brokers
- Propagate the Leader and ISR information to the relevant Brokers, which then perform the Make Leader / Make Follower operations accordingly
These operations are implemented through two state machines:
- the partition state machine
- the replica state machine
The entry point where the Controller receives the asynchronous ZooKeeper notification is #processTopicChange.
4.1. Partition state machine
This machine tracks the state of a Partition; the corresponding class defines four states:
- NewPartition: the newly created state; a partition stays in this state inside the Controller only briefly before transitioning to OnlinePartition.
- OnlinePartition: the online state; only partitions in this state can serve requests.
- OfflinePartition: the offline state, e.g. after Topic deletion.
- NonExistentPartition: the initial state; the partitions of a newly created Topic start in this state by default.
Of the possible transitions among these states, this article only covers the ones involved when a new Topic is created:
- NonExistentPartition -> NewPartition
- NewPartition -> OnlinePartition
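As an illustration only (these are not Kafka's actual classes), the four states and the two transitions used during creation can be modeled like this:

```scala
// Toy model of the four partition states and the two transitions exercised
// while creating a new Topic; Kafka's real logic lives in its partition state machine.
object PartitionStateToyModel {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // NonExistentPartition -> NewPartition: the assignment is decided, but there is no leader/ISR yet
  // NewPartition -> OnlinePartition:      a leader is elected and the ISR is written to ZooKeeper
  val transitionsDuringCreation: Map[PartitionState, PartitionState] = Map(
    NonExistentPartition -> NewPartition,
    NewPartition -> OnlinePartition
  )
}
```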
4.2. Replica state machine
The replica state machine's corresponding class defines seven states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible, and NonExistentReplica. During Topic creation only three of them are involved: NonExistentReplica, NewReplica, and OnlineReplica. The replica state machine plays a limited role in the creation flow and is not the focus of this article; a general idea of it is enough.
4.3. State flow
First, note that Kafka's Controller is single-threaded: all events are processed serially, so all of the operations below execute serially as well.
Two preparatory steps are performed before the state transitions actually run:
- Generate a unique Topic ID for the newly created Topic. Inside #setTopicIds this is really just a call to #randomUuid to generate a random ID.
- Read the partition assignment. The Controller reads this Topic's partition assignment back from ZooKeeper (stored under /brokers/topics/topicName) and puts it into the #partitionAssignments cache.
There isn't much to say about these two steps; they are just preliminaries for the state transitions. Next comes the main method, #onNewPartitionCreation.
This method is short and performs four main steps:
- The partition state machine sets the state to NewPartition
- The replica state machine sets the state to NewReplica
- The partition state machine sets the state to OnlinePartition
- The replica state machine sets the state to OnlineReplica
```scala
// #onNewPartitionCreation
private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
  info(s"New partition creation callback for ${newPartitions.mkString(",")}")
  partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
  partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition,
    Some(OfflinePartitionLeaderElectionStrategy(false)))
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
}
```
4.3.1. Partition state machine: NewPartition
The partition state machine sets the state to NewPartition. This step just maintains the in-memory #partitionStates map, setting the state of the corresponding partitions to NewPartition, and does nothing else.
4.3.2. Replica state machine: NewReplica
The replica state machine sets the state to NewReplica. Likewise, this step maintains the in-memory #replicaStates map, setting the replica state to NewReplica.
4.3.3. Partition state machine: OnlinePartition
This step is the core of the entire state machine flow and breaks down into the following five major steps:
- Initialize the Leader, ISR, etc. and persist this information to ZooKeeper:
  - Create the topic-partition path in ZooKeeper: /brokers/topics/topicName/partitions
  - Create a path for each partition under /brokers/topics/topicName/partitions/xxx, for example:
    - /brokers/topics/topicName/partitions/0
    - /brokers/topics/topicName/partitions/1
    - /brokers/topics/topicName/partitions/2
  - Persist the Leader and ISR information at /brokers/topics/topicName/partitions/0/state (and likewise for every other partition)
- Put the Leader and ISR information just persisted to ZooKeeper into the #partitionLeadershipInfo cache
- Since the Leader and ISR metadata has changed, record it in the in-memory structure #leaderAndIsrRequestMap, which marks it as needing to be synchronized to the corresponding Brokers
- Maintain the in-memory #partitionStates map, setting the state to OnlinePartition
- Send the data to the corresponding Brokers via an ApiKeys.LEADER_AND_ISR request; when a Broker receives this request, it performs the MakeLeader / MakeFollower operations (a minimal sketch of the per-partition payload follows this list)
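To give a feel for what is propagated in that last step, here is a toy model. These are not Kafka's actual request classes and the field names are illustrative; it only captures the essential per-partition information described above and how a Broker decides its role from it.

```scala
object LeaderAndIsrToyModel {
  // Illustrative per-partition payload of a LEADER_AND_ISR request.
  case class PartitionLeadershipInfo(
    topic: String,
    partition: Int,
    leader: Int,         // broker id of the elected leader
    leaderEpoch: Int,
    isr: List[Int],      // in-sync replica set
    replicas: List[Int]  // full replica assignment
  )

  // A broker compares its own id with `leader` to decide whether to make itself leader or follower.
  def roleFor(brokerId: Int, info: PartitionLeadershipInfo): String =
    if (info.leader == brokerId) "MakeLeader" else "MakeFollower"
}
```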
4.3.4. Replica state machine: OnlineReplica
The replica state machine sets the state to OnlineReplica, again by maintaining the in-memory #replicaStates map.
At this point, a Kafka Topic is actually created.