Location>code7788 >text

Kafka principle analysis of the "Purgatory (Purgatory | Wheel of Time)"

Popularity:40 ℃/2024-10-15 15:07:20

I. Preface

This article introduces the Kafka hefty component Purgatory, I believe that friends who do Kafka more or less have some knowledge of it, at least heard its name. What is its role, used to solve what problem? The official website confluent has long had an article on it to do a description of the

/confluence/pages/?pageId=34839465

Here's a quick summary: Purgatory is used to store requests that are in a temporary or waiting state, where certain conditions may not have been met and are being managed temporarily. After these conditions are met, or the request times out, these requests will be efficiently called back by the Purgatory, then continue to execute the subsequent logic

On a side note, why did Kafka name it "Purgatory"? Here's what wikipedia has to say about it

In the tradition of the Church, purgatory is the process of refining a person after death, the purification of the stains of sin from a person's body, an experience of being purified and refined in the process of a person's passing through death to reach the state of perfection (heaven)

I believe Purgatory emphasizes the temporary here, and there are other names such as Reaper, which shows that the original Kafka authors were still very literate :)

II. Evolution

Regarding the formation of the Purgatory component, it didn't happen overnight, it went through at least 2 major iterations.

  • Version 1: In Kafka version 0.8 and before, the first version was used, which at its core relied heavily on JUC's delayed queue (). However, most of the time, these delayed tasks that are put into the Purgatory don't really wait until the timeout. For example, in the case of acks=all, assuming that the default timeout is 1 second, i.e., you need to synchronize the data to all the follower within 1 second, the leader puts the data into the Purgatory and starts the callback waiting, but in most cases, the data may be synchronized in a few tens of milliseconds, and the callback will be executed to end the asynchronous operation, however, there are some tasks that exist in the deferred queue.DelayQueueThe request is not really deleted, it can only be found and deleted when it actually times out (after 1 second). Thus taking up expensive memory resources for nothing is a drawback, and there are some performance issues, as well as a log(N) time complexity for entries to be added and modified.
  • Version 2: In later versions, Kafka optimized it, introducing the excellent design Hierarchical Timing Wheel (multi-layer time wheel) concept, not only can immediately delete the completed task, but also make its performance soared, almost to the degree of constant O(1), about the specific version of the walkthrough and performance testing can see the official website articles

/confluence/display/KAFKA/Purgatory+Redesign+Proposal

/blog/apache-kafka-purgatory-hierarchical-timing-wheels/

However, the article does not expand on many details, there is no source code level process explanation, which is the original intention of this article was born, next we will have an all-encompassing introduction to Purgatory, including its design concept and source code analysis. Also: the source code is taken from the latest trunk branch of the community, that is, the release branch of 4.0.0 in the near future. Considering that Purgatory is already quite stable and mature, there won't be any major changes from the current trunk branch to 4.0.0.

III. Overall business processes

To get a global idea of the role Purgatroy plays, we take Consumer'sJoin GroupWhat Join Group does is also very simple, it needs to coordinate multiple Consumers to invoke the Join Group interface as quickly as possible within a certain time window, where the subsequent results are considered in two scenarios

  • After all Consumers have invoked Join Group within the time window, the Coordinator begins to develop a partition allocation strategy
  • At the end of the time window, only a portion of the Consumers have called the Join Group, so the Coordinator eliminates those who have not called the Join Group, and begins to formulate an allocation strategy for only those Consumers who have called the Join Group.

Assuming there is no Purgatory component right now, the process might look like the following if we implement it:

Assume that all 4 threads have arrived within the time window, but not in the same order of arrival. Thread 2 has arrived, and thread 1, thread 3, and thread 4 are all still on their way, so thread 2 has been in a pending state, and even if a new task arrives, thread 2 is unable to respond to the processing, and the corresponding thread 2 occupancy has been waiting until after the ack response to release it.inefficiency

The figure "Wait All Threads Ready" component how to realize it? In fact, you can use the JDK's CyclicBarrier or Semaphore or CountDownLatch can be realized, but not the focus of this article to discuss, not to expand!

How does Purgatory accomplish this? What role does it play in the whole process?

First of all, the process is still 4 threads come to call the interface successively, but the difference is that the thread that has arrived only needs to give the message that it has received (including callback, timeout and other basic data) to Purgatory, and then this thread will be released, it can go to deal with other tasks. Purgatory will take over the subsequent operations, including determining whether the conditions are met and whether the window has timed out. Once one of the conditions is met, Purgatory will execute a callback to ack response to each of these requests.

Knowing the role that Purgatory plays in the overall process, it's time to expand on the details of the component's internal implementation

IV. Purgatory Composition

Let's start by providing a flowchart of Purgatory's operations

4.1. Business threads

The business thread, which corresponds to the process in the left part of the above figure. The so-called business thread, that is, the use of Purgatory components as a staging request thread, such as Join Group, Producer ACKS = all, etc., although the business thread to call the Purgatory code is very simple, only one line, take Join Group as an example:

(delayedRebalance, Seq(groupKey))

But a lot was done within Purgatory:

  • First try to complete the call, if all the conditions have been met, then the current task succeeds directly and there is no need to interact with the time wheel
  • If the condition is not met, the task is stored in the time wheel
  • If the user sets a key, it also listens for TimeTasks with the same key.
    • In fact, it is to do batch operations on tasks with the same key, such as canceling them together, which will be mentioned later.

It can be seen that the business thread is only responsible for writing data to the time wheel, so when is the data in the time wheel cleared? This involves another core thread, ExpiredOperationReaper.

4.2. Harvesting threads

Inside Purgatory there is also a separate thread ExpiredOperationReaper, which we can translate as a harvesting thread or cleanup thread, whose role is to scan in real time for tasks that have expired and remove them from the time wheel. It is defined as follows

/**
 * A background reaper to expire delayed operations that have timed out
 */
private class ExpiredOperationReaper extends ShutdownableThread(
  "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
  false) {

  override def doWork(): Unit = {
    advanceClock(200L)
  }
}

It is important to note that the harvesting thread is just a single thread, its role is just to find out those tasks that have expired in real time, and throw the subsequent callback logic to the thread pool, and then continue scanning, so it can be seen that its task is not too onerous

Here a brief mention of the "callback thread pool", from the above we know that the thread will be handed over to the Purgatory after the end of the mission, subsequent triggers have this "callback thread pool" in the thread to perform the definition of the thread pool as follows

 = (1,
            runnable -> (SYSTEM_TIMER_THREAD_PREFIX + executorName, runnable));

As you can see it is a single threaded and fixed number of threads thread pool. Why is it set to be single-threaded? If a callback blocks in one application, won't all callbacks in the thread pool block?

Indeed, but considering that the work this thread pool is doing is just callbacks, usually to the network sending module, the data is actually ready and the TPS response is very fast, so it's usually not a bottleneck either

From the above, it is clear that both business threads and harvesting threads are inextricably linked to the time wheel

V. Time Wheel (Timing Wheel)

Whether it is the management of delayed tasks, storage, removal and other core operations, they are all done by the time wheel, so the time wheel is the most central component in the whole Purgatory. Here we clarify the relationship between Purgatory and the time wheel, the time wheel is only a sub-concept of Purgatory, it is an internal component extracted in order to make Purgatory more efficient and faster performance.

5.1 Data structures

The data structure of the time wheel is also relatively simple, consisting of a wheel + a bidirectional linked table

Wheels:

Two-way list:

The structure of the wheel + two-way list is then:

The main reason for designing a bidirectional chain table is that the addition and deletion of Tasks is a very frequent thing, and our data structure should be able to ensure that these requests are handled efficiently, while a bidirectional cyclic chain table ensures that the addition and deletion of any Task node maintains an O(1) time complexity, and is therefore the obvious choice!

5.2 Adding and Removing Tasks

What does the process of adding and removing specific Tasks look like? Let's illustrate with an example:

Assuming that the granularity of my current time wheel is 10 seconds, i.e. one grid every 10 seconds

Now comes a task, this task Task1 to be triggered at the 35th second, at this point we find the 4th time frame and put this task here

Then came 2 more tasks that were triggered at 36 and 38 seconds, so again they will be placed in the 4th time frame

Similarly, it is not difficult to imagine that if a few more tasks come along, and they are triggered at 12, 18, 69, 62, 65, 53, and 54, then the time wheel will become as follows

What if a task comes in with a delay time of 100 seconds? Actually, this piece involves a multi-level time wheel

As for the removal of tasks, just refer to the bi-directional chain table to remove nodes

5.3. Multi-level time wheel

5.3.1 Definition of foundations

Multi-level time wheel, as the name suggests, i.e. there are many levels of time wheel, the higher up, the coarser the granularity, theoretically, as long as the memory is large enough, the time wheel can store an unlimited size of delayed tasks. The following figure shows a 2-level time wheel:

  • Inner time wheel: time granularity is 10 seconds, each time wheel has 8 grids, so the inner time wheel can store tasks from 0-80 seconds
  • Outer time wheel: time granularity is 80 seconds, again with 8 grids, while the outer time wheel can store tasks from 0-640 seconds

In fact, one grid of each outer time wheel corresponds to an inner time wheel, except that the above figure does not present this, so, picking up from the above, if we were to store a 100-second task when the current time wheel is found to be out of bounds, it would be mindlessly thrown upwards until it finds a time wheel that can catch this timeout, the upper time wheel has a much larger span, and so the 100-second task would fall on the "81-160" grid. Although the upper time wheel takes over the task, it will be the finest-grained time wheel that handles the task, i.e., the inner time wheel corresponding to the "81-160" grid will ultimately accept the task and trigger the callback in the "Clock Simulation" chapter. We'll expand on this in the "Clock Simulation" section.

So in fact we do not care in the end Purgatory will have how many layers of time wheel, theoretically it may be infinite, we only need to know the finest granularity of the time wheel of the step + number of the wheel behind the composition can be deduced. Then Kafka set the finest granularity of the wheel step length and the number of respectively? The answer is hidden in the class constructor

public SystemTimer(String executorName) {
    this(executorName, 1, 20, ());
}

It can be seen that the finest-grained time step is 1ms and the number of individuals is 20, from which a table can be derived

level

pacemaker

number of individuals

Maximum time

1

1ms

20

20ms

2

20ms

20

400ms

3

400ms

20

8s

4

8s

20

160s (2 minutes and 40 seconds)

5

160s

20

3200s (53 minutes 20 seconds)

6

3200s

20

64000s (17h46m40s)

Can see the 6th layer, the length of the delay has reached 17 hours, and Kafka general case, may be enough to the 4th layer; and from the overall point of view, the finest granularity of the time wheel is accurate to 1ms, and can receive a theoretically unlimited length of the timed task, it can be said to be a godsend. However, there is a little doubt here, that is, the granularity is so fine, is there a performance problem? We'll get into that later.

5.3.2 Adding Tasks

In this section, we will take a look at the addition and removal of tasks in a multi-level time wheel. Regarding the addition of tasks, in a nutshell, if the target task exceeds the maximum time range of the current time wheel, then it will be thrown directly to the higher level of the time wheel; or to cite an example from the above, if the time wheel receives a delayed task that will be executed in 700 seconds, then it will be thrown directly to the higher level of the time wheel.

The first level time wheel, the finest granularity, ranges from 0-80 seconds and cannot be stored, then thrown upwards;

The secondary time wheel receives this task and realizes that the timeout is 700 seconds, while its own range is 0-640, still unable to store it, and continues to throw it upwards

After receiving the task, the third-level time wheel still checks the range of time it can receive, and realizes that it is 0-5120 seconds, and 700 seconds is within its own range, and then calculates in which grid the 700-second task should fall, and finally it is stored in the grid of 641-1280

To summarize: Tasks are always added to the most granular time wheel first, and then cascaded up the hierarchy until a wheel is found that can take over the task and then store it in the corresponding grid.

5.3.2 Task Removal

Routinely, the removal action for a Task at a high Level that hasn't really expired yet is to put it into a finer-grained time wheel, or as illustrated in the above example

  1. Now the 700-second Task is placed in the "641-1280" grid (TaskList) of the three-level time wheel, which will expire in 640 seconds.
  2. Now that the clock has just passed 640 seconds, the grid "641-1280" is pushed out, and it is found that there is a task with a 700-second timeout, but it hasn't really timed out yet, because the current time is 640 seconds.
  3. And then this task will be re-added to the time wheel, because the clock has passed 640 seconds, so at this point both the primary and secondary time wheels have changed, and the secondary time wheel has been replaced with the following, so that the current task will be placed in the 641-720 grid
    1. 641-720
    2. 721-800
    3. 801-880
    4. 881-960
    5. 961-1040
    6. 1041-1120
    7. 1121-1200
    8. 1201-1280
  1. Whereas the 641-720 lattice corresponds to the following first-level time round, the 700-second task corresponds to the lattice 691-700, so in subsequent clock simulations, you really have to wait until the lattice 691-700 is woken up to invoke the
    1. 641-650
    2. 651-660
    3. 661-670
    4. 671-680
    5. 681-690
    6. 691-700
    7. 701-710
    8. 711-720

5.4. Clock simulation

The next step is the very important one, where Purgatory has to simulate the clock advancing forward in time, which triggers the relevant task to be awakened

5.4.1、

Before we really get into clock simulation, we need to lay out a key class under the JUC package. the entire clock simulation relies heavily on the capabilities of this delay queue. delayQueue has the following core methods:

  • put ( delayed) puts a delayed object into the delayed queue
  • Offer (delayed) vs. put
  • poll() will continue to block until it returns a delayed object that has expired, but will return null if there is no data in the current delay queue.
  • poll(long timeout, TimeUnit unit) is similar to poll(), except that the current method adds a timeout limit, and if the delay queue is empty, it doesn't return null immediately, but waits for a timeout.

Because DelayQueue only accepts objects, this object is defined as follows

/**
 * A mix-in style interface for marking objects that should be
 * acted upon after a given delay.
 *
 * <p>An implementation of this interface must define a
 * {@code compareTo} method that provides an ordering consistent with
 * its {@code getDelay} method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

It can be seen as an interface, if used, we need to define a delay class and implement this interface. We can write a small example of a delay queue for a visualization

public class DelayedQueueExample {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedItem> delayQueue = new DelayQueue<>();

        (new DelayedItem(2000));
        (new DelayedItem(5000));
        (new DelayedItem(6000));

        while (!()) {
            DelayedItem delayedItem = (200, );
            if (delayedItem != null) {
                ("delayedItem content : " + delayedItem);
            } else {
                ("DelayedItem is null");
            }
        }
    }

    private static class DelayedItem implements Delayed {
        private final long expirationTime;

        public DelayedItem(long delayTime) {
             = () + delayTime;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long diff = expirationTime - ();
            return (diff, );
        }

        @Override
        public int compareTo(Delayed other) {
            if (() < ()) {
                return -1;
            }
            if (() > ()) {
                return 1;
            }
            return 0;
        }
    }
}

In the above example, we put three pieces of data into the delay queue, they need to process the delay request time were 2 seconds, 5 seconds, 6 seconds, when calling the poll() method, you can receive the request callback exactly at the corresponding time, of course, this piece of high efficiency thanks to Doug Lea's JUC package!

Some students may say, since JUC's delayed queue can do all these things, why do we need the time wheel? Naturally, delayed queues have their own problems, see the "Evolution" module.

5.4.2 Delayed objects

So the elements put into the delay queue are these delayed Tasks? The answer is no, because once these tasks are put into the delay queue, it will become a burden to delete, and bring a lot of memory consumption (in fact, the first version of the Purgatory is designed this way), in fact, here the elements of the delay queue is a two-way circular list in the Wheel of Time, the following figure

Here's a little more discussion about adding and removing tasks, standing in for delayed queues

  • When the Task is added to a space in the time wheel, a TaskList object is created, which of course has only one element in the bidirectional list, and the TaskList is added to the delay queue.
  • When a Task is added to a grid with data, it is added directly to the TaskList's chain, since the TaskList has already been entrusted to a delayed queue, so there is no delayed queuing involved.
  • When a Task needs to be deleted, directly find the corresponding TaskList and remove it from the chain.
  • When the TaskList times out and is evoked by the delayed queue, these Tasks will be processed in turn, while if the chain list in the TaskList is empty, it will be skipped.

This not only perfectly avoids the operation of deleting elements in the delayed queue, but also perfectly solves the OOM problem, and the time complexity of adding and deleting elements is O(1)

5.4.3、Tick

The thread used for simulating clock advancement is the harvesting thread mentioned above, and the entry point for the method is#advanceClockIf all the operations have not timed out, then the harvesting thread actually has no business to process.

In fact, the core of the Tick operation is to call the poll operation of the delay queue, which is used to get those TimerTaskList that have timed out.

TimerTaskList bucket = (timeoutMs, );

However, the granularity of this Tick is per grid of the time wheel, so it is not consistent with the frequency of Tasks, usually a grid may contain more than one Task, these Tasks will be real business callbacks if they do time out in time, and if they do not time out, they will be rejoined to the time wheel

5.5、Watch Key

The so-called Watch Key is usually a set of lifecycle related data set the same key, so that when the condition is reached, the group of tasks can be unified callback, whether successful or canceled.

For example, when executing Group's Join operation, it is expected that there will be 10 consumers calling the Join interface, and then each consumer calls the interface with the watch key parameter (the watch key here can be set to the group name), as long as it is found that the number of calls to this key is full of 10, then you can call back these 10 delayed requests and remove them from the timeline.

VI. Source code analysis

We labeled the key part of the above diagram with the relevant classes

  • First the class corresponding to the entire time wheel is
  • The class corresponding to each grid on the time wheel is
  • The class corresponding to the elements in the chain table in each lattice is
  • Each element needs to store the TimerTask, which is an abstract class that also needs to be implemented by the user

In fact, through this diagram, we have a full picture of the classes involved in Purgatory, the main understanding here is TimerTask, because the other classes are wrapped inside the Purgatory component, do not need to be inherited, and does not involve changes. the definition of the TimerTask as follows

public abstract class TimerTask implements Runnable {
    private volatile TimerTaskEntry timerTaskEntry;
    public final long delayMs;
}

These two attributes are also better understood

  • timerTaskEntry: it is actually a 1:1 relationship with TimerTask, and there is also a reference to TimerTaskEntry in TimerTask.
  • delayMs: delay time, i.e. the time when the task will be triggered to be called in the future

However, it is not enough just to have this class, you also need to have callbacks to the relevant interfaces for some key operations, such as onComplete, onExpiration etc. Therefore, Kafka involves a subclass of TimerTask, DelayedOperation.

abstract class DelayedOperation(delayMs: Long,
                                lockOpt: Option[Lock] = None)
  extends TimerTask(delayMs) with Logging {

  private val completed = new AtomicBoolean(false)
  // Visible for testing
  private[server] val lock: Lock = (new ReentrantLock)

  /*
   * Force completing the delayed operation, if not already completed.
   * This function can be triggered when
   *
   * 1. The operation has been verified to be completable inside tryComplete()
   * 2. The operation has expired and hence needs to be completed right now
   *
   * Return true iff the operation is completed by the caller: note that
   * concurrent threads can try to complete the same operation, but only
   * the first thread will succeed in completing the operation and return
   * true, others will still return false
   */
  def forceComplete(): Boolean = {
    if ((false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }
  }

  /**
   * Check if the delayed operation is already completed
   */
  def isCompleted: Boolean = ()

  /**
   * Call-back to execute when a delayed operation gets expired and hence forced to complete.
   */
  def onExpiration(): Unit

  /**
   * Process for completing an operation; This function needs to be defined
   * in subclasses and will be called exactly once in forceComplete()
   */
  def onComplete(): Unit

  /**
   * Try to complete the delayed operation by first checking if the operation
   * can be completed by now. If yes execute the completion logic by calling
   * forceComplete() and return true iff forceComplete returns true; otherwise return false
   *
   * This function needs to be defined in subclasses
   */
  def tryComplete(): Boolean

  /**
   * Thread-safe variant of tryComplete() and call extra function if first tryComplete returns false
   * @param f else function to be executed after first tryComplete returns false
   * @return result of tryComplete
   */
  private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
    if (tryComplete()) true
    else {
      f
      // last completion check
      tryComplete()
    }
  }

  /**
   * Thread-safe variant of tryComplete()
   */
  private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())

  /*
   * run() method defines a task that is executed on timeout
   */
  override def run(): Unit = {
    if (forceComplete())
      onExpiration()
  }
}

All business classes need to inherit DelayedOperation and rewrite the relevant methods, the logic will not be repeated.

Summary: The above is just an analysis of the Purgatory design ideas and the general process, there are many multi-threaded concurrency related performance operations, Kafka are handled very beautiful, this article can not be enumerated, the reader is interested in reference to the article over the source code, I believe that there is a great benefit!