Location>code7788 >text

It smells good to use MQ like this at work!

Popularity:764 ℃/2024-12-19 20:36:04

preamble

Message Queuing (MQ) is one of the indispensable technologies in distributed systems.

For many partners, when you first contact MQ, you may think that it is just a "messaging tool", but when you use it, you will find that it is simply the "lubricant" of the system.

Whether it's decoupling, peak shaving, or asynchronous task processing, you can't get away from MQ.

Below I combined with practical scenarios, from simple to complex, one by one to break down the 10 classic use of MQ, I hope you will be helpful.

1. Asynchronous processing: making it easy on the system

take

Partners are not often encountered in this situation: the user submits an operation, such as placing an order, and then to send an SMS notification.

If you call the SMS interface directly in the main process, once the SMS service responds slowly, it will slow down the whole operation.

Users get tired of waiting and their minds simply fall apart.

prescription

Use MQ to pull out non-critical processes for asynchronous processing. When placing an order, directly to the MQ to "send a text message" thing, the order service can immediately respond to the user, and the text message thing to let the MQ and the consumer to take care of.

sample code (computing)

// Order service: producer
Order order = createOrder(); // Order generation logic
("order_exchange", "order_key", order); ("Order has been generated.
("Order has been generated, texting task to MQ"); ;

// SMS service: consumer
@RabbitListener(queues = "sms_queue")
public void sendSms(Order order) {
    ("Sending SMS, Order ID: " + ()); // Call the SMS service interface.
    // Call the SMS service interface
}

in-depth analysis

The benefits of this approach are:Decoupling of the main process from the drag of slow services. The order service just minds its own business. It doesn't matter if the SMS service hangs up. MQ will put the message on hold and continue to process it when the SMS service is back up.

2. Peak flow reduction: stabilizing the system against collapse

take

The annual "Double Eleven" e-commerce promotion, the user kills the goods rushed in a swarm.

The sudden influx of highly concurrent requests will not only overwhelm the application service, but also directly make the database "down".

prescription

The spike request is first written to the MQ, and the back-end service consumes messages from the MQ at a steady rate to process the order.

This prevents the system from being overwhelmed by transient flows and also improves the smoothness of the treatment.

sample code (computing)

// User submits seckill request: producer
("seckill_exchange", "seckill_key", userRequest); ("seckill_exchange", "seckill_key", userRequest).
("User seckill request is in the queue"); ;

// Seconds service: consumer
@RabbitListener(queues = "seckill_queue")
public void processSeckill(UserRequest request) {
    ("Processing a Seckill request, user ID: " + ());
    // Execute the seckill logic
}

in-depth analysis

MQ is the equivalent of a buffer pool here, distributing transient traffic evenly over a period of time for processing.Improved system stability and better user experience

3. Service decoupling: reducing interlocking constraints

take

For example, an order system needs to notify the inventory system to deduct inventory and also notify the payment system to complete the deduction.

If you call it directly with a synchronized interface, the dependencies between services are so strong that if one service hangs, the whole chain will be dragged down.

prescription

The order service is only responsible for dropping messages into the MQ, and the inventory service and payment service each consume messages from the MQ.

This way the order service doesn't need to rely on them directly.

sample code (computing)

// Order service: producer
("order_exchange", "order_key", order);
("Order Generation Message Sent"); ;

// Inventory service: consumer
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
    ("Deducting stock, Order ID: " + ());
}

// Payment service: consumer
@RabbitListener(queues = "payment_queue")
public void processPayment(Order order) {
    ("Processing payment, Order ID: " + ());
}

in-depth analysis

With MQ, services can be loosely coupled.

Even if the inventory service hangs, it won't affect the order generation process, theDramatically increase the fault tolerance of the system

4. Distributed transactions: ensuring data consistency

take

The Order Service requires both order generation and inventory deduction, which involves two different database operations.

If one succeeds and one fails, it results in inconsistent data.

prescription

Distributed transactions via MQ.

After the order service generates the order, it gives the task of deducting inventory to the MQ, which ultimately achieves data consistency.

sample code (computing)

// Order service: producer
("order_exchange", "order_key", order); ("Order_exchange", "order_key", order);
("Order Creation Message Sent"); ;

// Inventory service: consumer
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
    ("Updating stock, Order ID: " + ());
    // Execute the stock deduction logic
}

in-depth analysis

The problem of distributed transactions is solved by "eventual consistency", which means that although the data may be inconsistent for a short period of time, the final state must be correct.

5. Broadcast notification: one message to notify multiple services

take

For example, if the price of a product is adjusted, the inventory, search, and recommendation services need to be synchronized and updated.

If each service had to be notified individually, the workload would be enormous.

prescription

MQ's broadcast mode (Fanout) allows multiple consumers to subscribe to the same message, realizing "one send, many receive".

sample code (computing)

// Producer: broadcast message
("price_update_exchange", "", priceUpdate);
("Item price update message broadcast");;

// Consumer 1: Inventory Service
@RabbitListener(queues = "stock_queue")
public void updateStockPrice(PriceUpdate priceUpdate) {
    ("Stock price update: " + ());
}

// Consumer 2: Search Service
@RabbitListener(queues = "search_queue")
public void updateSearchPrice(PriceUpdate priceUpdate) {
    ("Search price update:" + ());
}

in-depth analysis

This pattern allows multiple services to receive the same message, theVery scalable

6. Log collection: centralization of distributed logs

take

Logs generated by multiple services need to be stored and analyzed in a uniform manner.

If you write directly to the database, it can lead to performance bottlenecks.

prescription

Each service writes logs to the MQ, and the log analysis system consumes messages from the MQ and processes them uniformly.

sample code (computing)

// server-side: producer
("log_exchange", "log_key", logEntry); ("Log_exchange", "log_key", logEntry);
("log sent");;

// Log analysis service: consumer
@RabbitListener(queues = "log_queue")
public void processLog(LogEntry log) {
    ("Log processing: " + ());
    // Store or analyze the logic
}

7. Deferred tasks: timed trigger operations

take

After a user places an order, if payment is not made within 30 minutes, the order needs to be automatically canceled.

prescription

Use MQ's delayed queuing feature to set how long messages are delayed from being consumed.

sample code (computing)

// Producer: send delay message
("delay_exchange", "delay_key", order, message -> {
    ().setDelay(30 * 60 * 1000); // delay 30 minutes
    return message; }); { ().setDelay(30 * 60 * 1000)
}); { ("order_cancellation_key", order, message -> {
("Order cancelation task is set");;

// Consumer: handling delayed messages
@RabbitListener(queues = "delay_queue")
public void cancelOrder(Order order) {
    ("Cancel order: " + ()); }
    // Cancel order logic
}

8. Data synchronization: maintaining data consistency across systems

take

In a distributed system, multiple services depend on the same data source.

For example, the e-commerce platform needs to be synchronized to the caching system and the recommender system after the order status is updated.

Having each service pull data directly from the database would increase the pressure on the database and could also lead to latency or inconsistency issues.

prescription

Data synchronization using MQ. After the order service updates the status of the order, it sends the update to the MQ, and the caching and recommendation services consume the message from the MQ and synchronize the data.

sample code (computing)

Order services: producers

// Send the message to the MQ after updating the status of the order
Order order = updateOrderStatus(orderId, "PAID"); // update order status to paid
("order_exchange", "order_status_key", order); ("order_exchange", "order_status_key", order); // Update the order status to paid.
("The order status update message was sent: " + ());

Caching services: consumers

@RabbitListener(queues = "cache_update_queue")
public void updateCache(Order order) {
    ("Updating the cache, Order ID: " + () + " Status: " + ());
    // Update cache logic
    ((), ());
}

Recommended Services: Consumers

@RabbitListener(queues = "recommendation_queue")
public void updateRecommendation(Order order) {
    ("Updating the recommendation system, Order ID: " + () + " Status: " + ());
    // Update the recommendation service logic
    (order);
}

in-depth analysis

The benefits of data synchronization through MQ are:

  1. Reducing pressure on databases: Avoid multiple services querying the database at the same time.
  2. Final Consistency: Even if the processing of a service is delayed, MQ guarantees that messages are not lost and that the data state is ultimately consistent across all services.

9. Distributed task scheduling

take

Some tasks need to be performed on a regular basis, such as cleaning up expired orders in the early hours of the day.

These orders may be distributed across multiple services, and if each service runs a timed task independently, there may be issues with duplicate processing or missed tasks.

prescription

Use MQ to uniformly distribute and schedule tasks, and each service consumes tasks from MQ and executes them according to its own business requirements.

sample code (computing)

Task Scheduling Service: Producer

// Timed Task Generator
@Scheduled(cron = "0 0 0 * * ?") // Triggered at dawn every day
public void generateTasks() {
    List<Task> expiredTasks = ();
    for (Task task : expiredTasks) {
        ("task_exchange", "task_routing_key", task);
        ("Tasks sent:" + ());
    }
}

Order Service: Consumer

@RabbitListener(queues = "order_task_queue")
public void processOrderTask(Task task) {
    ("Processing order task:" + ());
    // Execute the order cleanup logic
    (task);
}

Inventory services: consumers

@RabbitListener(queues = "stock_task_queue")
public void processStockTask(Task task) {
    ("Processing stock task:" + ());
    // Execute the stock release logic
    (task);
}

in-depth analysis

Distributed task scheduling can be a solution:

  1. repeatable: Each service handles only the tasks in its own queue.
  2. omissions from the mandate: MQ ensures that tasks are delivered reliably and prevented from being lost.

10. Document processing: asynchronous execution of large document tasks

take

After a user uploads a large file, the file needs to be processed (e.g., format conversion, compression, etc.) and stored.

If these tasks are performed synchronously, the front-end page may keep loading, resulting in a poor user experience.

prescription

Immediately after the user uploads a file, the task is written to the MQ, the background processes the file asynchronously, and the user is notified or updated with the status when processing is complete.

sample code (computing)

Upload Service: Producer

// After uploading the file, write the task to the MQ
FileTask fileTask = new FileTask();
(fileId).
("COMPRESS");
("file_task_exchange", "file_task_key", fileTask);
("File processing task sent, file ID:" + fileId);

Document processing services: consumers

@RabbitListener(queues = "file_task_queue")
public void processFileTask(FileTask fileTask) {
    ("Processing file task: " + () + " Operation: " + ());
    // Simulate the file processing logic
    if ("COMPRESS".equals(())) {
        (())); } else if ("CONVERT".equals(())) {
    } else if ("CONVERT".equals(())) {
        (()); } else if ("CONVERT".equals(())) { (()))
    }
    // Update the task status
    ((), "COMPLETED"); }
}

Front-end polling or callback notifications

// Front-end polling for document processing status
setInterval(() => {
    fetch(`/file/status?fileId=${fileId}`)
        .then(response => ())
        .then(status => {
            if (status === "COMPLETED") {
                alert("File processing complete!") ;
            }
        });
}, 5000);

in-depth analysis

Advantages of asynchronous file processing:

  1. Enhancing the user experience: The main thread returns quickly, reducing user waiting time.
  2. Flexible expansion of back-office tasks: Supports multiple operating logics and adapts to complex document processing needs.

summarize

Message queues are not just a tool for delivering messages, but also a tool for decoupling the system and improving stability and scalability.

Each of these 10 classic scenarios addresses a specific business pain point.

I hope this article has helped you understand MQ application scenarios!

One final note (ask for attention, don't patronize me)

If this article is helpful to you, or inspired, help pay attention to my eponymous public number: Su San said technology, your support is my biggest motivation to keep writing.

Ask for a one-click trifecta: like, retweet, and watch at.

Concerned about the public number: [Su San said technology], in the public number reply: into the big factory, you can get free access to my recent organization of 100,000 words of the interview dictionary, a lot of partners rely on this dictionary to get a number of big factory offers.