This post focuses on how to use Redis to implement idempotent, anti-jitter, flow-limiting and other features.
idempotent component
import ;
import ;
import ;
import ;
import ;
/**
* Message Queue Idempotent Processor
*/
@Component
@RequiredArgsConstructor
public class MessageQueueIdempotentHandler {
private final StringRedisTemplate stringRedisTemplate;
private static final String IDEMPOTENT_KEY_PREFIX = "xxx:idempotent:";
/**
* Determine if the current message has been consumed
*
* @param messageId message unique identifier
* @return Has the message been consumed
*/
public boolean isMessageBeingConsumed(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
return (().setIfAbsent(key, "0", 2, ));
}
/**
* Determine if the message consumption process is complete
*
* @param messageId message unique identifier
* @return Whether the message is executed or not
*/
public boolean isAccomplish(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
return (().get(key), "1");
}
/**
* Setting up message flow execution completion
*
* @param messageId message unique identifier
*/
public void setAccomplish(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
().set(key, "1", 2, );
}
/**
* If message processing encounters an exception,Remove idempotent markers
*
* @param messageId message unique identifier
*/
public void delMessageProcessed(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
(key);
}
}
@Component
@RocketMQMessageListener(consumerGroup = "saaslink_consumer_group", topic = RedisKeyConstant.SHORT_LINK_STATS_STREAM_TOPIC_KEY)
@Slf4j
public class ShortLinkStatsSaveConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt msgExt) {
String msgId = ();
// Use redis to implement idempotency
if ((())) {
// Determine if the current message flow is complete.
if ((())) {
return; }
}
throw new ServiceException("Message did not complete the process, message queue retry is required"); }
}
try {
byte[] msgExtBody = (); }
// Convert to map
Map<String, String> producerMap = (msgExtBody, );
ShortLinkStatsRecordDTO statsRecord = (("statsRecord")), );
// The actual added logic
} catch (Throwable ex) {
// so-and-so situation is down
(()));
("Record Short Link Monitor Consumption Exception", ex);
throw ex;
}
(());
}
}
Anti-shake component
The idempotent annotation, which prevents users from repeatedly submitting form information, is implemented primarily through distributed locking.
import ;
import ;
import ;
import ;
/**
* Power annotations to prevent users from repeatedly submitting form information
*/
@Target()
@Retention()
public @interface NoDuplicateSubmit {
/**
* The error message returned when the idempotent failure logic is triggered
*/
String message() default "You are operating too fast, please try again later";
}
import ;
import .;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* Preventing users from repeatedly submitting form information Cutting Controller
*/
@Aspect
@RequiredArgsConstructor
public final class NoDuplicateSubmitAspect {
private final RedissonClient redissonClient;
/**
* Enhanced method marking {@link NoDuplicateSubmit} annotation logic
*/
@Around("@annotation()")
public Object noDuplicateSubmit(ProceedingJoinPoint joinPoint) throws Throwable {
NoDuplicateSubmit noDuplicateSubmit = getNoDuplicateSubmitAnnotation(joinPoint);
// Get Distributed Lock Logo
String lockKey = ("no-duplicate-submit:path:%s:currentUserId:%s:md5:%s", getServletPath(), getCurrentUserId(), calcArgsMD5(joinPoint));
RLock lock = (lockKey);
// Attempting to acquire a lock,Failure to acquire a lock means that the commit has been repeated,Throw an exception directly
if (!()) {
throw new ClientException(());
}
Object result;
try {
// Execute the original logic of the method marked with the anti-duplicate commit annotation
result = ();
} finally {
();
}
return result;
}
/**
* @return Return custom anti-duplicate submission annotations
*/
public static NoDuplicateSubmit getNoDuplicateSubmitAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
MethodSignature methodSignature = (MethodSignature) ();
Method targetMethod = ().getClass().getDeclaredMethod((), ().getParameterTypes());
return ();
}
/**
* @return Get current thread context ServletPath
*/
private String getServletPath() {
ServletRequestAttributes sra = (ServletRequestAttributes) ();
return ().getServletPath();
}
/**
* @return current operating user ID
*/
private String getCurrentUserId() {
// through (a gap)UserConTextGetting
return "xxx";
}
/**
* @return joinPoint md5
*/
private String calcArgsMD5(ProceedingJoinPoint joinPoint) {
return DigestUtil.md5Hex((()));
}
Current Limiting Assembly
Sentinel performs flow limiting
/**
* Initialize the current limit configuration
*/
@Component
public class SentinelRuleConfig implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
List<FlowRule> rules = new ArrayList<>();
FlowRule createOrderRule = new FlowRule();
("xxx");
(RuleConstant.FLOW_GRADE_QPS);
(1);
(createOrderRule);
(rules);
}
}
/**
* Customized Flow Control Policies
*/
public class CustomBlockHandler {
public static Result<ShortLinkCreateRespDTO> createShortLinkBlockHandlerMethod(ShortLinkCreateReqDTO requestParam, BlockException exception) {
return new Result<ShortLinkCreateRespDTO>().setCode("B100000").setMessage("Too many people currently visiting the site,Please try again later....");
}
}
@PostMapping("/api/xxx/v1/create")
@SentinelResource(
value = "xxx",
blockHandler = "createShortLinkBlockHandlerMethod",
blockHandlerClass =
)
public Result<ShortLinkCreateRespDTO> create(@RequestBody CreateReqDTO requestParam) {
return ((requestParam));
}
Redis Flow Limiting Component
Using a lua script, determine if the number of concurrent requests within 1s exceeds our expectations, and limit it if it exceeds our expectations.
-- Parameters for setting user access frequency limits
local username = KEYS[1]
local timeWindow = tonumber(ARGV[1]) -- time window in seconds
-- Constructs the name of the key in Redis that stores the number of times a user can access it
local accessKey = "short-link:user-flow-risk-control:" ... username
-- Atomically increment the access count and get the incremented value
local currentAccessCount = ("INCR", accessKey)
-- Set the expiration time of the key
if currentAccessCount == 1 then
("EXPIRE", accessKey, timeWindow)
end
-- Return the current access count
return currentAccessCount
/**
* User-operated traffic wind control profiles
*/
@Data
@Component
@ConfigurationProperties(prefix = "-limit")
public class UserFlowRiskControlConfiguration {
/**
* Whether to enable user flow risk control validation
*/
private Boolean enable;
/**
* Time window for traffic wind control, in seconds.
*/
private String timeWindow;
private String timeWindow; /**
* The number of times the traffic can be accessed within the time window.
*/
private Long maxAccessCount;
}
xxx:
group:
max-num: 20
flow-limit:
enable: true
time-window: 1
max-access-count: 20
import .;
import ;
import ;
import ;
import ;
import ;
import .*;
import ;
import ;
import ;
import .slf4j.Slf4j;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import static .FLOW_LIMIT_ERROR;
/**
* User Operated Traffic Risk Control Filter
*/
@Slf4j
@RequiredArgsConstructor
public class UserFlowRiskControlFilter implements Filter {
private final StringRedisTemplate stringRedisTemplate;
private final UserFlowRiskControlConfiguration userFlowRiskControlConfiguration;
private static final String USER_FLOW_RISK_CONTROL_LUA_SCRIPT_PATH = "lua/user_flow_risk_control.lua";
@SneakyThrows
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
(new ResourceScriptSource(new ClassPathResource(USER_FLOW_RISK_CONTROL_LUA_SCRIPT_PATH)));
();
String username = (()).orElse("other");
Long result;
try {
result = (redisScript, (username), ());
} catch (Throwable ex) {
("Enforcing user request traffic limitsLUAScript Error", ex);
returnJson((HttpServletResponse) response, ((new ClientException(FLOW_LIMIT_ERROR))));
return;
}
if (result == null || result > ()) {
returnJson((HttpServletResponse) response, ((new ClientException(FLOW_LIMIT_ERROR))));
return;
}
(request, response);
}
private void returnJson(HttpServletResponse response, String json) throws Exception {
("UTF-8");
("text/html; charset=utf-8");
try (PrintWriter writer = ()) {
(json);
}
}
}