Preface: this article implements a simple rate limiter based on the token bucket algorithm.
1 Token Bucket Algorithm
Implementation Principle
- Token Generation: At fixed intervals, the algorithm puts a certain number of tokens into a bucket. The rate at which the tokens are generated is fixed and is usually expressed as the number of tokens generated per second.
- Bucket capacity: The bucket has a maximum capacity, and tokens generated while the bucket is full are discarded. This bounds the size of a burst the system will accept, even in high traffic situations.
- Request processing: Whenever a request arrives, it needs to get a token from the bucket. If there is a token in the bucket, the request can be processed and the number of tokens in the bucket is reduced by one. If there is no token, the request will be rejected or delayed, depending on the implementation.
- Flow control: The smoothness of the traffic and its maximum rate can be controlled by adjusting the token generation rate and the bucket capacity.
Advantages of the algorithm:
- It can absorb a certain amount of burst traffic, up to the bucket capacity.
- Rate limiting is relatively smooth, because tokens are replenished continuously rather than all at once at window boundaries.
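The principle and the burst-absorbing behaviour can be seen in a minimal single-threaded sketch. It is only an illustration, not the thread-safe limiter built below: the class name TokenBucketSketch is hypothetical, time is passed in explicitly as nanoseconds so the behaviour is deterministic, and tokens are kept as a double so fractional refills are not lost.

```java
// Hypothetical single-threaded sketch of the token bucket principle (not thread-safe).
// Time is passed in explicitly so the behaviour is deterministic.
class TokenBucketSketch {
    private final int capacity;      // maximum number of tokens the bucket can hold
    private final double ratePerSec; // tokens generated per second
    private double tokens;           // tokens currently in the bucket
    private long lastNanos;          // time of the last refill

    TokenBucketSketch(int capacity, double ratePerSec, long startNanos) {
        this.capacity = capacity;
        this.ratePerSec = ratePerSec;
        this.tokens = capacity; // the bucket starts full
        this.lastNanos = startNanos;
    }

    // Token generation + bucket capacity: add tokens for the elapsed time, discard overflow
    private void refill(long nowNanos) {
        double generated = (nowNanos - lastNanos) / 1e9 * ratePerSec;
        tokens = Math.min(capacity, tokens + generated);
        lastNanos = nowNanos;
    }

    // Request processing: take one token if available, otherwise reject
    boolean tryTake(long nowNanos) {
        refill(nowNanos);
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(3, 2.0, 0L);
        // A burst of 4 requests at t = 0: the 3 stored tokens absorb 3 of them
        for (int i = 0; i < 4; i++) {
            System.out.println("t=0s request " + i + ": " + bucket.tryTake(0L));
        }
        // After 0.5 s at 2 tokens/s, exactly one new token is available
        System.out.println("t=0.5s: " + bucket.tryTake(500_000_000L));
        System.out.println("t=0.5s: " + bucket.tryTake(500_000_000L));
    }
}
```

With a capacity of 3 and a rate of 2 tokens per second, the first three requests of the burst succeed, the fourth is rejected, and half a second later exactly one more request gets through.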
Coding
Following the principle of the token bucket algorithm, we first define three variables: the bucket capacity, the token generation rate, and the number of tokens currently in the bucket. We also define a RateLimiter class and its constructor:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

public class RateLimiter {
    // The author uses a home-made log printing tool (posted in the thread pool article);
    // java.util.logging.Logger is used here as a stand-in
    private static final Logger log = Logger.getLogger(RateLimiter.class.getName());
    // Bucket capacity
    private final int maxPermit;
    // Token generation rate, in tokens per second
    private final int rate;
    // Number of tokens currently in the bucket (accessed by multiple threads, so an atomic class is used)
    private final AtomicInteger holdPermits;

    public RateLimiter(int maxPermit, int rate, int initPermits) {
        if (rate < 1) throw new IllegalArgumentException("the rate must be at least 1");
        if (initPermits < 0) throw new IllegalArgumentException("the initPermits must not be negative");
        if (maxPermit < 1) throw new IllegalArgumentException("the maxPermit must be at least 1");
        if (initPermits > maxPermit) throw new IllegalArgumentException("the initPermits must not exceed maxPermit");
        this.maxPermit = maxPermit;
        this.rate = rate;
        this.holdPermits = new AtomicInteger(initPermits);
    }
}
Next, we add a boolean tryAcquire(int permit) method, which tries to take permit tokens. It returns true if it succeeds and false otherwise.
It can be written like this:
/**
 * Tries to acquire permit tokens.
 *
 * @param permit number of tokens
 * @return true if permit tokens were acquired, otherwise false
 */
public boolean tryAcquire(int permit) {
    if (permit > maxPermit) throw new IllegalArgumentException("the permit must not exceed maxPermit: " + maxPermit);
    if (permit < 1) throw new IllegalArgumentException("the permit must be at least 1");
    int currentPermits;
    do {
        currentPermits = holdPermits.get();
        if (currentPermits < permit) {
            // Not enough tokens: fail immediately instead of spinning
            return false;
        }
        // Retry if another thread changed the count between get() and compareAndSet()
    } while (!holdPermits.compareAndSet(currentPermits, currentPermits - permit));
    // Log printing
    log.info("Tokens before: " + currentPermits + ", deducted: " + permit + ", remaining: " + (currentPermits - permit));
    return true;
}
This method relies on the atomic class's compareAndSet operation and spinning to deduct tokens. When the bucket holds at least as many tokens as requested, compareAndSet performs the deduction.
However, compareAndSet can fail if another thread deducted tokens concurrently in the meantime. A spin loop (retrying) therefore ensures that the tokens are obtained whenever enough of them are available; the method returns false only when the bucket holds fewer tokens than requested.
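The claim that the spin loop never over-deducts can be checked with a small sketch that uses only java.util.concurrent. The class CasDeductionDemo and its helpers are hypothetical; tryTake repeats the read, check, compareAndSet and retry loop used in tryAcquire, and eight threads compete for 100 tokens.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: many threads deducting from a shared AtomicInteger with a CAS spin loop.
class CasDeductionDemo {
    // Same pattern as tryAcquire: read, check, compareAndSet, retry on contention
    static boolean tryTake(AtomicInteger pool, int permit) {
        int current;
        do {
            current = pool.get();
            if (current < permit) {
                return false; // not enough tokens: fail without spinning
            }
        } while (!pool.compareAndSet(current, current - permit));
        return true;
    }

    // Starts `threads` threads, each attempting `attempts` single-token takes; returns total successes
    static int run(int initialTokens, int threads, int attempts) throws InterruptedException {
        AtomicInteger pool = new AtomicInteger(initialTokens);
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < attempts; i++) {
                    if (tryTake(pool, 1)) successes.incrementAndGet();
                }
                done.countDown();
            }).start();
        }
        done.await(); // wait for all threads to finish
        return successes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 8 threads x 1000 attempts compete for 100 tokens
        System.out.println("successful takes: " + run(100, 8, 1000)); // successful takes: 100
    }
}
```

Because the pool never grows, any failed attempt means it has already reached 0, so the run always reports exactly 100 successful takes, however the threads interleave.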
At this point, the token bucket's constructor and the method for acquiring tokens are implemented. But when, and how, are tokens put into the bucket?
A scheduled task could refill the bucket, but it would require creating an extra thread to do so.
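For comparison, the scheduled-task approach might look like the sketch below (hypothetical class ScheduledRefillBucket, built on java.util.concurrent's ScheduledExecutorService). It adds rate / 10 tokens every 100 ms, which matches the configured rate only when rate is a multiple of 10, and it owns a background thread that has to be shut down explicitly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a bucket refilled by a scheduled task (for comparison only).
class ScheduledRefillBucket implements AutoCloseable {
    private final AtomicInteger holdPermits = new AtomicInteger();
    // The extra thread: this is the cost the lazy-refill approach avoids
    private final ScheduledExecutorService refiller = Executors.newSingleThreadScheduledExecutor();

    ScheduledRefillBucket(int maxPermit, int rate) {
        // Assumption: refill every 100 ms; exact only when rate is a multiple of 10
        int perTick = Math.max(1, rate / 10);
        refiller.scheduleAtFixedRate(
                () -> holdPermits.updateAndGet(cur -> Math.min(maxPermit, cur + perTick)),
                100, 100, TimeUnit.MILLISECONDS);
    }

    int available() {
        return holdPermits.get();
    }

    @Override
    public void close() {
        refiller.shutdownNow(); // stop the background thread
    }

    public static void main(String[] args) throws InterruptedException {
        try (ScheduledRefillBucket bucket = new ScheduledRefillBucket(100, 50)) {
            Thread.sleep(350);
            System.out.println("tokens after ~350 ms: " + bucket.available());
        }
    }
}
```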
Instead, borrowing an idea from earlier implementations, the token count is recalculated and updated lazily each time tokens are acquired. This requires one more variable to remember when the tokens were last calculated.
So we add a field lastFreshTime to the class to record when the token count was last calculated (initialized to System.nanoTime() in the constructor). Since several threads may change it, an AtomicLong is used to keep it thread-safe.
// System.nanoTime() value at which the token count was last calculated
private final AtomicLong lastFreshTime;
Next, we need a method that updates the number of tokens in the bucket before each acquisition. It first computes how many tokens should have been generated from the number of nanoseconds elapsed, rounding down with an int cast. Because of that rounding, it then back-calculates the exact time needed to generate those whole tokens and adds it to the current lastFreshTime to obtain the new lastFreshTime value.
For thread safety, lastFreshTime is updated with compareAndSet, which guarantees that only one thread wins the right to add the tokens for a given interval. After successfully updating lastFreshTime, that thread still has to update the token count, and because other threads may be deducting tokens in tryAcquire at the same time, that update must also be atomic.
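/**
 * Refreshes the number of tokens in the bucket
 */
private void freshPermit() {
    long now = System.nanoTime();
    long lastTime = lastFreshTime.get();
    long elapsed = now - lastTime;
    if (elapsed <= 0) return;
    int increment;
    long thisTime;
    if (elapsed >= (long) maxPermit * 1_000_000_000L / rate) {
        // Long idle period: the bucket would be full anyway, so cap the increment
        // (this also keeps elapsed * rate from overflowing)
        increment = maxPermit;
        thisTime = now;
    } else {
        // Tokens generated since lastTime, rounded down by the int cast
        increment = (int) (elapsed * rate / 1_000_000_000L);
        // Back-calculate the exact time those whole tokens took to generate
        thisTime = lastTime + increment * 1_000_000_000L / rate;
    }
    // Only the thread that wins this CAS adds the tokens for this interval
    if (increment > 0 && lastFreshTime.compareAndSet(lastTime, thisTime)) {
        int current;
        int next;
        do {
            current = holdPermits.get();
            // Tokens beyond the bucket capacity are discarded
            next = (int) Math.min(maxPermit, (long) current + increment);
        } while (!holdPermits.compareAndSet(current, next));
        log.info("Tokens before: " + current + ", added: " + increment + ", now: " + next);
    }
}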
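To see why the back-calculation matters, here is the same arithmetic worked through with assumed values, rate = 3 tokens per second and 0.5 s elapsed. The class RefillArithmetic and its two helpers are hypothetical extractions of the two formulas in freshPermit:

```java
// Hypothetical worked example of freshPermit's arithmetic (rate = 3/s, 0.5 s elapsed)
class RefillArithmetic {
    // Whole tokens generated in elapsedNanos, rounded down as in freshPermit
    static int increment(long elapsedNanos, int rate) {
        return (int) (elapsedNanos * rate / 1_000_000_000L);
    }

    // Exact time needed to generate `increment` tokens, back-calculated as in freshPermit
    static long nanosFor(int increment, int rate) {
        return increment * 1_000_000_000L / rate;
    }

    public static void main(String[] args) {
        int rate = 3;
        long lastTime = 0L;
        long now = 500_000_000L; // 0.5 s later, in nanoseconds
        int inc = increment(now - lastTime, rate);
        long thisTime = lastTime + nanosFor(inc, rate);
        System.out.println("increment = " + inc);                         // increment = 1
        System.out.println("new lastFreshTime = " + thisTime);            // new lastFreshTime = 333333333
        System.out.println("carried over = " + (now - thisTime) + " ns"); // carried over = 166666667 ns
    }
}
```

Half a second at 3 tokens per second is 1.5 tokens; only 1 whole token is added, and lastFreshTime moves forward by just 333,333,333 ns, so the remaining 166,666,667 ns (half a token's worth) still counts toward the next refresh instead of being lost.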
At this point we have a simple token bucket implementation. To use it, call freshPermit at the start of every tryAcquire call to bring the token count up to date.
In practice, a token bucket usually also offers a variant with a timeout, boolean tryAcquire(int permit, long timeOut). Here we give a simple implementation that sleeps periodically instead of using a locking mechanism.
Suppose p tokens are requested with a timeout of t seconds. In tryAcquire(p, t), if fewer than p tokens are available, the thread sleeps for a while and then tries again. It returns false only if it still has not acquired the tokens once t has elapsed.
The remaining question is how long to sleep between attempts (sleepDuration). The code sets it to one tenth of the time needed to generate p tokens, (p / rate) / 10 seconds, which is 100 * p / rate milliseconds, clamped to a minimum of 10 ms and a maximum of 100 ms.
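The sleep-interval rule can be checked on a few values. The helper sleepDurationMillis below is a hypothetical extraction of that calculation, assuming rate is in tokens per second and that the result is passed to Thread.sleep, which takes milliseconds:

```java
// Hypothetical helper: one tenth of the time needed to generate `permit` tokens,
// i.e. (permit / rate) s / 10 = 100 * permit / rate ms, clamped to [10, 100] ms
class SleepDurationDemo {
    static long sleepDurationMillis(int permit, int rate) {
        long ms = (long) (100.0 * permit / rate);
        return Math.max(10, Math.min(ms, 100));
    }

    public static void main(String[] args) {
        System.out.println(sleepDurationMillis(1, 50));   // 2 ms, clamped up to 10
        System.out.println(sleepDurationMillis(20, 50));  // 40
        System.out.println(sleepDurationMillis(100, 50)); // 200 ms, clamped down to 100
    }
}
```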
Implementation Code:
/**
 * Tries to acquire permit tokens within timeOut seconds.
 *
 * @param permit  number of tokens
 * @param timeOut timeout, in seconds
 * @return true if permit tokens were acquired within timeOut seconds, otherwise false
 */
public boolean tryAcquire(int permit, long timeOut) {
    if (permit < 1) throw new IllegalArgumentException("the permit must be at least 1");
    if (timeOut < 0) throw new IllegalArgumentException("the timeOut must not be negative");
    long deadline = System.nanoTime() + timeOut * 1_000_000_000L;
    // One tenth of the time needed to generate permit tokens, in milliseconds, clamped to [10, 100]
    long sleepDuration = (long) (100.0 * permit / rate);
    sleepDuration = Math.min(sleepDuration, 100);
    sleepDuration = Math.max(sleepDuration, 10);
    // Compare nanoTime values by subtraction, as the System.nanoTime documentation recommends
    while (deadline - System.nanoTime() > 0) {
        if (tryAcquire(permit)) return true;
        try {
            Thread.sleep(sleepDuration);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up
            Thread.currentThread().interrupt();
            return false;
        }
    }
    return false;
}
At this point, we have implemented a simple rate limiter class using the token bucket algorithm.
Full Code:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

public class RateLimiter {
    // Stand-in for the author's home-made log printing tool
    private static final Logger log = Logger.getLogger(RateLimiter.class.getName());
    /**
     * Maximum number of tokens
     */
    private final int maxPermit;
    /**
     * Token generation rate, in tokens per second
     */
    private final int rate;
    /**
     * Current number of available tokens
     */
    private final AtomicInteger holdPermits;
    /**
     * Time (System.nanoTime) of the last token calculation
     */
    private final AtomicLong lastFreshTime;

    public RateLimiter(int maxPermit, int rate, int initPermits) {
        if (rate < 1) throw new IllegalArgumentException("the rate must be at least 1");
        if (initPermits < 0) throw new IllegalArgumentException("the initPermits must not be negative");
        if (maxPermit < 1) throw new IllegalArgumentException("the maxPermit must be at least 1");
        if (maxPermit < rate) throw new IllegalArgumentException("the maxPermit must not be less than rate");
        if (initPermits > maxPermit) throw new IllegalArgumentException("the initPermits must not exceed maxPermit");
        this.maxPermit = maxPermit;
        this.rate = rate;
        this.holdPermits = new AtomicInteger(initPermits);
        this.lastFreshTime = new AtomicLong(System.nanoTime());
    }

    /**
     * Tries to acquire permit tokens.
     *
     * @param permit number of tokens
     * @return true if permit tokens were acquired, otherwise false
     */
    public boolean tryAcquire(int permit) {
        if (permit > maxPermit)
            throw new IllegalArgumentException("the permit must not exceed maxPermit: " + maxPermit);
        if (permit < 1) throw new IllegalArgumentException("the permit must be at least 1");
        freshPermit();
        int currentPermits;
        do {
            currentPermits = holdPermits.get();
            if (currentPermits < permit) {
                return false;
            }
        } while (!holdPermits.compareAndSet(currentPermits, currentPermits - permit));
        log.info("Tokens before: " + currentPermits + ", deducted: " + permit + ", remaining: " + (currentPermits - permit));
        return true;
    }

    /**
     * Refreshes the number of tokens in the bucket
     */
    private void freshPermit() {
        long now = System.nanoTime();
        long lastTime = lastFreshTime.get();
        long elapsed = now - lastTime;
        if (elapsed <= 0) return;
        int increment;
        long thisTime;
        if (elapsed >= (long) maxPermit * 1_000_000_000L / rate) {
            // Long idle period: cap the increment (also avoids overflow in elapsed * rate)
            increment = maxPermit;
            thisTime = now;
        } else {
            // Whole tokens generated since lastTime, and the exact time they took
            increment = (int) (elapsed * rate / 1_000_000_000L);
            thisTime = lastTime + increment * 1_000_000_000L / rate;
        }
        if (increment > 0 && lastFreshTime.compareAndSet(lastTime, thisTime)) {
            int current;
            int next;
            do {
                current = holdPermits.get();
                next = (int) Math.min(maxPermit, (long) current + increment);
            } while (!holdPermits.compareAndSet(current, next));
            log.info("Tokens before: " + current + ", added: " + increment + ", now: " + next);
        }
    }

    /**
     * Tries to acquire permit tokens within timeOut seconds.
     *
     * @param permit  number of tokens
     * @param timeOut timeout, in seconds
     * @return true if permit tokens were acquired within timeOut seconds, otherwise false
     */
    public boolean tryAcquire(int permit, long timeOut) {
        if (permit < 1) throw new IllegalArgumentException("the permit must be at least 1");
        if (timeOut < 0) throw new IllegalArgumentException("the timeOut must not be negative");
        long deadline = System.nanoTime() + timeOut * 1_000_000_000L;
        // One tenth of the time needed to generate permit tokens, in milliseconds, clamped to [10, 100]
        long sleepDuration = (long) (100.0 * permit / rate);
        sleepDuration = Math.min(sleepDuration, 100);
        sleepDuration = Math.max(sleepDuration, 10);
        while (deadline - System.nanoTime() > 0) {
            if (tryAcquire(permit)) return true;
            try {
                Thread.sleep(sleepDuration);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RateLimiter rateLimiter = new RateLimiter(100, 50, 0);
        log.info("Start");
        for (int i = 0; i < 3; i++) {
            int j = i;
            new Thread(() -> {
                int k = 0;
                while (true) {
                    if (rateLimiter.tryAcquire(20, 1)) {
                        log.info("Attempt " + k++ + ", thread " + j + ": tokens acquired");
                    } else {
                        log.info("Attempt " + k++ + ", thread " + j + ": failed to acquire tokens");
                    }
                    try {
                        Thread.sleep(888);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop this worker
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }).start();
        }
    }
}
Summary
Although this is a simple example, it shows quite a lot about implementing functionality in a thread-safe way, in particular how atomic classes, compareAndSet and spin loops can take the place of locks.