
RocketMQ in Practice: 10. Marketing System Code Optimization

2025-02-11 22:41:51

Outline

1. The marketing system introduces MQ to achieve asynchronous performance optimization

2. Release coupons based on MQ to improve system scalability

3. Using Redis to deduplicate promotional activities

4. Asynchronous processing based on the promotional activity creation event

5. Implementation of push task sharding and sharding message batch sending

6. Decoupling of push system and user group query logic

7. Query user data and send push messages in batches

8. Thread pool encapsulation and push system multi-thread push

9. Multi-threaded push of tens of millions of messages in the push system

10. Code implementation of lazy coupon issuance for tens of millions of users

11. Implementation of code for coupons for specified user groups

12. Refactoring of batch merging algorithm for sharded messages

13. Implementation of push code for millions of portraits

14. Full-link stress testing of PUSH for millions of users in the production environment

 

Next, the code for the marketing system's four major promotion scenarios is optimized: pushing promotional activities to all users, issuing coupons to all users, pushing coupon messages to specific users, and pushing hot products on a schedule.

 

1. The marketing system introduces MQ to achieve asynchronous performance optimization

Querying all users, creating a large number of messages, and sending those messages to MQ are three operations that can each be very time-consuming. None of them should therefore run synchronously inside the request that creates a marketing activity. It is better to hand them off through MQ and run them asynchronously when the message is consumed.

 

Examples of promotional activity creation interface:

 

First version: write to the database (10 milliseconds) + pull all users (several minutes) + create a large number of messages and send them to MQ (several minutes). The interface is therefore slow, and a single call takes minutes.

 

Optimized version: with MQ-based asynchronous processing, the interface only writes to the database (10 milliseconds) and sends one message to MQ (10 milliseconds).

 

2. Release coupons based on MQ to improve system scalability

MQ serves three main purposes: peak shaving and valley filling (traffic smoothing), asynchronous processing to improve performance, and decoupling to improve scalability. In the initial version, MQ was mainly used for peak shaving, to absorb instantaneous high-concurrency database writes or interface calls. The previous section then used MQ to fix the slow activity-creation interface, that is, to improve performance through asynchronous processing.

 

The following is a description of using MQ to improve system scalability:

 

For example, suppose an order is created and a coupon is used in it. The order system needs to notify the inventory system to lock the product inventory, and also notify the marketing system to lock the coupon and set its is_used field. Later the user cancels the order: the order status is modified and the inventory and coupon must be released, so that the inventory returns to its original quantity and the coupon can be used again. When canceling an order, should the order system directly call the interfaces of the inventory system and the marketing system to release the inventory and the coupon?

 

Generally, when an order is canceled, the order system instead sends an order-canceled event message, OrderCanceledEvent, to MQ. The inventory system, marketing system, points system and others subscribe to this event, and each performs its own handling after the order is canceled. This decouples the order system from the inventory, marketing, points and other systems, and improves the scalability of the order system.
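
The publish/subscribe shape described above can be sketched in memory as follows. This is only an illustration under stated assumptions: `OrderEventTopic` stands in for an MQ topic, and the subscriber bodies and strings are hypothetical. A real setup would publish through a RocketMQ producer and give each downstream system its own consumer group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory stand-in for an MQ topic; hypothetical, for illustration only.
final class OrderEventTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Each downstream system registers its own handler; the publisher never sees them.
    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // The order system only publishes the event; it calls no downstream interface directly.
    void publish(String canceledOrderId) {
        for (Consumer<String> handler : subscribers) {
            handler.accept(canceledOrderId);
        }
    }

    // Demo: returns the actions taken by the subscribers for one canceled order.
    static List<String> demo(String orderId) {
        List<String> actions = new ArrayList<>();
        OrderEventTopic orderCanceled = new OrderEventTopic();
        orderCanceled.subscribe(id -> actions.add("inventory released for " + id));
        orderCanceled.subscribe(id -> actions.add("coupon released for " + id));
        orderCanceled.publish(orderId);
        return actions;
    }

    public static void main(String[] args) {
        demo("order-1").forEach(System.out::println);
    }
}
```

Adding a points system later means adding one more `subscribe` call; the publishing side does not change, which is the scalability gain the section describes.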

 

3. Using Redis to deduplicate promotional activities

(1) Configure Redis

(2) Use Redis to deduplicate when creating promotions

 

(1) Configure Redis

@Data
 @Configuration
 @ConditionalOnClass()
 public class RedisConfig {
     @Value("${}")
     private String host;
     @Value("${}")
     private String port;
     @Value("${}")
     private String password;
     @Value("${}")
     private int timeout;

     @Bean
     @ConditionalOnClass()
     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
         RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
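         redisTemplate.setConnectionFactory(redisConnectionFactory);
         redisTemplate.setDefaultSerializer(new StringRedisSerializer());
         redisTemplate.afterPropertiesSet();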
         return redisTemplate;
     }

     @Bean
     @ConditionalOnClass()
     public RedissonClient redissonClient() {
         Config config = new Config();
         config.useSingleServer()
             .setAddress("redis://" + host + ":" + port)
             .setPassword(password)
             .setConnectionMinimumIdleSize(10)
             .setConnectionPoolSize(100)
             .setIdleConnectionTimeout(600000)
             .setSubscriptionConnectionMinimumIdleSize(10)
             .setSubscriptionConnectionPoolSize(100)
             .setTimeout(timeout);
         config.setCodec(new StringCodec());
         config.setThreads(5);
         config.setNettyThreads(5);
         RedissonClient client = Redisson.create(config);
         return client;
     }

     @Bean
     @ConditionalOnClass()
     public RedisCache redisCache(RedisTemplate redisTemplate) {
         return new RedisCache(redisTemplate);
     }
 }

 public class RedisCache {
     private RedisTemplate redisTemplate;

     public RedisCache(RedisTemplate redisTemplate) {
         this.redisTemplate = redisTemplate;
     }

     //Store a value in the cache; a positive seconds value sets the expiry
     public void set(String key, String value, int seconds) {
         ValueOperations<String, String> vo = redisTemplate.opsForValue();
         if (seconds > 0) {
             vo.set(key, value, seconds, TimeUnit.SECONDS);
         } else {
             vo.set(key, value);
         }
     }

     //Read a value from the cache
     public String get(String key) {
         ValueOperations<String, String> vo = redisTemplate.opsForValue();
         return vo.get(key);
     }

     //Invalidate a cache entry manually
     public boolean delete(String key) {
         return redisTemplate.delete(key);
     }

     //Check whether the hash key exists (a hash with no entries is treated as absent)
     public boolean hExists(String key) {
         return !hGetAll(key).isEmpty();
     }

     //Get all key-value pairs of a hash, corresponding to the Redis hgetall command
     public Map<String, String> hGetAll(String key) {
         return redisTemplate.opsForHash().entries(key);
     }

     //Add hash key-value pairs from a map
     public void hPutAll(String key, Map<String, String> map) {
         redisTemplate.opsForHash().putAll(key, map);
     }

     //Execute a Lua script
     public <T> T execute(RedisScript<T> script, List<String> keys, String... args) {
         return (T) redisTemplate.execute(script, keys, args);
     }

     public RedisTemplate getRedisTemplate() {
         return redisTemplate;
     }
 }

(2) Use Redis to deduplicate when creating promotions

@Service
 public class PromotionServiceImpl implements PromotionService {
     //Promotional activity DAO
     @Autowired
     private SalesPromotionDAO salesPromotionDAO;

     //Redis Cache Tool
     @Resource
     private RedisCache redisCache;
    
     @Resource
     private PromotionConverter promotionConverter;
    
     //Create or update a promotional activity
     @Transactional(rollbackFor = )
     @Override
     public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) {
         //Judge whether the activity is repeated
         String result = (PROMOTION_CONCURRENCY_KEY +
             () +
             () +
             ().getTime() +
             ().getTime());
         if ((result)) {
             return null;
         }

         ("Activity content: {}", request);
         //Activity rules
         String rule = JsonUtil.object2Json(());

         //Construct the promotional activity entity
         SalesPromotionDO salesPromotionDO = (request);
         (rule);

         //Persist the promotional activity to the database
         (salesPromotionDO);

         //Write the deduplication key to Redis so that an identical creation request within the next 30 minutes is rejected
         (PROMOTION_CONCURRENCY_KEY + () + () + ().getTime() + ().getTime(), ().toString(), 30 * 60);

         //Push the promotional activity to all users by sending messages to MQ
         sendPlatformPromotionMessage(salesPromotionDO);

         //Construct the response data
         SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO();
         (());
         (());
         (rule);
         (());
         (true);
         return dto;
     }
     ...
 }
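
The listing above reads the key first and writes it only after the database insert, so two identical requests arriving at almost the same moment can both pass the check. One common way to close that window is an atomic set-if-absent with an expiry. The sketch below imitates that behavior in memory and is not the project's code: `PromotionDeduplicator`, `tryClaim`, and the key fields `name` and `creator` are hypothetical. With Spring Data Redis, the corresponding call would be `ValueOperations.setIfAbsent(key, value, timeout, unit)`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory imitation of "SET key value NX EX ttl"; hypothetical helper for illustration.
final class PromotionDeduplicator {
    private final ConcurrentMap<String, Long> expiryByKey = new ConcurrentHashMap<>();

    // Builds a deduplication key; the chosen fields are assumptions of this sketch.
    static String key(String name, String creator, long startMillis, long endMillis) {
        return "promotion_concurrency:" + name + ":" + creator + ":" + startMillis + ":" + endMillis;
    }

    // Returns true only for the first caller that claims the key before it expires.
    // compute() runs atomically per key, so two concurrent callers cannot both win.
    boolean tryClaim(String key, long ttlMillis, long nowMillis) {
        boolean[] claimed = {false};
        expiryByKey.compute(key, (k, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                claimed[0] = true;
                return nowMillis + ttlMillis;
            }
            return expiry;
        });
        return claimed[0];
    }

    public static void main(String[] args) {
        PromotionDeduplicator dedup = new PromotionDeduplicator();
        String k = key("spring-sale", "admin", 0L, 86_400_000L);
        long now = System.currentTimeMillis();
        System.out.println(dedup.tryClaim(k, 30 * 60 * 1000L, now)); // first request wins
        System.out.println(dedup.tryClaim(k, 30 * 60 * 1000L, now)); // duplicate is rejected
    }
}
```

Claiming the key before the insert also means that a failed insert leaves the key behind until it expires, so a real implementation would need to decide whether to delete the key on failure.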

 

4. Asynchronous processing based on the promotional activity creation event

(1) When creating a promotion, publish a promotional activity creation event message to MQ

(2) The marketing system consumes the promotional activity creation event message

 

(1) When creating a promotion, publish a promotional activity creation event message to MQ

//Promotional activity creation event
 @Data
 public class SalesPromotionCreatedEvent {
     private SalesPromotionDO salesPromotion;
 }

 @Service
 public class PromotionServiceImpl implements PromotionService {
     ...
     //Create or update a promotional activity
     @Transactional(rollbackFor = )
     @Override
     public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) {
         //Judge whether the activity is repeated
         String result = (PROMOTION_CONCURRENCY_KEY +
             () +
             () +
             ().getTime() +
             ().getTime());
         if ((result)) {
             return null;
         }

         ("Activity content: {}", request);
         //Activity rules
         String rule = JsonUtil.object2Json(());

         //Construct the promotional activity entity
         SalesPromotionDO salesPromotionDO = (request);
         (rule);

         //Persist the promotional activity to the database
         (salesPromotionDO);

         (PROMOTION_CONCURRENCY_KEY +
             () +
             () +
             ().getTime() +
             ().getTime(), ().toString(), 30 * 60);

         //Push the promotional activity to all users through MQ (replaced by the event published below)
         //sendPlatformPromotionMessage(salesPromotionDO);

         //Publish the promotional activity creation event to MQ
         publishSalesPromotionCreatedEvent(salesPromotionDO);

         //Construct the response data
         SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO();
         (());
         (());
         (rule);
         (());
         (true);
         return dto;
     }

     //Publish promotional activity creation event
     private void publishSalesPromotionCreatedEvent(SalesPromotionDO salesPromotion) {
         SalesPromotionCreatedEvent salesPromotionCreatedEvent = new SalesPromotionCreatedEvent();
         (salesPromotion);
         String salesPromotionCreatedEventJSON = JsonUtil.object2Json(salesPromotionCreatedEvent);
         (RocketMqConstant.SALES_PROMOTION_CREATED_EVENT_TOPIC, salesPromotionCreatedEventJSON, "Publish promotional activity creation event");
     }
     ...
 }

(2) The marketing system consumes the promotional activity creation event message

@Configuration
 public class ConsumerBeanConfig {
     ...
     @Bean("salesPromotionCreatedEventListener")
     public DefaultMQPushConsumer salesPromotionCreatedEventListener(SalesPromotionCreatedEventListener salesPromotionCreatedEventListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(SALES_PROMOTION_CREATED_EVENT_CONSUMER_GROUP);
         (());
         consumer.subscribe(SALES_PROMOTION_CREATED_EVENT_TOPIC, "*");
         consumer.registerMessageListener(salesPromotionCreatedEventListener);
         consumer.start();
         return consumer;
     }
     ...
 }

 //Promotional activity creation event listener
 @Component
 public class SalesPromotionCreatedEventListener implements MessageListenerConcurrently {
     ...
 }

 

5. Implementation of push task sharding and sharding message batch sending

When the marketing system consumes the promotional activity creation event message, it shards the push task for tens of millions of users and merges the shard messages into batches before sending them to MQ.

//The Topic sent to MQ is "PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC".
 //Promotional activity creation event listener
 @Component
 public class SalesPromotionCreatedEventListener implements MessageListenerConcurrently {
     @DubboReference(version = "1.0.0")
     private AccountApi accountApi;

     @Resource
     private DefaultProducer defaultProducer;

     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         try {
             for (MessageExt messageExt : list) {
                 //The following three lines obtain the promotional activity that has just been created
                 String message = new String(messageExt.getBody());
                 SalesPromotionCreatedEvent salesPromotionCreatedEvent = (message, );
                 SalesPromotionDO salesPromotion = salesPromotionCreatedEvent.getSalesPromotion();

                 //This promotional activity is pushed to all users

                 //userBucketSize is the size of one user shard, which corresponds to one user ID range startUserId ~ endUserId
                 final int userBucketSize = 1000;
                 //messageBatchSize is the number of push task messages merged into one RocketMQ batch message: each batch message carries 100 push task messages
                 //So 10,000 push task messages are merged into 100 batch messages
                 //Sending 10,000 push task messages to MQ then takes only 100 network round trips to RocketMQ, which greatly reduces the sending time
                 final int messageBatchSize = 100;

                 //1. There are two ways to determine how many users there are:
                 //The first is to run count (inefficient); the second is to take max(userId), which is the one normally used
                 //A statement such as select * from account order by id desc limit 1 returns the row with the largest primary key in the user table
                 JsonResult<Long> queryMaxUserIdResult = ();
                 if (!()) {
                     throw new BaseBizException((), ());
                 }
                 Long maxUserId = ();

                 //2. With the maximum user ID known, the auto-increment ID range can be sliced so that the push to tens of millions of users is split into tasks, each covering for example 1,000 or 2,000 users
                 //userBuckets is the collection of user shards and can hold tens of thousands of entries; each entry startUserId -> endUserId represents one push task shard
                 Map<Long, Long> userBuckets = new LinkedHashMap<>();
                 AtomicBoolean doSharding = new AtomicBoolean(true);//Whether sharding still needs to continue
                 long startUserId = 1L;//Starting user ID; the database primary key starts from 1

                 while (doSharding.get()) {
                     if (startUserId > maxUserId) {
                         doSharding.compareAndSet(true, false);
                         break;
                     }
                     userBuckets.put(startUserId, startUserId + userBucketSize);
                     startUserId += userBucketSize;
                 }

                 //3. After sharding, the thousands or tens of thousands of push tasks are merged into RocketMQ batch messages
                 //The tasks are sent to MQ batch by batch, which reduces the time spent on network communication with RocketMQ
                 int handledBucketCount = 0;
                 List<String> promotionPushTaskBatch = new ArrayList<>(messageBatchSize);
                 for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
                     handledBucketCount++;
                     PlatformPromotionUserBucketMessage promotionPushTask = PlatformPromotionUserBucketMessage.builder()
                         .startUserId(userBucket.getKey())
                         .endUserId(userBucket.getValue())
                         .promotionId(())
                         .promotionType(())
                         .mainMessage(())
                         .message("You have been qualified for the event and can open the APP to enter the event page")
                         .informType(())
                         .build();
                     String promotionPushTaskJSON = JsonUtil.object2Json(promotionPushTask);
                     promotionPushTaskBatch.add(promotionPushTaskJSON);

                     //Send once a batch is full or the last shard has been handled, then clear promotionPushTaskBatch
                     if (promotionPushTaskBatch.size() == messageBatchSize || handledBucketCount == userBuckets.size()) {
                         (RocketMqConstant.PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC, promotionPushTaskBatch, "Platform Distributes Promotion User Bucket Message");
                         promotionPushTaskBatch.clear();
                     }
                 }
             }
         } catch(Exception e) {
             log.error("consume error, promotional activity creation event handling failed", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 }
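
The two steps of the listener above, slicing the ID range into shards and merging shard tasks into batches, can be isolated into a small self-contained sketch. `PushTaskSharding`, `shard`, and `partition` are hypothetical names, not part of the project. One assumption differs from the listing: each shard here ends at an inclusive ID (`start + bucketSize - 1`, capped at `maxUserId`), so that neighboring shards do not share a boundary ID if the downstream range query is inclusive.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating shard planning and batch merging.
final class PushTaskSharding {

    // Splits the ID range [1, maxUserId] into consecutive shards of at most bucketSize IDs.
    // Each entry maps an inclusive start ID to an inclusive end ID.
    static Map<Long, Long> shard(long maxUserId, int bucketSize) {
        Map<Long, Long> buckets = new LinkedHashMap<>();
        for (long start = 1; start <= maxUserId; start += bucketSize) {
            long end = Math.min(start + bucketSize - 1, maxUserId);
            buckets.put(start, end);
        }
        return buckets;
    }

    // Groups items into batches of at most batchSize; the last batch may be smaller.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<Long, Long> buckets = shard(10_000_000L, 1000);
        List<Map.Entry<Long, Long>> tasks = new ArrayList<>(buckets.entrySet());
        List<List<Map.Entry<Long, Long>>> batches = partition(tasks, 100);
        // prints "10000 shards, 100 batch messages"
        System.out.println(buckets.size() + " shards, " + batches.size() + " batch messages");
    }
}
```

With 10 million users, a shard size of 1,000 and a batch size of 100, this gives 10,000 shard tasks carried in 100 batch messages, matching the figures in the code comments.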

 

6. Decoupling of push system and user group query logic

After the marketing system sends the merged batches of push shard messages to MQ, these messages are consumed by the marketing system itself rather than by the push system. If the push system consumed the shard task messages directly, it would have to query the user group for each user ID range. That would couple the push system to the member system (lookup by user ID) or to the big data system (lookup by user portrait), that is, to the query logic of a specific user group.

 

The marketing system therefore owns the user group query logic, which decouples the push system from the member system and the big data system. The marketing system builds the concrete push message for each user, merges these messages into batches, and sends them to MQ, where the push system finally consumes them.

 

In short, the marketing system is responsible for consuming the push task shard messages.

 

7. Query user data and send push messages in batches

After the marketing system obtains a push task shard, it decides for itself how to query the user group. It builds a push message for each user found, sends these push messages to MQ in batches, and leaves them for the push system to consume.

 

Step 1: Get a push task shard

Step 2: Query the user group corresponding to this push task shard

Step 3: For each user, create a push message in the format required by the push system, and send these messages to MQ.

 

The first implementation of step 3 (not recommended):

Use a thread pool to send the 1,000 messages of a task shard to MQ concurrently. The problem is that a shard task with 1,000 users still issues 1,000 requests to MQ, even though they are sent from multiple threads.

 

The second implementation of step 3 (recommended):

Use a thread pool to send the messages to MQ in batches. The RocketMQ documentation recommends that a batch of messages not exceed 1MB, while in the RocketMQ source code the limit on a batch message is 4MB. When sending in batches, the batch size therefore has to be chosen by weighing the size of the messages against network pressure and IO pressure.

 

Here each batch holds 100 messages, so the 1,000 user push messages of a shard are merged into 10 batches and only 10 network requests are needed. Processing a push task shard through to writing MQ is therefore fast. If one marketing system instance processed 10,000 push shard tasks on a single thread, with 10 MQ sends per task at 10ms each, it would need 10,000 × 10 × 10ms = 1,000s, about 17 minutes. With several marketing system instances, the 10 batch sends of each shard task are handled concurrently by thread pools. Assuming 2 machines with 50 threads each, the 10,000 shard tasks take about 1,000s / (2 × 50) = 10s.
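
The estimate above is plain arithmetic and can be checked with a few lines of code. `SendTimeEstimate` is a hypothetical helper, and it assumes that the work divides evenly across threads and that every send takes the same time, which real systems only approximate.

```java
// Hypothetical helper for back-of-the-envelope send-time estimates.
final class SendTimeEstimate {

    // Seconds needed for `requests` requests of `latencyMs` each, spread evenly over `threads` threads.
    static double seconds(long requests, long latencyMs, int threads) {
        return requests * latencyMs / 1000.0 / threads;
    }

    public static void main(String[] args) {
        long requests = 10_000L * 10; // 10,000 shard tasks x 10 batch sends each
        System.out.println(seconds(requests, 10, 1));      // single thread: prints 1000.0
        System.out.println(seconds(requests, 10, 2 * 50)); // 2 machines x 50 threads: prints 10.0
    }
}
```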

//The following is the marketing system's listener on "PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC", which consumes the push task shard messages.
 //After processing a shard, it sends the resulting push messages to the MQ topic "PLATFORM_PROMOTION_SEND_TOPIC":

 @Configuration
 public class ConsumerBeanConfig {
     ...
     //Consumer for the user bucket messages of platform-issued promotional activities
     @Bean("platformPromotionUserBucketReceiveTopicConsumer")
     public DefaultMQPushConsumer receiveCouponUserBucketConsumer(PlatFormPromotionUserBucketListener platFormPromotionUserBucketListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_PROMOTION_SEND_USER_BUCKET_CONSUMER_GROUP);
         (());
         consumer.subscribe(PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC, "*");
         consumer.registerMessageListener(platFormPromotionUserBucketListener);
         consumer.start();
         return consumer;
     }
 }

 @Component
 public class PlatFormPromotionUserBucketListener implements MessageListenerConcurrently {
     //Membership Service
     @DubboReference(version = "1.0.0")
     private AccountApi accountApi;

     //The thread pool shared by sending messages
     @Autowired
     @Qualifier("sharedSendMsgThreadPool")
     private SafeThreadPool sharedSendMsgThreadPool;

     //RocketMQ Producer
     @Autowired
     private DefaultProducer defaultProducer;

     //Consume messages concurrently
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
         try {
             for (MessageExt messageExt : msgList) {
                 //1. Get a push task shard
                 String message = new String(messageExt.getBody());
                 ("Execute the user bucket message logic of the platform sending promotion activities, message content: {}", message);
                 PlatformPromotionUserBucketMessage promotionPushTask = (message, );

                 //2. Query the user group corresponding to this push task shard
                 Long startUserId = ();
                 Long endUserId = ();

                 JsonResult<List<MembershipAccountDTO>> queryResult = (startUserId, endUserId);
                 if (!()) {
                     throw new BaseBizException((), ());
                 }

                 List<MembershipAccountDTO> membershipAccounts = ();
                 if ((membershipAccounts)) {
                     ("The user is not queried based on the id range in the user bucket, startUserId={}, endUserId{}", startUserId, endUserId);
                     continue;
                 }

                 //3. For each user, create a push message in the format required by the push system, and send these messages to MQ
                 //First implementation (not recommended):
                 //Use a thread pool to send the 1,000 messages of a task shard to MQ concurrently;
                 //the problem is that a shard task with 1,000 users still issues 1,000 requests to MQ, even though they come from multiple threads
                 //Second implementation (recommended):
                 //Use a thread pool to send the messages to MQ in batches;
                 //the RocketMQ documentation recommends that a batch not exceed 1MB, while in the RocketMQ source code the limit on a batch message is 4MB,
                 //so the batch size has to be chosen by weighing message size against network pressure and IO pressure
                 //Here each batch holds 100 messages, so the 1,000 users of a shard are sent in 10 batches
                 //and only 10 network requests are needed, which makes processing a task shard through to writing MQ fast
                 //One marketing system instance processing 10,000 shard tasks on a single thread needs 10 MQ sends per task, 100,000 sends in total; at 10ms each that is 1,000,000ms = 1,000s, about 17 minutes
                 //With several marketing system instances, the 10 batches of each shard task are written concurrently by thread pools
                 //With 50 threads on one machine, processing the 10,000 shard tasks takes about 1,000s / 50 = 20s
                 PlatformPromotionMessage promotionMessage = ()
                     .promotionId(())
                     .promotionType(())
                     .mainMessage(())
                     .message("You have been qualified for the event, open the APP to enter the event page")
                     .informType(())
                     .build();

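                 List<String> batch = new ArrayList<>(100);
                 for (MembershipAccountDTO account : membershipAccounts) {
                     (());
                     ((promotionMessage));
                     if (batch.size() == 100) {
                         //Give the worker thread its own copy: the send runs asynchronously, while the shared list is cleared right after submission
                         List<String> batchToSend = new ArrayList<>(batch);
                         sharedSendMsgThreadPool.execute(() -> {
                             (RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, batchToSend, "Platform sends promotional message");
                         });
                         batch.clear();
                     }
                 }
                 //Send the remaining messages as a final batch
                 if (!(batch)) {
                     List<String> batchToSend = new ArrayList<>(batch);
                     sharedSendMsgThreadPool.execute(() -> {
                         (RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, batchToSend, "Platform sends promotional message");
                     });
                     batch.clear();
                 }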
             }
         } catch (Exception e) {
             log.error("consume error, promotional message consumption failed", e);
             //Consumption failed this time; retry later
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 }
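
One detail of this pattern deserves care. When a list is handed to a thread pool and then cleared and reused by the submitting thread, the worker may see an empty or half-refilled list, because the send runs later on another thread. A minimal sketch of the safe variant follows, in which every task receives its own copy of the batch. `BatchDispatcher` and its `sender` callback are hypothetical; the callback stands in for the producer's batch-send call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: splits messages into batches and sends each batch on an executor.
final class BatchDispatcher {

    static void dispatch(List<String> messages, int batchSize,
                         ExecutorService executor, Consumer<List<String>> sender) {
        List<String> batch = new ArrayList<>(batchSize);
        for (String message : messages) {
            batch.add(message);
            if (batch.size() == batchSize) {
                List<String> snapshot = new ArrayList<>(batch); // private copy for the worker
                executor.execute(() -> sender.accept(snapshot));
                batch.clear(); // safe: the worker no longer shares this list
            }
        }
        if (!batch.isEmpty()) {
            List<String> snapshot = new ArrayList<>(batch);
            executor.execute(() -> sender.accept(snapshot));
        }
    }

    // Demo: returns the sorted sizes of the batches the sender actually received.
    static List<Integer> sentBatchSizes(int total, int batchSize) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            messages.add("msg-" + i);
        }
        List<Integer> sizes = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        dispatch(messages, batchSize, executor, b -> sizes.add(b.size()));
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Integer> sorted = new ArrayList<>(sizes);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sentBatchSizes(250, 100)); // prints [50, 100, 100]
    }
}
```

Copying 100 short strings per batch is cheap compared with a network round trip, so the copy costs little next to the lost or duplicated messages it prevents.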

 

8. Thread pool encapsulation and push system multi-thread push

(1) Instantiate a thread pool through @Configuration

(2) Specific thread pool instantiation

 

(1) Instantiate a thread pool through @Configuration

@Configuration
 public class ThreadPoolConfig {
     //Thread pool shared by message-sending tasks
     //Thread pool name and thread name: sharedSendMsgThreadPool
     //Maximum number of tasks allowed to execute at the same time: 100
     @Bean("sharedSendMsgThreadPool")
     public SafeThreadPool sharedSendMsgThreadPool() {
         return new SafeThreadPool("sharedSendMsgThreadPool", 100);
     }
 }

(2) Specific thread pool instantiation

The corePoolSize of the thread pool that sends messages is set to 0, so all of its threads can be reclaimed when they are idle.

public class SafeThreadPool {
     private final Semaphore semaphore;
     private final ThreadPoolExecutor threadPoolExecutor;

     public SafeThreadPool(String name, int permits) {
         //If more than permits (100 here) tasks are submitted at the same time, the semaphore blocks the extra submissions
         semaphore = new Semaphore(permits);

         //Why is corePoolSize set to 0?
         //Because messages are not pushed all the time: pushes are needed only for promotional activities, coupon issuance and the like, and normally nothing is pushed
         //So the corePoolSize of the message-sending thread pool is set to 0, and all threads can be reclaimed when idle
         threadPoolExecutor = new ThreadPoolExecutor(
             0,
             permits * 2,
             60,
             ,
             new SynchronousQueue<>(),
             (name)
         );
     }

     public void execute(Runnable task) {
         //If more than 100 batches are submitted for concurrent pushing, the caller blocks here
         //So while 100 tasks are running, no further batch can be submitted until one of them finishes
         //In the extreme case at most 100 batches hold a permit at once, which stays within the pool's maximum of 100 * 2 threads
         semaphore.acquireUninterruptibly();

         threadPoolExecutor.submit(() -> {
             try {
                 task.run();
             } finally {
                 semaphore.release();
             }
         });
     }
 }
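
The idea behind this class, a semaphore that makes the submitter wait instead of letting the pool reject tasks, can be tried out with the self-contained sketch below. `BoundedExecutor` is a hypothetical, simplified variant rather than the project's `SafeThreadPool`: it leaves the pool's thread cap open and relies on the semaphore alone to bound concurrency, whereas the listing above caps the pool at `permits * 2`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified semaphore-guarded executor for illustration.
final class BoundedExecutor {
    private final Semaphore semaphore;
    private final ThreadPoolExecutor executor;

    BoundedExecutor(int permits) {
        semaphore = new Semaphore(permits);
        // corePoolSize 0: idle threads are reclaimed after 60 seconds.
        // SynchronousQueue: a task is handed straight to a thread, never queued.
        executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
    }

    // Blocks the caller while `permits` tasks are already running.
    void execute(Runnable task) {
        semaphore.acquireUninterruptibly();
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    semaphore.release();
                }
            });
        } catch (RuntimeException e) {
            semaphore.release(); // submission failed, so the task will never release the permit
            throw e;
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    // Demo: runs `tasks` short tasks and returns the highest number seen running at once.
    static int maxObservedConcurrency(int permits, int tasks) {
        BoundedExecutor pool = new BoundedExecutor(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                max.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent tasks: " + maxObservedConcurrency(3, 20));
    }
}
```

The permit is released just before the worker thread goes back to waiting for work, so a new task can briefly arrive while every existing thread still looks busy. That short overlap is a plausible reason for giving the pool more threads than permits, as the `permits * 2` cap does.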

 

9. Multi-threaded push of tens of millions of messages in the push system

Following the previous example, for a push task covering tens of millions of messages, the marketing system first shards the task and then sends the shards to MQ in batches.

 

The marketing system then consumes these push task shards and decides for itself how to query the user group of each shard.

 

It builds a push message for each user found, sends these push messages to MQ in batches, and leaves them for the push system to consume.

 

The marketing system therefore builds tens of millions of push messages and merges them into 100,000 batch messages to send to MQ. If sending one batch message to MQ takes 50ms, the 100,000 batches take 5,000s in total. With 2 marketing system instances using 200 threads in total to send these 100,000 batch messages, about 5,000s / 200 = 25s is enough to send tens of millions of push messages to MQ.

 

Assume the push system is deployed on five 4-core 8GB machines. Each machine then consumes about 2 million of the 10 million push messages, receiving them batch by batch in msgList. If every message triggers a call to a third-party platform SDK that takes 100ms~200ms, handling them one at a time would take up to 2,000,000 × 200ms = 400,000s, which is more than 100 hours. The push must therefore be done concurrently with a thread pool.

 

Each machine runs at most 60 threads, so 5 machines provide 300 threads. At 200ms per push, each thread pushes 5 times per second and 300 threads push 1,500 times per second. Pushing 10 million messages then takes about 10,000,000 / 1,500 ≈ 6,667s, roughly 111 minutes, or a little under two hours. A full push to tens of millions of users therefore takes from some tens of minutes at best to two or three hours at worst.

 

The push system listens to PLATFORM_PROMOTION_SEND_TOPIC, consumes its share of about 2 million push messages, and initiates the pushes. The code is as follows:

@Configuration
 public class ConsumerBeanConfig {
     ...
     //Consumer of platform promotional activity push messages (CompletableFuture-based logic)
     @Bean("platformPromotionSendTopicConsumer")
     public DefaultMQPushConsumer platformPromotionSendConsumer(PlatFormPromotionListener platFormPromotionListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_PROMOTION_SEND_CONSUMER_GROUP);
         (());
         consumer.subscribe(PLATFORM_PROMOTION_SEND_TOPIC, "*");
         consumer.registerMessageListener(platFormPromotionListener);
         consumer.start();
         return consumer;
     }
 }


 @Component
 public class PlatFormPromotionListener implements MessageListenerConcurrently {
     //Message push factory provider
     @Autowired
     private FactoryProducer factoryProducer;
    
     @Autowired
     private RedisTemplate<String, String> redisTemplate;

     private static final int PERMITS = 30;
     private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
     private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;

     private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
         if (initializedRef.compareAndSet(false, true)) {
             //corePoolSize is 30, maxPoolSize is 60
             THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(PERMITS, PERMITS * 2, 60, , new ArrayBlockingQueue<>(1000), ("consumePromotionMsg"), new ());
         }
         return THREAD_POOL_EXECUTOR;
     };
    
     //Consume messages concurrently
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
         try {
             //Use custom business thread pool
             List<CompletableFuture<AltResult>> futureList = ()
                 .map(e -> (() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
                 .collect(());
             List<Throwable> resultList = ()
                 .map(CompletableFuture::join)
                 .filter(e -> != null)
                 .map(e -> ).collect(());
             if (!()) {
                 throw (0);
             }
         } catch (Throwable e) {
             ("consume error, platform coupon consumption failed", e);
             //Consumption failed; ask the broker to redeliver the messages later
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }

     private AltResult handleMessageExt(MessageExt messageExt) {
         try {
             ("Execute the platform sending notification message logic, message content: {}", ());
             String msg = new String(());
             PlatformPromotionMessage message = (msg, );

             //idempotent control
             if ((().get(()))) {
                 return new AltResult(null);
             }
             //Get message service factory
             MessageSendServiceFactory messageSendServiceFactory = (());
             //Message sending service component
             MessageSendService messageSendService = ();
             //Construct message
             PlatformMessagePushMessage messagePushMessage = ()
                 .informType(())
                 .mainMessage(())
                 .userAccountId(())
                 .message(())
                 .build();

             MessageSendDTO messageSendDTO = (messagePushMessage);
             (messageSendDTO);

             //After a successful send, record the message in Redis for the idempotency check
             ().set((), ().toString());

             ("Message push is completed, messageSendDTO:{}", messageSendDTO);
             (20);
             return new AltResult(null);
         } catch (Exception e) {
             return new AltResult(e);
         }
     }

     //Result wrapper for CompletableFuture tasks that have no meaningful return value
     //ex == null means the task succeeded
     //ex != null means the task failed; the exception is stored in the ex field
     private static class AltResult {
         final Throwable ex;
         public AltResult(Throwable ex) {
              = ex;
         }
     }
 }
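
The fan-out pattern in the listener above can be sketched in isolation. One detail is worth noting: a supplier that first flips an AtomicBoolean and only then assigns a static field can briefly hand a null pool to a second thread that calls it in between. The sketch below sidesteps that with an initialization-on-demand holder class. Everything in it is an assumption made for illustration: the `PushFanOut` class, the daemon thread factory, and the `CallerRunsPolicy` rejection handler are not taken from the original project.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the original project.
final class PushFanOut {
    // Initialization-on-demand holder: the JVM creates the pool exactly once, on first access.
    private static final class PoolHolder {
        static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(
                30, 60, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000),
                r -> {
                    Thread t = new Thread(r, "consumePromotionMsg");
                    t.setDaemon(true); // assumption: daemon threads, so the sketch never blocks JVM exit
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy()); // assumption: run on the caller when saturated
    }

    // Runs one push and turns any exception into a return value (null means success).
    private static Throwable tryPush(Consumer<String> pusher, String message) {
        try {
            pusher.accept(message);
            return null;
        } catch (Exception e) {
            return e;
        }
    }

    /** Pushes every message on the pool and returns the failures; an empty list means all succeeded. */
    static List<Throwable> pushAll(List<String> messages, Consumer<String> pusher) {
        List<CompletableFuture<Throwable>> futures = messages.stream()
                .map(m -> CompletableFuture.supplyAsync(() -> tryPush(pusher, m), PoolHolder.POOL))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
```

A listener built this way would return RECONSUME_LATER when the returned list is non-empty and CONSUME_SUCCESS otherwise, which mirrors the AltResult handling in the listing.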

 

10. Code implementation of lazy coupon issuance for tens of millions of users

(1) Initial version: issuing coupons to all users

(2) Optimized version: lazily issuing coupons to all users

 

(1) Initial version: issuing coupons to all users

First, the marketing system shards the task of issuing coupons to all users and sends the shard messages to the following topic:

PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC
@RestController
 @RequestMapping("/demo/promotion/coupon")
 public class PromotionCouponController {
     //Promotional service
     @Autowired
     private CouponService couponService;
    
     //Create a new coupon campaign
     @PostMapping
     public JsonResult<SaveOrUpdateCouponDTO> saveOrUpdateCoupon(@RequestBody SaveOrUpdateCouponRequest request) {
         try {
             ("Add a new coupon: {}", (request));
             SaveOrUpdateCouponDTO dto = (request);
             return (dto);
         } catch (BaseBizException e) {
             ("biz error: request={}", (request), e);
             return ((), ());
         } catch (Exception e) {
             ("system error: request={}", (request), e);
             return (());
         }
     }
     ...
 }

 //Coupon interface implementation
 @Service
 public class CouponServiceImpl implements CouponService {
     ...
     //Save/Modify the coupon activity method
     @Transactional(rollbackFor = )
     @Override
     public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
         SalesPromotionCouponDO couponDO = (request);
         (0);
         (couponDO);
      
         //Issue coupons to all users
         sendPlatformCouponMessage(couponDO);
      
         SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
         (());
         (());
         (true);
         return dto;
     }
     ...
     //Issue coupons to all users
     private void sendPlatformCouponMessage(SalesPromotionCouponDO promotionCouponDO) {
         //The size of the bucket
         final int userBucketSize = 1000;
         final int messageBatchSize = 100;

         //1. Query the largest userId from the database and treat it as the total number of users
         JsonResult<Long> maxUserIdJsonResult = ();
         if (()) {
             throw new BaseBizException((), ());
         }
         Long maxUserId = ();

         //2. Divide into m buckets, each bucket has n users, and each bucket sends a "batch send coupon user bucket message"
         //Example: maxUserId = 100w; userBucketSize=1000
         //userBucket1 = [1, 1001)
         //userBucket2 = [1001, 2001)
         //userBucketCount = 1000
         Map<Long, Long> userBuckets = new LinkedHashMap<>();
         AtomicBoolean flagRef = new AtomicBoolean(true);
         long startUserId = 1L;
         while (()) {
             if (startUserId > maxUserId) {
                 (true, false);
             }
             (startUserId, startUserId + userBucketSize);
             startUserId += userBucketSize;
         }

         //3. Send messages in batches
         //Example: userBucketCount = 1000; messageBatchSize = 100
         //Number of batch sends = 10. After bucketing and batching, the number of sends drops from 100w to 10.
         int handledBucketCount = 0;
         List<String> jsonMessageBatch = new ArrayList<>(messageBatchSize);
         for (<Long, Long> userBucket : ()) {
             handledBucketCount++;
             PlatformCouponUserBucketMessage message = ()
                 .startUserId(())
                 .endUserId(())
                 .informType(())
                 .couponId(())
                 .activityStartTime(())
                 .activityEndTime(())
                 .couponType(())
                 .build();
             String jsonMessage = JsonUtil.object2Json(message);
             (jsonMessage);

             if (() == messageBatchSize || handledBucketCount == ()) {
                 (RocketMqConstant.PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, jsonMessageBatch, "Platform issue coupon user bucket messages");
                 ();
             }
         }
     }
 }
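
The bucketing and batching steps can be sketched on their own. As far as the listing shows, the while loop adds a bucket even on the iteration in which startUserId has already passed maxUserId, so it appears to produce one extra, empty bucket. The sketch below avoids that and clamps the last range. `UserBuckets` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project.
final class UserBuckets {
    /** Splits the id range [1, maxUserId] into half-open ranges {start, end} of at most bucketSize ids. */
    static List<long[]> split(long maxUserId, int bucketSize) {
        List<long[]> buckets = new ArrayList<>();
        for (long start = 1; start <= maxUserId; start += bucketSize) {
            long end = Math.min(start + bucketSize, maxUserId + 1); // exclusive upper bound
            buckets.add(new long[] {start, end});
        }
        return buckets;
    }

    /** Number of batch sends needed when bucket messages are grouped batchSize at a time. */
    static int batchCount(int bucketCount, int batchSize) {
        return (bucketCount + batchSize - 1) / batchSize; // round up
    }
}
```

With maxUserId = 1,000,000 and a bucket size of 1000, `split` returns 1000 buckets, from [1, 1001) to [999001, 1000001), and `batchCount(1000, 100)` returns 10.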

Next, the marketing system listens to the following topic and consumes the coupon-issuance shard tasks:

PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC

There are two ways to handle a shard: (1) use the thread pool to send one coupon message per user to MQ directly; (2) merge the messages into batches first, then use the thread pool to send each batch to MQ.

@Configuration
 public class ConsumerBeanConfig {
     ...
     //Consumer for the platform coupon user-bucket messages
     @Bean("platformCouponUserBucketReceiveTopicConsumer")
     public DefaultMQPushConsumer receiveCouponUserBucketConsumer(@Qualifier("platformCouponUserBucketReceiveTopicConsumer")PlatFormCouponUserBucketListener platFormCouponUserBucketListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
         (());
         consumer.subscribe(PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, "*");
         consumer.registerMessageListener(platFormCouponUserBucketListener);
         consumer.start();
         return consumer;
     }
 }

 @Component
 public class PlatFormCouponUserBucketListener implements MessageListenerConcurrently {
     //Account Service
     @DubboReference(version = "1.0.0")
     private AccountApi accountApi;

     //The thread pool shared by sending messages
     @Autowired
     @Qualifier("sharedSendMsgThreadPool")
     private SafeThreadPool sharedSendMsgThreadPool;

     //RocketMQ Producer
     @Autowired
     private DefaultProducer defaultProducer;

     //Consume messages concurrently
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
         try {
             for (MessageExt messageExt : msgList) {
                 //1. Deserialize the message
                 String messageString = new String(());
                 ("Execute the message logic of the platform sending coupon user bucket, message content: {}", messageString);
                 PlatformCouponUserBucketMessage message = (messageString, );

                 //2. Query user information in the bucket
                 Long startUserId = ();
                 Long endUserId = ();
                 JsonResult<List<MembershipAccountDTO>> accountBucketResult = (startUserId, endUserId);
                 if (!()) {
                     throw new BaseBizException((), ());
                 }
                 List<MembershipAccountDTO> accountBucket = ();
                 if ((accountBucket)) {
                     ("No users found for the id range of the user bucket, startUserId={}, endUserId={}", startUserId, endUserId);
                     continue;
                 }

                 //3. Send one "platform send coupon message" per user
                 //Method 1: Use the thread pool directly to send the coupon messages to MQ
                 //Consumption is already concurrent, and the logic above, including the database query, runs in parallel
                 //accountBucket holds 1,000 users by default, and each user needs one "platform send coupon message", that is, 1,000 messages
                 //Below, a thread pool sends these 1,000 messages in parallel (alternatively, they can be sent in batches like the user bucket messages)
                 PlatformCouponMessage couponMessage = ()
                     .couponId(())
                     .activityStartTime(())
                     .activityEndTime(())
                     .couponType(())
                     .build();
                 for (MembershipAccountDTO account : accountBucket) {
                     (() -> {
                         (());
                         String jsonMessage = (couponMessage);
                         (RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, jsonMessage, "Platform Send Coupon Message");
                     });
                 }

                 //Method 2: After merging the batch, use the thread pool to send MQ
                 /*List<String> messages = new ArrayList<>(100);
                 for (MembershipAccountDTO account : accountBucket) {
                     (());
                     ((couponMessage));
                     if (() == 100) {
                         (() -> {
                             (RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, messages, "Platform sends coupon messages");
                         });
                         ();
                     }
                 }
                 //The rest are also sent in batches
                 if (!(messages)) {
                     (() -> {
                         (RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, messages, "Platform sends coupon messages");
                     });
                     ();
                 }*/
             }
         } catch (Exception e){
             ("consume error, platform coupon consumption failed", e);
             //Consumption failed; ask the broker to redeliver the messages later
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 }
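
Both methods in the listing share mutable state with asynchronous tasks. Method 1 mutates a single couponMessage object from many tasks, and method 2 clears the messages list right after handing it to a task, so a task that runs late may see an empty or partially refilled list. One way to avoid this, sketched below under the assumption that each batch can be built before it is submitted, is to split the per-user messages into independent copies first. `BatchPartition` is a hypothetical helper, not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project.
final class BatchPartition {
    /** Splits items into consecutive batches of at most batchSize; each batch is an independent copy. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the slice so that clearing or reusing the source list later cannot affect the batch
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

Each batch can then be submitted to the shared thread pool as its own task, and the per-user JSON strings can be serialized on the consumer thread, from a fresh message object per user, before the batches are built.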

Finally, the marketing system listens to the following topic and issues a coupon to each user through the thread pool:

PLATFORM_COUPON_SEND_TOPIC
@Configuration
 public class ConsumerBeanConfig {
     ...
     //Consumer for platform coupon issuance messages
     @Bean("platformCouponReceiveTopicConsumer")
     public DefaultMQPushConsumer receiveCouponConsumer(PlatFormCouponListener platFormCouponListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_CONSUMER_GROUP);
         (());
         consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*");
         consumer.registerMessageListener(platFormCouponListener);
         consumer.start();
         return consumer;
     }
 }

 @Component
 public class PlatFormCouponListener implements MessageListenerConcurrently {
     //Coupon item service
     @Autowired
     private CouponItemService couponItemService;

     //When testing CompletableFuture with the commonPool, the business ThreadPoolExecutor does not need to be initialized
     //A Supplier is used here for lazy loading, so the pool is not created when the commonPool is used
     //The thread pool is initialized only when CompletableFuture runs on the custom thread pool
     private static final int PERMITS = 30;
     private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
     private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
     private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
         if ((false, true)) {
             THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(PERMITS, PERMITS * 2, 60, , new ArrayBlockingQueue<>(1000), ("consumeCouponMsg"), new ());
         }
         return THREAD_POOL_EXECUTOR;
     };

     //Consume messages concurrently
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
         try {
             //Method 1: Use the default commonPool to handle tasks
             //supplyAsync(Supplier<U> supplier) API
             //By default it runs on ForkJoinPool.commonPool()
             //This pool is shared across the whole JVM; its default parallelism is the number of CPU cores minus 1
             //If that is not enough, adjust it with the JVM system property java.util.concurrent.ForkJoinPool.common.parallelism, or use method 2
             List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = ()
                 .map(e -> (() -> handleMessageExt(e)))
                 .collect(());

             //Method 2: Use a custom business thread pool to handle tasks
             //List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = ()
             // .map(e -> (() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
             // .collect(());

             List<SalesPromotionCouponItemDTO> couponItemDTOList = ()
                 .map(CompletableFuture::join)
                 .filter(Objects::nonNull)
                 .collect(());

             //Save the coupon to the database
             (couponItemDTOList);
         } catch (Exception e) {
             ("consume error, platform coupon consumption failed", e);
             //Consumption failed; ask the broker to redeliver the messages later
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }

     public SalesPromotionCouponItemDTO handleMessageExt(MessageExt messageExt) {
         ("Execute the information logic of the platform issuing coupons to consume, message content: {}", ());
         String msg = new String(());
         PlatformCouponMessage platformCouponMessage = (msg, );
         ("Start issue platform coupons, couponId:{}", ());

         //Idempotency check to prevent duplicate consumption
         JsonResult<Long> result = ((), ());
         //If the user already holds this coupon, skip it and do not save it again
         if (()) {
             return null;
         }

         SalesPromotionCouponItemDTO itemDTO = new SalesPromotionCouponItemDTO();
         (());
         (());
         (());
         (0);
         (());
         (());
         return itemDTO;
     }
 }

(2) Optimized version: lazily issuing coupons to all users

1. First, configure the Redis-related beans

@Data
@Configuration
@ConditionalOnClass()
public class RedisConfig {
    @Value("${}")
    private String host;
    @Value("${}")
    private String port;
    @Value("${}")
    private String password;
    @Value("${}")
    private int timeout;

    @Bean
    @ConditionalOnClass()
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        (redisConnectionFactory);
        (new StringRedisSerializer());
        ();
        return redisTemplate;
    }

    @Bean
    @ConditionalOnClass()
    public RedissonClient redissonClient() {
        Config config = new Config();
        ()
            .setAddress("redis://" + host + ":" + port)
            .setPassword(password)
            .setConnectionMinimumIdleSize(10)
            .setConnectionPoolSize(100)
            .setIdleConnectionTimeout(600000)
            .setSubscriptionConnectionMinimumIdleSize(10)
            .setSubscriptionConnectionPoolSize(100)
            .setTimeout(timeout);

        (new StringCodec());
        (5);
        (5);

        RedissonClient client = (config);
        return client;
    }

    @Bean
    @ConditionalOnClass()
    public RedisCache redisCache(RedisTemplate redisTemplate) {
        return new RedisCache(redisTemplate);
    }
}

2. Next, maintain the cached coupon list under a Redisson distributed lock, and lazily clean up expired coupons from the cache

In practice, when a new coupon is added, its information is written to Redis. At the same time, the coupons already in the cache are checked, and any that have expired are deleted.

//Coupon interface implementation
 @Service
 public class CouponServiceImpl implements CouponService {
     //Redis client tool
     @Autowired
     private RedisCache redisCache;
  
     @Autowired
     private RedissonClient redissonClient;
    
     ...
     //Save/Modify the coupon activity method
     @Transactional(rollbackFor = )
     @Override
     public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
         SalesPromotionCouponDO couponDO = (request);
         (0);
         (couponDO);

         //Judge coupon type
         if (CouponSendTypeEnum.PLATFORM_SEND.getCode().equals(())) {
             //1. If the coupon is issued by the platform, write it to Redis so that it can be issued lazily
             writeCouponToRedis(couponDO);
         } else {
             //2. If the coupon is claimed by users themselves
             //TODO
         }

         SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
         (());
         (());
         (true);
         return dto;
     }
    
     private void writeCouponToRedis(SalesPromotionCouponDO coupon) {
         //First, use Redisson to take a Redis-based distributed lock on PROMOTION_COUPON_ID_LIST_LOCK
         //The lock guards a separate data structure, PROMOTION_COUPON_ID_LIST
         RLock lock = (RedisKey.PROMOTION_COUPON_ID_LIST_LOCK);
         try {
             //Acquire the lock; it is released automatically after 60s
             (60, );
             List<Long> couponIds = null;

             String couponIdsJSON = (RedisKey.PROMOTION_COUPON_ID_LIST);
             if (couponIdsJSON == null || ("")) {
                 couponIds = new ArrayList<>();
             } else {
                 couponIds = (couponIdsJSON, );
             }

             //Check every cached coupon: if it has expired or has been fully issued, remove it from the list and delete it from Redis
             //A coupon issued to all users never runs out, so anyone may receive it; once its end time has passed, it can no longer be issued
             if (() > 0) {
                 Iterator<Long> couponIdIterator = ();
                 while (()) {
                     Long tempCouponId = ();
                     String tempCouponJSON = (RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
                     SalesPromotionCouponDO tempCoupon = (tempCouponJSON, );

                     Date now = new Date();
                     if ((())) {
                         ();
                         (RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
                     }
                 }
             }

             (());
             couponIdsJSON = JsonUtil.object2Json(couponIds);
             (RedisKey.PROMOTION_COUPON_ID_LIST, couponIdsJSON, -1);

             String couponJSON = JsonUtil.object2Json(coupon);
             (RedisKey.PROMOTION_COUPON_KEY + "::" + (), couponJSON, -1);
         } finally {
             ();
         }
     }
 }
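
The list maintenance inside the lock can be illustrated without Redis. The sketch below uses an in-memory list and map as stand-ins for PROMOTION_COUPON_ID_LIST and the per-coupon keys. It also handles two cases that the listing does not visibly cover: an id whose cached coupon is missing, and a coupon that is saved twice. All names here are assumptions made for illustration.

```java
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original project.
final class CouponListMaintenance {
    /**
     * Drops ids whose coupon has ended (or is missing) from couponIds and from the cache map,
     * then registers the new coupon. The caller is expected to hold the distributed lock.
     */
    static void pruneAndAdd(List<Long> couponIds, Map<Long, Date> endTimeById,
                            long newId, Date newEnd, Date now) {
        Iterator<Long> it = couponIds.iterator();
        while (it.hasNext()) {
            Long id = it.next();
            Date end = endTimeById.get(id);
            if (end == null || now.after(end)) {
                it.remove();            // remove from the id list
                endTimeById.remove(id); // remove the cached coupon itself
            }
        }
        if (!couponIds.contains(newId)) { // avoid duplicates when a coupon is updated
            couponIds.add(newId);
        }
        endTimeById.put(newId, newEnd);
    }
}
```

In the real service, the list and the map would be read from and written back to Redis between acquiring and releasing the lock.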

3. Finally, the member system publishes a user login event, and the marketing system lazily issues coupons after the user logs in

When a user logs in, the member system sends a message to the MQ topic USER_LOGINED_EVENT_TOPIC:

@RestController
 @RequestMapping("/demo/membership")
 public class MembershipController {
     @Autowired
     private DefaultProducer defaultProducer;

     @Autowired
     private MembershipAccountService accountService;

     //Trigger user login
     @PostMapping("/triggerUserLoginEvent")
     public JsonResult<Boolean> triggerUserLoginEvent(Long accountId) {
         try {
             List<MembershipAccountDTO> accounts = (accountId, accountId);
             if (accounts != null && () > 0) {
                 MembershipAccountDTO account = (0);
                 UserLoginedEvent userLoginedEvent = new UserLoginedEvent();
                 (account);

                 String userLoginedEventJSON = JsonUtil.object2Json(userLoginedEvent);
                 (RocketMqConstant.USER_LOGINED_EVENT_TOPIC, userLoginedEventJSON, "The user login event occurred");
             }
             return (true);
         } catch (BaseBizException e) {
             ("biz error: request={}", accountId, e);
             return ((), ());
         } catch (Exception e) {
             ("system error: request={}", accountId, e);
             return (());
         }
     }
 }

The marketing system listens to USER_LOGINED_EVENT_TOPIC and lazily issues coupons in response to the login event messages sent when users log in:

@Configuration
 public class ConsumerBeanConfig {
     ...
     @Bean("userLoginedEventListener")
     public DefaultMQPushConsumer userLoginedEventListener(UserLoginedEventListener userLoginedEventListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(USER_LOGINED_EVENT_CONSUMER_GROUP);
         (());
         consumer.subscribe(USER_LOGINED_EVENT_TOPIC, "*");
         consumer.registerMessageListener(userLoginedEventListener);
         consumer.start();
         return consumer;
     }
 }

 //User login event listener
 @Component
 public class UserLoginedEventListener implements MessageListenerConcurrently {
     @Autowired
     private RedisCache redisCache;

     @Autowired
     private SalesPromotionCouponItemDAO couponItemDAO;

     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         try {
             //Load all coupons from the Redis cache
             String couponIdsJSON = (RedisKey.PROMOTION_COUPON_ID_LIST);
             List<Long> couponIds = (couponIdsJSON, );
             List<SalesPromotionCouponDO> coupons = new ArrayList<>();

             for (Long couponId : couponIds) {
                 String couponJSON = (RedisKey.PROMOTION_COUPON_KEY + "::" + couponId);
                 SalesPromotionCouponDO coupon = (couponJSON, );
                 Date now = new Date();
                 if ((()) && (())) {
                     (coupon);
                 }
             }

             for (MessageExt messageExt : list) {
                 //Extract the user who has just logged in successfully
                 String message = new String(());
                 UserLoginedEvent userLoginedEvent = (message, );
                 MembershipAccountDTO account = ();

                 //Iterate over each coupon, check whether it is valid, whether it can still be issued, and whether the current user has already received it, then issue it to the user
                 for (SalesPromotionCouponDO coupon : coupons) {
                     String receiveCouponFlag = (RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + () + "::" + ());
                     if (receiveCouponFlag == null || ("")) {
                         SalesPromotionCouponItemDO couponItem = new SalesPromotionCouponItemDO();
                         (());
                         (());
                         (());
                         (());
                         (new Date());
                         (());
                         (0);
                         (new Date());
                         (());
                         (());
                         (couponItem);
                         (RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + () + "::" + (), "true", -1);
                     }
                 }
             }
         } catch(Exception e) {
             ("consume error, user login event handling exception", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 }
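
The listener reads the per-user "received" flag and writes it in two separate steps. If a login message is redelivered, or two login events for the same user are consumed concurrently, both may pass the check before either writes the flag. The sketch below shows the same decision made with a single atomic "add if absent" step. It uses an in-memory set as a stand-in for Redis, where an atomic set-if-absent operation (for example SET with the NX option) would play the same role. `LazyCouponIssuer` and its key format are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the original project.
final class LazyCouponIssuer {
    // Stand-in for the per-user "already received" flags kept in Redis
    private final Set<String> receivedFlags = ConcurrentHashMap.newKeySet();

    /** Returns the coupon ids newly issued to the user by this login event. */
    List<Long> onLogin(long userId, List<Long> validCouponIds) {
        List<Long> issued = new ArrayList<>();
        for (Long couponId : validCouponIds) {
            // add() is atomic and returns false when the flag already exists,
            // so a redelivered login message cannot issue the same coupon twice
            if (receivedFlags.add(userId + "::" + couponId)) {
                issued.add(couponId);
            }
        }
        return issued;
    }
}
```

The first login of a user receives every valid coupon, a repeated login receives nothing, and a later login receives only the coupons created in the meantime.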

 

11. Implementation of code for coupons for specified user groups

(1) Summary of the push and coupon issuance scheme for tens of millions of users

(2) Issuing coupons to specified user groups

 

(1) Summary of the push and coupon issuance scheme for tens of millions of users

When a promotion is created or coupons are issued, nothing is pushed or distributed directly. Instead, the work is relayed through MQ several times and processed asynchronously.

 

The marketing system first queries the total number of users, shards the task, and then merges the shard task messages into batches and sends them to MQ (asynchronous processing improves performance).

 

The marketing system then consumes these shard task messages, queries the users, builds one message per user, and sends the messages to MQ (this decouples the membership system from the push system).

 

The push system consumes each user's push message and pushes concurrently from multiple threads using a thread pool.

 

(2) Issuing coupons to specified user groups

A typical example is issuing coupons to reactivate millions of inactive users.

 

1. Entry-point code in the marketing system for issuing coupons to specified user groups

@RestController
 @RequestMapping("/demo/promotion/coupon")
 public class PromotionCouponController {
     ...
     @RequestMapping("/send")
     public JsonResult<SendCouponDTO> sendCouponByConditions(@RequestBody SendCouponRequest request) {
         try {
             ("Send coupons to the designated user group: {}", (request));
             SendCouponDTO dto = (request);
             return (dto);
         } catch (BaseBizException e) {
             ("biz error: request={}", (request), e);
             return ((), ());
         } catch (Exception e) {
             ("system error: request={}", (request), e);
             return (());
         }
     }
 }

 @Service
 public class CouponServiceImpl implements CouponService {
     @DubboReference(version = "1.0.0")
     private MessagePushApi messagePushApi;
     ...
    
     @Transactional(rollbackFor = )
     @Override
     public SendCouponDTO sendCouponByConditions(SendCouponRequest sendCouponRequest) {
         //Save coupon information
         SalesPromotionCouponDO couponDO = (sendCouponRequest);
         (0);
         (());
         (CouponSendTypeEnum.SELF_RECEIVE.getCode());
         (couponDO);

         //Shard the coupon messages and send them in batches
         shardBatchSendCouponMessage(sendCouponRequest);

         SendCouponDTO sendCouponDTO = new SendCouponDTO();
         ();
         (());
         (());

         //TODO
         (0);
         return sendCouponDTO;
     }
 }

2. Marketing system code for sharding coupon messages and sending them in batches

Here the marketing system queries the portrait system for the number of matching users, shards the task by that count, and sends the shard messages in batches to the following MQ topic:

PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC
@Service
 public class CouponServiceImpl implements CouponService {
     ...
     //Shard the coupon messages and send them in batches
     private void shardBatchSendCouponMessage(SendCouponRequest sendCouponRequest) {
         //1. Query the portrait system for the number of users matching the current conditions
         MembershipFilterDTO membershipFilterDTO = ();
         PersonaFilterConditionDTO conditionDTO = (membershipFilterDTO);
         JsonResult<Integer> countResult = (conditionDTO);
         if (!()) {
             throw new BaseBizException((), ());
         }

         //2. Shard according to the count value
         //Split into m shards; each shard carries: (1) the shard ID; (2) the number of users
         //Example: count = 100w; USER_BUCKET_SIZE = 1000
         //userBucket1 = (shardId 1, 1000 users)
         //userBucket2 = (shardId 2, 1000 users)
         //userBucketN = (shardId n, for example 756 users); the last shard may hold fewer than 1000
         //userBucketCount = 1000 when count divides evenly
         Integer count = ();
         Map<Integer, Integer> userBuckets = new LinkedHashMap<>();
         AtomicBoolean flagRef = new AtomicBoolean(true);
         Integer shardId = 1;
         while (()) {
             if (USER_BUCKET_SIZE > count) {
                 (shardId, USER_BUCKET_SIZE);
                 (true, false);
             }
             (shardId, USER_BUCKET_SIZE);
             shardId += 1;
             count -= USER_BUCKET_SIZE;
         }

         //3. Send messages in batches
         //Example: userBucketCount = 1000; messageBatchSize = 100
         List<String> messages = new ArrayList<>();
         PlatformPromotionConditionUserBucketMessage message = ().personaFilterCondition((conditionDTO)).build();
         for (<Integer, Integer> userBucket : ()) {
             (());
             (());
             String jsonMessage = JsonUtil.object2Json(message);
             (jsonMessage);
         }
         ("The number of messages pushed this time, {}",());

         ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
         while (()) {
             List<String> sendBatch = ();
             ("Number of messages in this batch, {}",());
             (() -> {
                 (RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, sendBatch, "Some User Promotion User Bucket Message");
             });
         }
     }
 }
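
Sharding by count can be sketched as follows. In the listing, the last shard appears to be recorded with the full bucket size instead of the remaining count, and the map entry for that shard is written twice. The sketch records the remainder and stops once nothing is left. `CountSharding` is a hypothetical name used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project.
final class CountSharding {
    /** Maps shardId (starting at 1) to the number of users in that shard, in shard order. */
    static Map<Integer, Integer> shard(int count, int bucketSize) {
        if (bucketSize <= 0) {
            throw new IllegalArgumentException("bucketSize must be positive");
        }
        Map<Integer, Integer> shards = new LinkedHashMap<>();
        int shardId = 1;
        for (int remaining = count; remaining > 0; remaining -= bucketSize) {
            // The last shard holds whatever is left, which may be fewer than bucketSize users
            shards.put(shardId++, Math.min(bucketSize, remaining));
        }
        return shards;
    }
}
```

For a count of 2756 and a bucket size of 1000 this yields three shards with 1000, 1000, and 756 users. A consumer can then derive the query offset of shard k as (k - 1) × bucketSize and use the shard's user count as the limit.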

3. The marketing system consumes the shard messages for the specified user group and sends per-user coupon messages

First, the marketing system consumes the following MQ topic:

PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC

Then, the marketing system sends the coupon-receiving messages to the following MQ topic:

PLATFORM_CONDITION_COUPON_SEND_TOPIC
@Configuration
 public class ConsumerBeanConfig {
     ...
     @Bean("platFormConditionCouponUserBucketConsumer")
     public DefaultMQPushConsumer platFormConditionCouponUserBucketConsumer(PlatFormConditionCouponUserBucketListener platFormPromotionUserBucketListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
         (());
         (PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, "*");
         (platFormPromotionUserBucketListener);
         ();
         return consumer;
     }
 }

 @Component
 public class PlatFormConditionCouponUserBucketListener implements MessageListenerConcurrently {
     //User Portrait Service
     @DubboReference(version = "1.0.0")
     private PersonaApi personaApi;

     //The thread pool shared by sending messages
     @Autowired
     @Qualifier("sharedSendMsgThreadPool")
     private SafeThreadPool sharedSendMsgThreadPool;

     //RocketMQ Producer
     @Autowired
     private DefaultProducer defaultProducer;

     //Consume messages concurrently
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
         try {
             for (MessageExt messageExt : msgList) {
                 //1. Deserialize the message
                 String messageString = new String(());
                 ("Some users receive coupons and user bucket message logic, message content: {}", messageString);
                 PlatformPromotionConditionUserBucketMessage message = (messageString, );

                 //2. Query user information in the bucket
                 Integer shardId = ();

                 // Calculate the starting offset of this shard from the shard id and the bucket size (1000 users per shard).
                 Long startUserId = (shardId - 1) * 1000L;
                 Integer bucketSize = ();
                 String personaFilterCondition = ();
                 PersonaFilterConditionDTO personaFilterConditionDTO = (personaFilterCondition, );

                 //Encapsulate the conditions for querying user id
                 PersonaConditionPage page = ()
                     .memberPoint(())
                     .memberLevel(())
                     .offset(startUserId)
                     .limit(bucketSize)
                     .build();

                 //Check the user account id from the user portrait system
                 JsonResult<List<Long>> accountIdsResult = (page);
                 if (!()) {
                     throw new BaseBizException((), ());
                 }

                 List<Long> accountIds = ();
                 if ((accountIds)) {
                     ("The user was not queried based on the shard information in the user bucket, shardId={}", shardId);
                     continue;
                 }

                 //3. Send each user a message inviting them to claim the coupon.
                 PlatformMessagePushMessage pushMessage = ()
                     .message("Congratulations on obtaining coupon qualification, click to enter the event page")
                     .mainMessage("Eligible for coupons")
                     .informType(())
                     .build();

                 List<String> messages = new ArrayList<>();
                 for (Long accountId : accountIds) {
                     (accountId);
                     ((pushMessage));
                 }
                 ("The number of messages pushed this time, {}",());

                 ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                 while (()) {
                     List<String> sendBatch = ();
                     (() -> {
                         (RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_TOPIC, sendBatch, "Platform Send Coupon Message");
                     });
                 }
             }
         } catch (Exception e){
             ("consume error, consumption failed", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 }
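
The relationship between the shard id, the bucket size and the query window used above can be sketched on its own. This is only an illustration under stated assumptions, not the project's code: the class `CountSharding` and its `Shard` type are hypothetical names, shard ids are assumed to start at 1, and the portrait query is assumed to be paged by an offset and a limit as in the listener above.

```java
import java.util.ArrayList;
import java.util.List;

final class CountSharding {
    /** One shard: a 1-based id plus the offset/limit window it covers. */
    static final class Shard {
        final int shardId;
        final long offset;
        final int limit;

        Shard(int shardId, long offset, int limit) {
            this.shardId = shardId;
            this.offset = offset;
            this.limit = limit;
        }
    }

    /** Offset of the first row of a shard, computed in long arithmetic to avoid int overflow. */
    static long offsetOf(int shardId, int bucketSize) {
        return (shardId - 1L) * bucketSize;
    }

    /** Splits totalUsers rows into shards of at most bucketSize rows each. */
    static List<Shard> shards(long totalUsers, int bucketSize) {
        if (bucketSize <= 0) {
            throw new IllegalArgumentException("bucketSize must be positive");
        }
        List<Shard> result = new ArrayList<>();
        int shardId = 1;
        for (long offset = 0; offset < totalUsers; offset += bucketSize) {
            // The last shard may be smaller than bucketSize.
            int limit = (int) Math.min(bucketSize, totalUsers - offset);
            result.add(new Shard(shardId, offset, limit));
            shardId++;
        }
        return result;
    }
}
```

Deriving the offset from the bucket size carried in the message, rather than from a literal, keeps the producer and the consumer consistent if the bucket size is ever changed.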

4. The push system consumes messages from the following topic and pushes the coupon-claim notification to users.

PLATFORM_CONDITION_COUPON_SEND_TOPIC
@Configuration
 public class ConsumerBeanConfig {
     ...
     @Bean("platFormConditionCouponConsumer")
     public DefaultMQPushConsumer platFormConditionCouponConsumer(PlatFormConditionCouponListener platFormPromotionListener) throws MQClientException {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_CONSUMER_GROUP);
       (());
       (PLATFORM_CONDITION_COUPON_SEND_TOPIC, "*");
       (platFormPromotionListener);
       ();
       return consumer;
     }
 }

 @Component
 public class PlatFormConditionCouponListener implements MessageListenerConcurrently {
     //Message push factory provider
     @Autowired
     private FactoryProducer factoryProducer;

     @Autowired
     private RedisTemplate<String, String> redisTemplate;

     //Consume messages concurrently
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
         try {
             //Method 2: Use a custom business thread pool to handle tasks
             List<CompletableFuture<AltResult>> futureList = ()
                 .map(e -> (() -> handleMessageExt(e)))
                 .collect(());

             List<Throwable> resultList = futureList.stream()
                 .map(CompletableFuture::join)
                 .filter(e -> e.ex != null)
                 .map(e -> e.ex)
                 .collect(Collectors.toList());
         } catch (Exception e) {
             ("consume error, consumption failed", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }

     private AltResult handleMessageExt(MessageExt messageExt) {
         try {
             ("Execute the platform sending notification message logic, message content: {}", ());
             String msg = new String(());
             PlatformMessagePushMessage message = (msg, );

             //idempotent control
             if ((().get(PROMOTION_CONDITION_COUPON_KEY + ()))) {
                 return new AltResult(null);
             }

             //Get message service factory
             MessageSendServiceFactory messageSendServiceFactory = (());

             //Message sending service component
             MessageSendService messageSendService = ();

             //Construct message
             PlatformMessagePushMessage messagePushMessage = ()
                 .informType(())
                 .mainMessage(())
                 .userAccountId(())
                 .message(())
                 .build();

             MessageSendDTO messageSendDTO = (messagePushMessage);
             (messageSendDTO);

             //After a successful send, record it in Redis
             ().set(PROMOTION_CONDITION_COUPON_KEY + (), ().toString());
             ("Message push is completed, messageSendDTO:{}", messageSendDTO);

             return new AltResult(null);
         } catch (Exception e) {
             return new AltResult(e);
         }
     }

     //Result wrapper for CompletableFuture tasks that have no return value
     //If the ex field is null, the task succeeded
     //If the ex field is not null, the task failed and ex holds the exception
     private static class AltResult {
         final Throwable ex;
         public AltResult(Throwable ex) {
             this.ex = ex;
         }
     }
 }
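
In the listener above, the failures collected from the futures are not used when choosing the return status. A minimal sketch of one way to act on them follows. It is an assumption-laden illustration rather than the author's implementation: `ParallelBatchHandler` and its `Status` enum are hypothetical stand-ins for the listener and the RocketMQ consume status, and the handler is any per-message callback.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;

final class ParallelBatchHandler {
    /** Hypothetical stand-in for the consume status returned to the broker. */
    enum Status { SUCCESS, RETRY_LATER }

    /** Runs the handler for every item on the pool and reports RETRY_LATER if any item failed. */
    static <T> Status handleAll(List<T> items, Consumer<T> handler, Executor pool) {
        // Each task returns null on success or the caught exception on failure,
        // so join() below never throws for handler errors.
        List<CompletableFuture<Throwable>> futures = items.stream()
                .map(item -> CompletableFuture.<Throwable>supplyAsync(() -> {
                    try {
                        handler.accept(item);
                        return null;
                    } catch (Exception e) {
                        return e;
                    }
                }, pool))
                .collect(Collectors.toList());

        // Wait for all tasks and count the ones that reported an exception.
        long failures = futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .count();
        return failures == 0 ? Status.SUCCESS : Status.RETRY_LATER;
    }
}
```

Asking for a retry redelivers the whole batch, including messages that already succeeded, which is why an idempotency check such as the Redis lookup above matters with this approach.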

 

12. Refactoring of batch merging algorithm for sharded messages

Previously, batches were merged by message count alone. The splitter is now refactored so that each merged batch is at most 800KB in size and contains at most 100 messages.

import java.util.Iterator;
import java.util.List;

public class ListSplitter implements Iterator<List<String>> {
     //Cap each batch at 800KB, because RocketMQ officially recommends that a message not exceed 1MB.
     //A RocketMQ message also carries the topic, address and other data besides the body, so leave some headroom
     private int sizeLimit = 800 * 1024;
     private final List<String> messages;
     private int currIndex;
     //Maximum number of messages per batch
     private int batchSize = 100;

     public ListSplitter(List<String> messages, Integer batchSize) {
         this.messages = messages;
         this.batchSize = batchSize;
     }

     public ListSplitter(List<String> messages) {
         this.messages = messages;
     }

     @Override
     public boolean hasNext() {
         return currIndex < messages.size();
     }

     //Return the next batch as a sublist of the message list
     @Override
     public List<String> next() {
         int nextIndex = currIndex;
         int totalSize = 0;
         for (; nextIndex < messages.size(); nextIndex++) {
             String message = messages.get(nextIndex);
             //Length of this message
             int tmpSize = message.length();
             if (tmpSize > sizeLimit) {
                 //A single oversized message goes into a batch of its own
                 if (nextIndex - currIndex == 0) {
                     nextIndex++;
                 }
                 break;
             }
             //Close the batch when it would exceed the size limit or has reached the message-count limit
             if (tmpSize + totalSize > sizeLimit || (nextIndex - currIndex) == batchSize) {
                 break;
             } else {
                 totalSize += tmpSize;
             }
         }
         List<String> subList = messages.subList(currIndex, nextIndex);
         currIndex = nextIndex;
         return subList;
     }

     @Override
     public void remove() {
         throw new UnsupportedOperationException("Not allowed to remove");
     }
 }
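
The size limit above is expressed in bytes, while a Java string length counts UTF-16 characters, so the two can differ for non-ASCII payloads. The following is a minimal sketch of the same two-limit batching that measures UTF-8 bytes instead. It is an illustration under assumptions, not the article's class: `ByteSizeBatcher` is a hypothetical name, and UTF-8 is assumed to be the encoding used when the message body is sent.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ByteSizeBatcher {
    /** Groups messages into batches of at most maxBytes UTF-8 bytes and at most maxCount messages. */
    static List<List<String>> split(List<String> messages, int maxBytes, int maxCount) {
        if (maxBytes <= 0 || maxCount <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String message : messages) {
            int size = message.getBytes(StandardCharsets.UTF_8).length;
            // Close the current batch if adding this message would break either limit.
            boolean full = !current.isEmpty()
                    && (currentBytes + size > maxBytes || current.size() == maxCount);
            if (full) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            // A message larger than maxBytes still gets sent, alone in its own batch.
            current.add(message);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

For example, with a 6-byte limit the two-character string "中中" already fills a batch, because each character takes 3 bytes in UTF-8, whereas a character count would report a length of 2.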

 

13. Implementation of push code for millions of portraits

(1) Scheduled push task of the marketing system

(2) The push system consumes sharded push-task messages and sends per-user push messages to MQ

(3) The push system consumes the per-user push messages and performs the actual push

 

(1) Scheduled push task of the marketing system

@Component
 public class ScheduleSendMessageJobHandler {
     ...
     //Execute timed tasks, filter popular products and users to send them to MQ
     @XxlJob("hotGoodsPushHandler")
     public void hotGoodsPushHandler() {
         ("hotGoodsPushHandler starts execution");

         //Get popular products and user portraits, and simplify the business into a one-to-one relationship first
         List<HotGoodsCrontabDO> crontabDOs = (new Date());
         ("Get popular product and user profile data, crontabDOs:{}", JsonUtil.object2Json(crontabDOs));

         //Find out the user matching the corresponding portrait of each popular product
         for (HotGoodsCrontabDO crontabDO : crontabDOs) {
             ("Automatic sharding logic, current task: crontabDO:{}", JsonUtil.object2Json(crontabDO));
             if ((())) {
                 continue;
             }

             //Portraits corresponding to popular products
             MembershipPointDTO membershipPointDTO = JsonUtil.json2Object((), );
             if ((membershipPointDTO)) {
                 continue;
             }

             //Build the query condition for users that match the portrait
             MembershipFilterConditionDTO conditionDTO = buildCondition(membershipPointDTO);
             PersonaFilterConditionDTO personaFilterConditionDTO = (conditionDTO);
             ("User query conditions: {}", personaFilterConditionDTO);

             //Get the maximum and minimum user ID matching the portrait user
             JsonResult<Long> accountMaxIdResult = (personaFilterConditionDTO);
             ("Get the maximum ID, result:{}", JsonUtil.object2Json(accountMaxIdResult));
             if (!()) {
                 ("Failed to get maximum ID, condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
                 throw new BaseBizException((), ());
             }
             JsonResult<Long> accountMinIdResult = (personaFilterConditionDTO);
             ("Get the minimum ID, result:{}", JsonUtil.object2Json(accountMinIdResult));
             if (!()) {
                 ("Failed to get the minimum ID, condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
                 throw new BaseBizException((), ());
             }

             //The range of user IDs that needs to be pushed
             //Note: this range is only an estimate, because many users between the minimum ID and the maximum ID will not match the criteria.
             //Those users are filtered out downstream, where the selection criteria are applied again.
             Long minUserId = ();
             Long maxUserId = ();

             //A bucket is a user shard, corresponding to one user ID range: startUserId -> endUserId
             //With this sharding, a push task for tens of millions of users is split into shards, each covering for example 1,000 or 2,000 users
             //userBuckets can hold tens of thousands of key-value pairs; each pair is one startUserId -> endUserId push task shard
             final int userBucketSize = 1000;
             Map<Long, Long> userBuckets = new LinkedHashMap<>();
             AtomicBoolean doSharding = new AtomicBoolean(true);
             long startUserId = minUserId;
             ("Start sharding of the task population, startId:{}",minUserId);
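                 while (doSharding.get()) {
                     //Last shard: fewer than userBucketSize IDs remain, so it ends at maxUserId
                     if ((maxUserId - startUserId) < userBucketSize) {
                         userBuckets.put(startUserId, maxUserId);
                         doSharding.compareAndSet(true, false);
                         break;
                     }
                     userBuckets.put(startUserId, startUserId + userBucketSize);
                     startUserId += userBucketSize;
                 }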

             //Combine the batch of RocketMQ messages that may be pushed by thousands of shards, and send tasks to MQ in batch mode, reducing the time spent on communicating with RocketMQ network
             List<String> hotProductPushTasks = new ArrayList<>();
             HotGoodsVO hotGoodsVO = buildHotGoodsVO(crontabDO);
             PlatformHotProductUserBucketMessage bucketMessage = ()
                 .hotGoodsVO((hotGoodsVO))
                 .personaFilterConditionDTO((personaFilterConditionDTO))
                 .build();
             for (<Long, Long> userBucket : ()) {
                 (());
                 (());

                 String promotionPushTaskJSON = JsonUtil.object2Json(bucketMessage);
                 ("User bucket construction side selection criteria: {}",());
                 (promotionPushTaskJSON);
             }

             //MESSAGE_BATCH_SIZE is the message batch size: each RocketMQ batch message contains 100 push tasks.
             //This way, 10,000 push task messages are merged into 100 batch messages, so only 100 network round trips to RocketMQ are needed, which greatly reduces the time spent sending messages
             ListSplitter splitter = new ListSplitter(hotProductPushTasks, MESSAGE_BATCH_SIZE);
             while (()) {
                 List<String> sendBatch = ();
                 ("Number of messages in this batch, {}",());
                 (() -> {
                     (RocketMqConstant.PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, sendBatch, "Platform Popular Products Timed Task User Bucket Message");
                 });
             }
         }
     }
 }
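
The range-sharding step of the job above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the project's code: `UserIdRangeSharder` is a hypothetical name, and each bucket is treated as a half-open range, start inclusive and end exclusive, so that neighbouring buckets never share a user ID. Whether the portrait query treats its end ID as inclusive is not stated in the article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UserIdRangeSharder {
    /**
     * Splits the inclusive ID range [minUserId, maxUserId] into consecutive buckets.
     * Each entry maps an inclusive start ID to an exclusive end ID.
     * Assumes maxUserId is below Long.MAX_VALUE so that maxUserId + 1 does not overflow.
     */
    static Map<Long, Long> shard(long minUserId, long maxUserId, long bucketSize) {
        if (bucketSize <= 0) {
            throw new IllegalArgumentException("bucketSize must be positive");
        }
        // LinkedHashMap keeps the buckets in ascending ID order.
        Map<Long, Long> buckets = new LinkedHashMap<>();
        long endExclusive = maxUserId + 1;
        for (long start = minUserId; start < endExclusive; start += bucketSize) {
            // The last bucket is clipped so that it stops right after maxUserId.
            buckets.put(start, Math.min(start + bucketSize, endExclusive));
        }
        return buckets;
    }
}
```

With IDs 1 to 2500 and a bucket size of 1000 this yields three buckets: 1 to 1001, 1001 to 2001, and 2001 to 2501. Because the ID range is only an estimate of the matching users, a bucket may contain far fewer than 1000 users after filtering.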

(2) The push system consumes sharded push-task messages and sends per-user push messages to MQ

The push system listens to the following MQ topic:

PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC
@Configuration
 public class ConsumerBeanConfig {
     ...
     @Bean("PlatFormHotProductUserBucketConsumer")
     public DefaultMQPushConsumer PlatFormHotProductUserBucketConsumer(PlatFormHotProductUserBucketListener platFormHotProductUserBucketListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_CONSUMER_GROUP);
         (());
         (PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, "*");
         (platFormHotProductUserBucketListener);
         ();
         return consumer;
     }
 }

 @Component
 public class PlatFormHotProductUserBucketListener implements MessageListenerConcurrently {
     @DubboReference(version = "1.0.0")
     private PersonaApi personaApi;

     @Autowired
     private DefaultProducer producer;

     //Public message push thread pool
     @Autowired
     @Qualifier("sharedSendMsgThreadPool")
     private SafeThreadPool sharedSendMsgThreadPool;
    
     //Consume messages concurrently
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
         try {
             for (MessageExt messageExt : msgList) {
                 String msg = new String(());
                 PlatformHotProductUserBucketMessage hotProductMessagePushTask = JsonUtil.json2Object(msg, PlatformHotProductUserBucketMessage.class);
                 ("Execute the message logic of popular product push user bucket, message content: {}", hotProductMessagePushTask);

                 //1. Obtain the data in this popular product push task shard
                 String hotGoodString = ();
                 String personaFilterCondition = ();
                 HotGoodsVO hotGoodsVO = (hotGoodString, );
                 ("Selecting conditions, content: {}", personaFilterCondition);
                 if ((personaFilterCondition) || (hotGoodsVO)) {
                     continue;
                 }

                 PersonaFilterConditionDTO conditionDTO = (personaFilterCondition, );
                 Long startUserId = ();
                 Long endUserId = ();

                 //Pagination query conditions
                 PersonaConditionWithIdRange page = ()
                     .memberLevel(())
                     .memberPoint(())
                     .startId(startUserId)
                     .endId(endUserId)
                     .build();

                 //2. Query the user group corresponding to this push task shard
                 //Note: When querying, pass in query conditions to filter out user ids that do not meet the conditions

                 JsonResult<List<Long>> queryAccountIdsResult = (page);
                 List<Long> accountIds = ();

                 PlatformHotProductMessage hotMessage = ()
                     .goodsName(())
                     .goodsDesc(())
                     .keyWords(())
                     .build();
                 int handledBucketCount = 0;
                 List<String> messages = new ArrayList<>();
                 for (Long accountId : accountIds) {
                     handledBucketCount++;
                     (accountId);
                     ("Construct popular product MQ message, hotMessage: {}", hotMessage);
                     ((hotMessage));
                 }
                 ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                 while (()) {
                     List<String> sendBatch = ();
                     ("Number of messages in this batch, {}", ());
                     (() -> {
                         (RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, sendBatch, "Platform Popular Products Timed Task User Bucket Message");
                     });
                 }
             }
         } catch (Exception e) {
             ("consume error, popular products notify consumption failed", e);
             //Because this is only a push task, a failed message can simply be discarded
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 }

(3) The push system consumes the per-user push messages and performs the actual push

The push system listens to the following MQ topic:

PLATFORM_HOT_PRODUCT_SEND_TOPIC
@Configuration
 public class ConsumerBeanConfig {
     ...
     @Bean("platformHotProductSendTopicConsumer")
     public DefaultMQPushConsumer platformHotProductConsumer(PlatFormHotProductListener platFormHotProductListener) throws MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_SEND_CONSUMER_GROUP);
         (());
         (PLATFORM_HOT_PRODUCT_SEND_TOPIC, "*");
         (platFormHotProductListener);
         ();
         return consumer;
     }
 }

 @Component
 public class PlatFormHotProductListener implements MessageListenerConcurrently {
     //Consume messages concurrently
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
         try {
             for (MessageExt messageExt : msgList) {
                 ("Execute the platform sending notification message logic, message content: {}", ());
                 String msg = new String(());
                 HashMap hotProductMessage = (msg , );

                 //Push notification
                 informByPush(hotProductMessage);
             }
         } catch (Exception e) {
             ("consume error, platform coupon consumption failed", e);
             //Consumption failed this time; retry later
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }

     //Third-party platforms push messages to the app
     private void informByPush(HashMap message){
         String messageBody = "Speed poke! Exquisite small objects,"
             + message.get("keywords") + "!"
             + message.get("goodsName")
             + message.get("goodsDesc");
         ("Message push: message content: {}", messageBody);
     }
 }

 

14. Full-link pressure measurement of PUSH in million-level users in production environment

Production deployment for the million-user push test: the marketing system runs on 5 machines (2 cores, 4GB each), the push system runs on 5 machines (2 cores, 4GB each), and there is 1 machine for the member system and 1 for the portrait system.

 

Suppose the member system holds 1.5 million users, and a promotional activity is launched that pushes a notification to every member, so about 1.5 million messages flow through MQ. From creating the activity to finishing the push of all messages takes about 27 minutes, which is reasonably efficient. Scaling linearly, a push to the order of ten million users would take roughly 27 × 5 = 135 minutes up to about 3 hours, which is close to our expectations.

 

If a push task for 1.5 million users is sharded by 1,000 users, there are 1,500 shard tasks in total, and each shard task handles 1,000 messages. These 1,500 shard tasks are sent to MQ through batch merging, which is very fast: assuming every 100 shards are merged into one batch, only 15 batch messages are sent. There are 5 push machines with 30 threads each, for a total of 150 threads. If one push takes a thread 200ms, each thread can push 5 messages per second, so the 5 machines together can push 750 messages per second. 1,500,000 / 750 = 2,000s, or about 33 minutes. This estimate is not far from the measured 27 minutes.
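
The estimate above can be reproduced with a few lines of arithmetic. This is a minimal sketch using only the figures quoted in the text (5 machines, 30 threads each, 200ms per push, 1.5 million messages); the class and method names are hypothetical, and it assumes every thread is busy pushing for the whole run.

```java
final class PushThroughputEstimate {
    /** Messages pushed per second across all machines, assuming every thread is always busy. */
    static double messagesPerSecond(int machines, int threadsPerMachine, long millisPerPush) {
        return machines * threadsPerMachine * (1000.0 / millisPerPush);
    }

    /** Seconds needed to push totalMessages at the given rate. */
    static double estimatedSeconds(long totalMessages, double messagesPerSecond) {
        return totalMessages / messagesPerSecond;
    }

    public static void main(String[] args) {
        double rate = messagesPerSecond(5, 30, 200);
        double seconds = estimatedSeconds(1_500_000L, rate);
        System.out.printf("rate=%.0f msg/s, time=%.0f s (%.1f min)%n", rate, seconds, seconds / 60);
    }
}
```

The same two functions can be used to explore other deployments, for example doubling the number of push machines halves the estimated time under the same assumptions.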

 

In some scenarios, a push to all members does not need the push result. Skipping the step that fetches the push result improves efficiency slightly.