
Implementing a rate limiter based on the token bucket algorithm

Popularity:746 ℃/2024-10-26 21:35:46

Preface: this article implements a simple rate limiter based on the token bucket algorithm.

1 Token Bucket Algorithm

Implementation Principle

  • Token generation: At fixed intervals, the algorithm puts a certain number of tokens into a bucket. The generation rate is fixed and is usually expressed as the number of tokens generated per second.
  • Bucket capacity: The bucket has a maximum capacity; if the bucket is full, new tokens are discarded. Tokens therefore cannot accumulate without bound, so the number of requests the system admits stays bounded even under heavy traffic.
  • Request processing: Whenever a request arrives, it must take a token from the bucket. If a token is available, the request is processed and the number of tokens in the bucket is reduced by one. If no token is available, the request is rejected or delayed, depending on the implementation.
  • Flow control: The smoothness of the traffic and the maximum throughput can be controlled by adjusting the token generation rate and the bucket capacity.
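
The four points above can be sketched as a tiny single-threaded model. This is only an illustration under stated assumptions, not the implementation developed later in the article: the class name TokenBucketModel and its methods are hypothetical, and the current time is passed in explicitly (in milliseconds) so that the behaviour is deterministic.

```java
/** Single-threaded token bucket model; time is injected so the behaviour is deterministic. */
class TokenBucketModel {
    private final int capacity;       // bucket capacity
    private final int ratePerSecond;  // tokens generated per second
    private long tokens;              // tokens currently in the bucket
    private long lastRefillMillis;    // time up to which tokens have been accounted for

    TokenBucketModel(int capacity, int ratePerSecond, long startMillis) {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.tokens = 0;
        this.lastRefillMillis = startMillis;
    }

    /** Adds the tokens generated since the last refill and discards any overflow. */
    private void refill(long nowMillis) {
        long generated = (nowMillis - lastRefillMillis) * ratePerSecond / 1000;
        if (generated > 0) {
            tokens = Math.min(capacity, tokens + generated);
            // Advance only by the time those whole tokens took to generate
            lastRefillMillis += generated * 1000 / ratePerSecond;
        }
    }

    /** Takes one token if available; false means the request is rejected. */
    boolean tryAcquire(long nowMillis) {
        refill(nowMillis);
        if (tokens < 1) return false;
        tokens--;
        return true;
    }

    /** Tokens in the bucket as of the last refill. */
    long available() {
        return tokens;
    }

    public static void main(String[] args) {
        TokenBucketModel bucket = new TokenBucketModel(10, 5, 0);
        System.out.println(bucket.tryAcquire(0));       // false: the bucket starts empty
        System.out.println(bucket.tryAcquire(1000));    // true: 5 tokens were generated in 1 s
        System.out.println(bucket.tryAcquire(60_000));  // true: long idle period, bucket capped at 10
        System.out.println(bucket.available());         // 9
    }
}
```

Raising the capacity in this model allows larger bursts, while raising the rate increases the sustained throughput.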

Algorithmic Advantages:

  • It can absorb a certain amount of burst traffic.
  • The rate-limiting window moves relatively smoothly, rather than resetting abruptly at fixed boundaries.
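
To make the burst claim concrete: a bucket of capacity C refilled at R tokens per second can hand out at most C + R × T tokens in any interval of T seconds (a full bucket plus everything generated in the meantime). The sketch below only evaluates that bound; BurstBound and maxAdmitted are illustrative names, not part of the article's code.

```java
class BurstBound {
    /** Upper bound on the tokens that can be handed out in any interval of the given length. */
    static long maxAdmitted(int capacity, int ratePerSecond, long seconds) {
        return capacity + (long) ratePerSecond * seconds;
    }

    public static void main(String[] args) {
        // Capacity 100, rate 50 per second
        System.out.println(maxAdmitted(100, 50, 1));   // 150: the burst of 100 dominates
        System.out.println(maxAdmitted(100, 50, 60));  // 3100: the sustained rate dominates
    }
}
```

Over short intervals the capacity determines how much burst traffic gets through; over long intervals the generation rate does.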

Coding

According to the principle of the token bucket algorithm, we can first define three variables: the bucket capacity, the token generation rate, and the number of tokens currently in the bucket. We also define a RateLimiter class and the corresponding constructor:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

public class RateLimiter {
    // The original article uses the author's own logging utility (posted in the thread pool article).
    // java.util.logging.Logger is substituted here as an assumption so that the snippet compiles.
    static final Logger log = Logger.getLogger(RateLimiter.class.getName());
    // Bucket capacity
    private final int maxPermit;
    // Token generation rate, in tokens per second
    private final int rate;
    // Number of tokens currently in the bucket (an atomic class is used because several threads access it)
    private final AtomicInteger holdPermits;

    public RateLimiter(int maxPermit, int rate, int initPermits) {
        if (rate < 1) throw new IllegalArgumentException("the rate must be at least 1");
        if (initPermits < 0) throw new IllegalArgumentException("the initPermits must not be negative");
        if (maxPermit < 1) throw new IllegalArgumentException("the maxPermit must be at least 1");
        this.maxPermit = maxPermit;
        this.rate = rate;
        this.holdPermits = new AtomicInteger(initPermits);
    }
}

Next we add a boolean tryAcquire(int permit) method, which tries to take permit tokens from the bucket. It returns true if the tokens were obtained and false otherwise.
The method can be written like this:

/**
 * Tries to take permit tokens from the bucket.
 *
 * @param permit number of tokens requested
 * @return true if permit tokens were obtained, otherwise false
 */
public boolean tryAcquire(int permit) {
    if (permit > maxPermit) throw new IllegalArgumentException("the permit must not be greater than maxPermit:" + maxPermit);
    if (permit < 1) throw new IllegalArgumentException("the permit must be at least 1");
    int currentPermits;
    do {
        currentPermits = holdPermits.get();
        if (currentPermits < permit) {
            return false;
        }
        // Retry if another thread changed the token count in the meantime
    } while (!holdPermits.compareAndSet(currentPermits, currentPermits - permit));
    // Log the deduction
    log.info("Original token count: " + currentPermits + ", deducted: " + permit + ", current total: " + (currentPermits - permit));
    return true;
}

This method uses the atomic class's compareAndSet operation together with a spin loop to deduct tokens. When the number of tokens in the bucket is greater than or equal to the number requested, compareAndSet is used to perform the deduction.
The compareAndSet call may fail, however, because another thread running concurrently has already deducted tokens. The spin loop therefore retries, so that the tokens are obtained whenever the bucket holds enough of them. The method returns false only when the bucket holds fewer tokens than the request asks for.
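
The effect of the retry loop can be checked with a small experiment: several threads compete for a fixed supply of tokens, and the number of successful deductions should equal the supply exactly, with nothing lost and nothing handed out twice. This is a sketch under stated assumptions; CasDeductDemo, deduct and run are hypothetical names, and the bucket is never refilled here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class CasDeductDemo {
    /** Deducts permit tokens with a compareAndSet retry loop; false if too few tokens remain. */
    static boolean deduct(AtomicInteger bucket, int permit) {
        int current;
        do {
            current = bucket.get();
            if (current < permit) return false;
        } while (!bucket.compareAndSet(current, current - permit));
        return true;
    }

    /** Runs several threads against one bucket and returns the number of successful deductions. */
    static int run(int initialTokens, int threads, int attemptsPerThread) {
        AtomicInteger bucket = new AtomicInteger(initialTokens);
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < attemptsPerThread; i++) {
                    if (deduct(bucket, 1)) successes.incrementAndGet();
                }
                done.countDown();
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return successes.get();
    }

    public static void main(String[] args) {
        // 8 threads make 8000 attempts in total against 1000 tokens
        System.out.println(run(1000, 8, 1000)); // 1000
    }
}
```

A plain read followed by a separate write would not give this guarantee, because two threads could read the same count and both deduct from it.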

At this point the token bucket's constructor and its token acquisition method are implemented. But when and how are tokens put into the bucket?
If a scheduled task is used, an extra thread has to be created to run it.
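
For comparison, the scheduled-task approach might look roughly like the sketch below, which uses a ScheduledExecutorService from the JDK. It is not the approach taken in this article; the class ScheduledRefillSketch and its methods are hypothetical, and the refill granularity (a fixed number of tokens per tick) is an assumption.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledRefillSketch {
    private final int maxPermit;
    private final AtomicInteger holdPermits = new AtomicInteger();
    // The extra thread that this approach requires
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    ScheduledRefillSketch(int maxPermit) {
        this.maxPermit = maxPermit;
    }

    /** Adds amount tokens atomically and discards whatever exceeds the capacity. */
    void addTokens(int amount) {
        holdPermits.updateAndGet(current -> Math.min(maxPermit, current + amount));
    }

    /** Starts the background task that adds tokensPerTick tokens every periodMillis milliseconds. */
    void start(int tokensPerTick, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> addTokens(tokensPerTick),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the background task; without this the scheduler thread keeps the JVM alive. */
    void stop() {
        scheduler.shutdownNow();
    }

    int available() {
        return holdPermits.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledRefillSketch bucket = new ScheduledRefillSketch(100);
        bucket.start(5, 100); // 5 tokens every 100 ms, roughly 50 per second
        Thread.sleep(1000);
        bucket.stop();
        System.out.println(bucket.available()); // timing dependent
    }
}
```

Besides the extra thread, this variant has a lifecycle to manage (start and stop), which the lazy refresh described next avoids.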

Borrowing from the wisdom of our predecessors, we instead compute and update the token count along the way each time tokens are acquired. For that we need one more variable that remembers when the token count was last computed.
So we add a lastFreshTime field to the class to record the time of the last token refresh. Since this field may be modified by multiple threads, an atomic class is used to keep it thread-safe.

// Time at which the token count was last refreshed (a System.nanoTime() value)
private final AtomicLong lastFreshTime;

We also need a method that updates the number of tokens in the bucket before each acquisition. It first computes how many tokens should have been generated from the number of nanoseconds that have elapsed; the result is an int, so it is rounded down.
Because of that rounding, the method then back-calculates the exact time needed to generate that many tokens and adds it to the current lastFreshTime to obtain the new lastFreshTime.
For thread safety, lastFreshTime is updated with compareAndSet, which guarantees that only one thread wins the right to perform the update. After successfully updating lastFreshTime, that thread must go on to update the token count.
Because other threads may be deducting tokens in tryAcquire at the same time, this update must be atomic as well.

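/**
 * Refreshes the number of tokens in the bucket.
 */
private void freshPermit() {
    long now = System.nanoTime();
    long lastTime = lastFreshTime.get();
    // nanoTime values are compared by subtraction, since the counter may wrap around
    if (now - lastTime <= 0) return;
    // Whole tokens generated since lastTime (rounded down)
    int increment = (int) ((now - lastTime) * rate / 1_000_000_000L);
    // Advance by exactly the time those tokens took, so the leftover fraction is not lost
    long thisTime = lastTime + increment * 1_000_000_000L / rate;
    // Only the thread that wins this compareAndSet adds the new tokens
    if (increment > 0 && lastFreshTime.compareAndSet(lastTime, thisTime)) {
        int current;
        int next;
        do {
            current = holdPermits.get();
            next = Math.min(maxPermit, current + increment);
        } while (!holdPermits.compareAndSet(current, next));
        log.info("Original token count: " + current + ", added: " + increment + ", current total: " + next);
    }
}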
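
A worked example may help show why lastFreshTime is advanced by the back-calculated time rather than simply set to the current time. The sketch below repeats the two formulas from freshPermit with concrete numbers; RefillMath, tokensFor and nanosFor are illustrative names only.

```java
class RefillMath {
    static final long NANOS_PER_SECOND = 1_000_000_000L;

    /** Whole tokens generated in elapsedNanos at the given rate (integer division rounds down). */
    static int tokensFor(long elapsedNanos, int rate) {
        return (int) (elapsedNanos * rate / NANOS_PER_SECOND);
    }

    /** Nanoseconds needed to generate exactly the given number of tokens at the given rate. */
    static long nanosFor(int tokens, int rate) {
        return tokens * NANOS_PER_SECOND / rate;
    }

    public static void main(String[] args) {
        int rate = 50;                // 50 tokens per second, one token every 20 ms
        long elapsed = 130_000_000L;  // 130 ms have passed since lastFreshTime
        int increment = tokensFor(elapsed, rate);
        long consumed = nanosFor(increment, rate);
        System.out.println(increment);           // 6
        System.out.println(consumed);            // 120000000
        System.out.println(elapsed - consumed);  // 10000000
    }
}
```

With these numbers, 6 tokens are added and lastFreshTime moves forward by 120 ms. The remaining 10 ms still count toward the next token, whereas setting lastFreshTime to the current time would silently drop them on every refresh.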

At this point we have a simple token bucket implementation: it works as long as tryAcquire calls freshPermit to update the token count each time before deducting tokens.

Typically, however, a token bucket also offers an acquisition method with a timeout, boolean tryAcquire(int permit, long timeOut). Here we give a simple implementation that sleeps periodically instead of using a locking mechanism.

Suppose the number of tokens to acquire is p and the timeout is t. In tryAcquire(p, t), if the bucket currently holds fewer than p tokens, the thread sleeps for a while and then tries again. It returns failure only if the tokens still have not been acquired once more than t has elapsed.

One open question is how to choose the sleep interval sleepDuration. In this code sleepDuration is (p / rate) / 10, that is, one tenth of the time needed to generate p tokens, with a minimum of 10 and a maximum of 100 milliseconds.
Implementation code:

/**
 * Tries to take permit tokens within the timeout.
 *
 * @param permit  number of tokens requested
 * @param timeOut timeout, in seconds
 * @return true if permit tokens were obtained within the timeout, otherwise false
 */
public boolean tryAcquire(int permit, long timeOut) {
    if (permit < 1) throw new IllegalArgumentException("the permit must be at least 1");
    if (timeOut < 0) throw new IllegalArgumentException("the timeOut must not be negative");
    // Deadline expressed as a System.nanoTime() value
    long deadline = timeOut * 1_000_000_000L + System.nanoTime();
    // One tenth of the time needed to generate permit tokens, in milliseconds, clamped to 10..100
    long sleepDuration = 100L * permit / rate;
    sleepDuration = Math.min(sleepDuration, 100);
    sleepDuration = Math.max(sleepDuration, 10);
    while (System.nanoTime() - deadline < 0) {
        if (tryAcquire(permit)) return true;
        try {
            Thread.sleep(sleepDuration);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up
            Thread.currentThread().interrupt();
            return false;
        }
    }
    return false;
}
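
The sleep interval rule can be isolated into a small helper to see what values it produces. The sketch assumes one particular reading of the rule: p / rate is taken in seconds, a tenth of it is converted to milliseconds, and the result is clamped to the range 10 to 100 ms. SleepDurationSketch and sleepMillis are hypothetical names.

```java
class SleepDurationSketch {
    /** One tenth of the time needed to generate permit tokens, in milliseconds, clamped to 10..100. */
    static long sleepMillis(int permit, int rate) {
        // permit / rate seconds equals 1000 * permit / rate ms; a tenth of that is 100 * permit / rate ms
        long tenth = 100L * permit / rate;
        return Math.max(10, Math.min(100, tenth));
    }

    public static void main(String[] args) {
        System.out.println(sleepMillis(20, 50));   // 40: 20 tokens take 400 ms to generate
        System.out.println(sleepMillis(1, 50));    // 10: 2 ms is raised to the minimum
        System.out.println(sleepMillis(100, 50));  // 100: 200 ms is lowered to the maximum
    }
}
```

The lower bound keeps the loop from spinning almost continuously for small requests, and the upper bound keeps a waiting thread from overshooting the timeout by too much.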

At this point we have implemented a simple rate limiter class using the token bucket algorithm.
Full code:

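import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

public class RateLimiter {

    // The original article uses the author's own logging utility; java.util.logging.Logger
    // is substituted here as an assumption so that the class compiles on its own.
    static final Logger log = Logger.getLogger(RateLimiter.class.getName());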
    /**
     * Maximum number of tokens
     */
    private final int maxPermit;
    /**
     * Token generation rate, in tokens per second
     */
    private final int rate;

    /**
     * Current number of available tokens
     */
    private final AtomicInteger holdPermits;
    /**
     * Time of last token calculation
     */
    private final AtomicLong lastFreshTime;


    public RateLimiter(int maxPermit, int rate, int initPermits) {
        if (rate < 1) throw new IllegalArgumentException("the rate must be at least 1");
        if (initPermits < 0) throw new IllegalArgumentException("the initPermits must not be negative");
        if (maxPermit < 1) throw new IllegalArgumentException("the maxPermit must be at least 1");
        if (maxPermit < rate) throw new IllegalArgumentException("the maxPermit must not be less than rate");
        this.maxPermit = maxPermit;
        this.rate = rate;
        this.holdPermits = new AtomicInteger(initPermits);
        this.lastFreshTime = new AtomicLong(System.nanoTime());
    }

    /**
     * Tries to take permit tokens from the bucket.
     *
     * @param permit number of tokens requested
     * @return true if permit tokens were obtained, otherwise false
     */
    public boolean tryAcquire(int permit) {
        if (permit > maxPermit)
            throw new IllegalArgumentException("the permit must not be greater than maxPermit:" + maxPermit);
        if (permit < 1) throw new IllegalArgumentException("the permit must be at least 1");
        // Add the tokens generated since the last refresh before deducting
        freshPermit();

        int currentPermits;
        do {
            currentPermits = holdPermits.get();
            if (currentPermits < permit) {
                return false;
            }
        } while (!holdPermits.compareAndSet(currentPermits, currentPermits - permit));

        log.info("Original token count: " + currentPermits + ", deducted: " + permit + ", current total: " + (currentPermits - permit));
        return true;
    }

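    /**
     * Refreshes the number of tokens in the bucket.
     */
    private void freshPermit() {
        long now = System.nanoTime();
        long lastTime = lastFreshTime.get();
        // nanoTime values are compared by subtraction, since the counter may wrap around
        if (now - lastTime <= 0) return;
        // Whole tokens generated since lastTime (rounded down)
        int increment = (int) ((now - lastTime) * rate / 1_000_000_000L);
        // Advance by exactly the time those tokens took, so the leftover fraction is not lost
        long thisTime = lastTime + increment * 1_000_000_000L / rate;
        // Only the thread that wins this compareAndSet adds the new tokens
        if (increment > 0 && lastFreshTime.compareAndSet(lastTime, thisTime)) {
            int current;
            int next;
            do {
                current = holdPermits.get();
                next = Math.min(maxPermit, current + increment);
            } while (!holdPermits.compareAndSet(current, next));
            log.info("Original token count: " + current + ", added: " + increment + ", current total: " + next);
        }
    }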

    /**
     * Tries to take permit tokens within the timeout.
     *
     * @param permit  number of tokens requested
     * @param timeOut timeout, in seconds
     * @return true if permit tokens were obtained within the timeout, otherwise false
     */
    public boolean tryAcquire(int permit, long timeOut) {
        if (permit < 1) throw new IllegalArgumentException("the permit must be at least 1");
        if (timeOut < 0) throw new IllegalArgumentException("the timeOut must not be negative");
        // Deadline expressed as a System.nanoTime() value
        long deadline = timeOut * 1_000_000_000L + System.nanoTime();
        // One tenth of the time needed to generate permit tokens, in milliseconds, clamped to 10..100
        long sleepDuration = 100L * permit / rate;
        sleepDuration = Math.min(sleepDuration, 100);
        sleepDuration = Math.max(sleepDuration, 10);
        while (System.nanoTime() - deadline < 0) {
            if (tryAcquire(permit)) return true;
            try {
                Thread.sleep(sleepDuration);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RateLimiter rateLimiter = new RateLimiter(100, 50, 0);
        log.info("start");
        // Three threads each request 20 tokens, waiting up to 1 second, then pause for 888 ms
        for (int i = 0; i < 3; i++) {
            int j = i;
            new Thread(() -> {
                int k = 0;
                while (true) {
                    if (rateLimiter.tryAcquire(20, 1)) {
                        log.info("Attempt " + k++ + ", thread " + j + ": token acquired successfully");
                    } else {
                        log.info("Attempt " + k++ + ", thread " + j + ": failed to get token");
                    }
                    try {
                        Thread.sleep(888);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }).start();
        }
    }
}

Summary

Although this is a small example, it still teaches quite a bit about how to implement functionality in a thread-safe way.