Location>code7788 >text

Redis implementation of idempotence, jitter, flow limiting, etc.

Popularity:622 ℃/2024-10-14 17:37:02

This post focuses on how to use Redis to implement idempotent, anti-jitter, flow-limiting and other features.

idempotent component

import ;
import ;
import ;

import ;
import ;

/**
 * Message Queue Idempotent Processor
 */
@Component
@RequiredArgsConstructor
public class MessageQueueIdempotentHandler {

    private final StringRedisTemplate stringRedisTemplate;

    private static final String IDEMPOTENT_KEY_PREFIX = "xxx:idempotent:";

    /**
     * Determine if the current message has been consumed
     *
     * @param messageId message unique identifier
     * @return Has the message been consumed
     */
    public boolean isMessageBeingConsumed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        return (().setIfAbsent(key, "0", 2, ));
    }

    /**
     * Determine if the message consumption process is complete
     *
     * @param messageId message unique identifier
     * @return Whether the message is executed or not
     */
    public boolean isAccomplish(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        return (().get(key), "1");
    }

    /**
     * Setting up message flow execution completion
     *
     * @param messageId message unique identifier
     */
    public void setAccomplish(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        ().set(key, "1", 2, );
    }

    /**
     * If message processing encounters an exception,Remove idempotent markers
     *
     * @param messageId message unique identifier
     */
    public void delMessageProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        (key);
    }
}
@Component
@RocketMQMessageListener(consumerGroup = "saaslink_consumer_group", topic = RedisKeyConstant.SHORT_LINK_STATS_STREAM_TOPIC_KEY)
@Slf4j
public class ShortLinkStatsSaveConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt msgExt) {
        String msgId = ();
        // Use redis to implement idempotency
        if ((())) {
            // Determine if the current message flow is complete.
            if ((())) {
                return; }
            }
            throw new ServiceException("Message did not complete the process, message queue retry is required"); }
        }
        try {
            byte[] msgExtBody = (); }
            // Convert to map
            Map<String, String> producerMap = (msgExtBody, );
            ShortLinkStatsRecordDTO statsRecord = (("statsRecord")), );
            // The actual added logic
            } catch (Throwable ex) {
            // so-and-so situation is down
            (()));
            ("Record Short Link Monitor Consumption Exception", ex);
            throw ex;
        }
        (());
    }
}

Anti-shake component

The idempotent annotation, which prevents users from repeatedly submitting form information, is implemented primarily through distributed locking.

import ;
import ;
import ;
import ;

/**
 * Power annotations to prevent users from repeatedly submitting form information
 */
@Target()
@Retention()
public @interface NoDuplicateSubmit {

    /**
     * The error message returned when the idempotent failure logic is triggered
     */
    String message() default "You are operating too fast, please try again later";
}
import ;
import .;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

import ;

/**
 * Preventing users from repeatedly submitting form information Cutting Controller
 */
@Aspect
@RequiredArgsConstructor
public final class NoDuplicateSubmitAspect {

    private final RedissonClient redissonClient;

    /**
     * Enhanced method marking {@link NoDuplicateSubmit} annotation logic
     */
    @Around("@annotation()")
    public Object noDuplicateSubmit(ProceedingJoinPoint joinPoint) throws Throwable {
        NoDuplicateSubmit noDuplicateSubmit = getNoDuplicateSubmitAnnotation(joinPoint);
        // Get Distributed Lock Logo
        String lockKey = ("no-duplicate-submit:path:%s:currentUserId:%s:md5:%s", getServletPath(), getCurrentUserId(), calcArgsMD5(joinPoint));
        RLock lock = (lockKey);
        // Attempting to acquire a lock,Failure to acquire a lock means that the commit has been repeated,Throw an exception directly
        if (!()) {
            throw new ClientException(());
        }
        Object result;
        try {
            // Execute the original logic of the method marked with the anti-duplicate commit annotation
            result = ();
        } finally {
            ();
        }
        return result;
    }

    /**
     * @return Return custom anti-duplicate submission annotations
     */
    public static NoDuplicateSubmit getNoDuplicateSubmitAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
        MethodSignature methodSignature = (MethodSignature) ();
        Method targetMethod = ().getClass().getDeclaredMethod((), ().getParameterTypes());
        return ();
    }

    /**
     * @return Get current thread context ServletPath
     */
    private String getServletPath() {
        ServletRequestAttributes sra = (ServletRequestAttributes) ();
        return ().getServletPath();
    }

    /**
     * @return current operating user ID
     */
    private String getCurrentUserId() {
        // through (a gap)UserConTextGetting
        return "xxx";
    }

    /**
     * @return joinPoint md5
     */
    private String calcArgsMD5(ProceedingJoinPoint joinPoint) {
        return DigestUtil.md5Hex((()));
    }

Current Limiting Assembly

Sentinel performs flow limiting

/**
 * Initialize the current limit configuration
 */
@Component
public class SentinelRuleConfig implements InitializingBean {

    @Override
    public void afterPropertiesSet() throws Exception {
        List<FlowRule> rules = new ArrayList<>();
        FlowRule createOrderRule = new FlowRule();
        ("xxx");
        (RuleConstant.FLOW_GRADE_QPS);
        (1);
        (createOrderRule);
        (rules);
    }
}
/**
 * Customized Flow Control Policies
 */
public class CustomBlockHandler {

    public static Result<ShortLinkCreateRespDTO> createShortLinkBlockHandlerMethod(ShortLinkCreateReqDTO requestParam, BlockException exception) {
        return new Result<ShortLinkCreateRespDTO>().setCode("B100000").setMessage("Too many people currently visiting the site,Please try again later....");
    }
}
    @PostMapping("/api/xxx/v1/create")
    @SentinelResource(
            value = "xxx",
            blockHandler = "createShortLinkBlockHandlerMethod",
            blockHandlerClass = 
    )
    public Result<ShortLinkCreateRespDTO> create(@RequestBody CreateReqDTO requestParam) {
        return ((requestParam));
    }

Redis Flow Limiting Component

Using a lua script, determine if the number of concurrent requests within 1s exceeds our expectations, and limit it if it exceeds our expectations.

-- Parameters for setting user access frequency limits
local username = KEYS[1]
local timeWindow = tonumber(ARGV[1]) -- time window in seconds

-- Constructs the name of the key in Redis that stores the number of times a user can access it
local accessKey = "short-link:user-flow-risk-control:" ... username

-- Atomically increment the access count and get the incremented value
local currentAccessCount = ("INCR", accessKey)

-- Set the expiration time of the key
if currentAccessCount == 1 then
    ("EXPIRE", accessKey, timeWindow)
end

-- Return the current access count
return currentAccessCount
/**
 * User-operated traffic wind control profiles
 */
@Data
@Component
@ConfigurationProperties(prefix = "-limit")
public class UserFlowRiskControlConfiguration {

    /**
     * Whether to enable user flow risk control validation
     */
    private Boolean enable;

    /**
     * Time window for traffic wind control, in seconds.
     */
    private String timeWindow;

    private String timeWindow; /**
     * The number of times the traffic can be accessed within the time window.
     */
    private Long maxAccessCount;
}
xxx:
  group:
    max-num: 20
  flow-limit:
    enable: true
    time-window: 1
    max-access-count: 20
import .;
import ;
import ;
import ;
import ;
import ;
import .*;
import ;
import ;
import ;
import .slf4j.Slf4j;
import ;
import ;
import ;
import ;

import ;
import ;
import ;

import static .FLOW_LIMIT_ERROR;


/**
 * User Operated Traffic Risk Control Filter
 */
@Slf4j
@RequiredArgsConstructor
public class UserFlowRiskControlFilter implements Filter {

    private final StringRedisTemplate stringRedisTemplate;
    private final UserFlowRiskControlConfiguration userFlowRiskControlConfiguration;

    private static final String USER_FLOW_RISK_CONTROL_LUA_SCRIPT_PATH = "lua/user_flow_risk_control.lua";

    @SneakyThrows
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        (new ResourceScriptSource(new ClassPathResource(USER_FLOW_RISK_CONTROL_LUA_SCRIPT_PATH)));
        ();
        String username = (()).orElse("other");
        Long result;
        try {
            result = (redisScript, (username), ());
        } catch (Throwable ex) {
            ("Enforcing user request traffic limitsLUAScript Error", ex);
            returnJson((HttpServletResponse) response, ((new ClientException(FLOW_LIMIT_ERROR))));
            return;
        }
        if (result == null || result > ()) {
            returnJson((HttpServletResponse) response, ((new ClientException(FLOW_LIMIT_ERROR))));
            return;
        }
        (request, response);
    }

    private void returnJson(HttpServletResponse response, String json) throws Exception {
        ("UTF-8");
        ("text/html; charset=utf-8");
        try (PrintWriter writer = ()) {
            (json);
        }
    }
}