Handwritten Producer-Consumer Model

2024-10-17 00:48:09

Preamble

The producer-consumer pattern is a classic pattern for coordinating concurrent threads, and working through the producer-consumer problem deepens one's understanding of concurrent programming.

The pattern involves two kinds of threads: producer threads that produce data and consumer threads that consume it. To decouple the two, a shared data area is usually placed between them, much like a warehouse. A producer puts the data it produces into the shared data area and does not need to care about what the consumers do; a consumer only takes data from the shared data area and does not need to care about what the producers do.

The shared data area must therefore coordinate the threads in two ways:

  1. Block the producer from producing more data while the shared data area is full;
  2. Block the consumer from consuming data while the shared data area is empty.

There are three ways to implement the producer-consumer pattern:

  1. Using BlockingQueue;
  2. Using synchronized with the Object wait/notify notification mechanism;
  3. Using Lock and Condition with the await/signal notification mechanism.

Implementing Producer-Consumer with BlockingQueue

BlockingQueue provides blocking methods for insertion and removal. When the queue is full, a producer thread is blocked until the queue has room again; when the queue is empty, a consumer thread is blocked until the queue is non-empty.
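The blocking behaviour can be observed without hanging a thread by using the timed variants of the same operations. The sketch below is only an illustration: the class name BlockingBehaviorDemo, the capacity of 1, and the 100 ms timeout are assumptions chosen for the demo, not part of the original article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows what "full" and "empty" mean for a bounded BlockingQueue.
class BlockingBehaviorDemo {

    // Returns true if a timed offer to a full capacity-1 queue gives up.
    static boolean offerTimesOutWhenFull() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put(1); // the queue is now full
            // A plain put(2) would block here until a consumer takes an element;
            // the timed offer waits 100 ms and then returns false instead.
            return !queue.offer(2, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if a timed poll on an empty queue gives up.
    static boolean pollTimesOutWhenEmpty() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            // A plain take() would block here until a producer adds an element;
            // the timed poll waits 100 ms and then returns null instead.
            return queue.poll(100, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue timed out: " + offerTimesOutWhenFull());
        System.out.println("poll on empty queue timed out: " + pollTimesOutWhenEmpty());
    }
}
```

In a real producer or consumer loop, put() and take() are usually what you want; the timed forms are mainly useful when a thread must be able to give up or check a shutdown flag.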

With such a queue, a producer only needs to focus on producing: it does not care how the data is consumed, let alone wait for a consumer thread to finish. Likewise, a consumer only cares about consuming and does not need to know how the data was produced.

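import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ProductorConsumer {

    // No capacity is given, so the queue is effectively unbounded and put() will not block in practice
    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        // 15 threads: 5 producers and 10 consumers
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(queue));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(queue));
        }
    }


    static class Productor implements Runnable {

        private BlockingQueue<Integer> queue;

        public Productor(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("Producer " + Thread.currentThread().getName() + " produced data " + i);
                    // Blocks while the queue is full
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        private BlockingQueue<Integer> queue;

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // Blocks while the queue is empty
                    Integer element = queue.take();
                    System.out.println("Consumer " + Thread.currentThread().getName() + " is consuming data " + element);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}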

Implementing Producer-Consumer with synchronized

This is effectively a hand-written blocking queue.

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CyclicBarrier;

public class MyBlockingQueue {

    // Queue
    private final Queue<String> myQueue = new LinkedList<>();

    // Maximum and minimum length
    private static final int MAXSIZE = 20;
    private static final int MINSIZE = 0;

    // Get the length of the queue
    public int getSize() {
        return myQueue.size();
    }

    // Producer
    public void push(String str) throws Exception {
        // Acquire the object lock
        synchronized (myQueue) {

            // Block while the queue is full; the loop re-checks the condition after every wake-up
            while (getSize() == MAXSIZE) {
                myQueue.wait();
            }

            myQueue.offer(str);
            System.out.println(Thread.currentThread().getName() + " put in element " + str);

            // Wake up waiting threads; consumers and producers then compete for the lock
            myQueue.notifyAll();
        }
    }

    // Consumer
    public String pop() throws Exception {
        synchronized (myQueue) {
            String result = null;

            // Block while the queue is empty
            while (getSize() == MINSIZE) {
                myQueue.wait();
            }
            // First in, first out
            result = myQueue.poll();

            System.out.println(Thread.currentThread().getName() + " took out element " + result);
            // Wake up waiting threads; consumers and producers then compete for the lock
            myQueue.notifyAll();

            return result;
        }
    }

    public static void main(String args[]) {

        MyBlockingQueue myBlockingQueue = new MyBlockingQueue();

        // Once both threads have finished, print the closing message
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            System.out.println("Production is over, off duty, consumers come back tomorrow!");
        });

        // Producer thread
        new Thread(() -> {
            // The producer adds 50 elements to the queue in a loop
            try {
                for (int i = 0; i < 50; i++) {
                    myBlockingQueue.push("--" + i);
                }
                // Production is done; wait at the barrier
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "producer").start();

        // Consumer thread
        new Thread(() -> {
            // The consumer takes 50 elements from the queue in a loop
            try {
                for (int j = 0; j < 50; j++) {
                    myBlockingQueue.pop();
                }
                // Consumption is done; wait at the barrier
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Consumer").start();

    }
}

Implementing Producer-Consumer with Condition

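import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueue {

    /**
     * Container that holds the produced elements
     */
    private LinkedList<Object> buffer;
    /**
     * Maximum number of elements the container can hold
     */
    private int maxSize;

    private Lock lock;

    /**
     * Signalled after an element is added; consumers wait on it while the buffer is empty
     */
    private Condition fullCondition;

    /**
     * Signalled after an element is removed; producers wait on it while the buffer is full
     */
    private Condition notFullCondition;

    BoundedQueue(int maxSize) {
        this.maxSize = maxSize;
        buffer = new LinkedList<Object>();
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        notFullCondition = lock.newCondition();
    }

    /**
     * Producer
     *
     * @param obj the element to add
     * @throws InterruptedException if interrupted while waiting
     */
    public void put(Object obj) throws InterruptedException {
        // Acquire the lock
        lock.lock();
        try {
            while (maxSize == buffer.size()) {
                // Full: the adding thread waits
                notFullCondition.await();
            }
            buffer.add(obj);
            // Notify a waiting consumer
            fullCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumer
     *
     * @return the element at the head of the queue
     * @throws InterruptedException if interrupted while waiting
     */
    public Object get() throws InterruptedException {
        Object obj;
        lock.lock();
        try {
            while (buffer.size() == 0) {
                // No data in the queue: the thread waits
                fullCondition.await();
            }
            obj = buffer.poll();
            // Notify a waiting producer
            notFullCondition.signal();
        } finally {
            lock.unlock();
        }
        return obj;
    }

}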
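The listing above has no driver, so here is a minimal, self-contained sketch of how a Condition-based buffer might be exercised. To keep it runnable on its own it defines a small hypothetical Buffer class using the same lock-plus-two-conditions technique, with the conditions named notEmpty and notFull for readability; the class names, the capacity of 2, and the transfer helper are all assumptions made for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ConditionBufferDemo {

    // Hypothetical compact bounded buffer: one lock, two conditions.
    static class Buffer<T> {
        private final Deque<T> items = new ArrayDeque<>();
        private final int capacity;
        private final Lock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition(); // consumers wait here
        private final Condition notFull = lock.newCondition();  // producers wait here

        Buffer(int capacity) {
            this.capacity = capacity;
        }

        void put(T item) throws InterruptedException {
            lock.lock();
            try {
                while (items.size() == capacity) {
                    notFull.await(); // releases the lock while waiting
                }
                items.addLast(item);
                notEmpty.signal(); // wake one waiting consumer
            } finally {
                lock.unlock();
            }
        }

        T take() throws InterruptedException {
            lock.lock();
            try {
                while (items.isEmpty()) {
                    notEmpty.await();
                }
                T item = items.removeFirst();
                notFull.signal(); // wake one waiting producer
                return item;
            } finally {
                lock.unlock();
            }
        }
    }

    // Moves 1..n through a capacity-2 buffer and returns the sum the consumer saw.
    static int transfer(int n) {
        Buffer<Integer> buffer = new Buffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(10)); // 1 + 2 + ... + 10 = 55
    }
}
```

Compared with wait/notifyAll on a single monitor, two separate conditions let a producer wake only consumers and a consumer wake only producers, which is why signal() is enough here.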

Application Scenarios of the Producer-Consumer Pattern

The producer-consumer pattern is generally used to separate the party that produces data from the party that consumes it, which decouples the production process from the consumption process.

Executor Task Execution Framework

The Executor framework decouples task submission from task execution: the operation that submits a task acts as the producer, and the operation that executes it acts as the consumer.

For example, when a web server is built on Executor to handle requests, the producer submits tasks to the thread pool and the pool creates threads to process them. If the number of tasks to run exceeds the pool's core thread count, the extra tasks are placed in the blocking queue. (Combining a thread pool with a blocking queue is more efficient than using a blocking queue alone, because a consumer can handle a task directly without first having to take it out of the blocking queue.)
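The hand-off described above can be sketched with ThreadPoolExecutor directly. The class name ExecutorQueueDemo, the pool size of 2, the queue capacity of 10, and the five tasks are assumptions chosen for illustration; the latch only exists to keep the workers busy so that the queue length can be observed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: tasks beyond the core thread count end up in the pool's blocking queue.
class ExecutorQueueDemo {

    // Submits 5 blocking tasks to a pool with 2 core threads and
    // returns how many of them are waiting in the queue.
    static int queuedTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The first 2 tasks were handed straight to new worker threads;
            // the remaining 3 are sitting in the queue.
            return pool.getQueue().size();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + queuedTasks());
    }
}
```

The submitting thread plays the producer role here and the pool's worker threads play the consumer role; only the tasks that cannot be given to a core thread immediately pass through the queue.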

Message Middleware (MQ)

A shopping festival such as Double Eleven generates a huge number of orders that cannot all be handled at the same moment. The orders are placed in a queue and then processed by dedicated threads.

Here the user placing an order is the producer and the thread processing the order is the consumer. Another example is ticket booking on 12306: orders submitted by users are first stored in a container and then worked through by dedicated order-processing threads, which allows the service to sustain high concurrency over a short period.

Tasks with Long Processing Times

Take uploading and processing attachments as an example. Uploading and processing can be split into two stages: a queue temporarily holds the attachments the user has uploaded, the user is immediately told that the upload succeeded, and a dedicated thread then processes the attachments in the queue.
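A minimal sketch of this two-stage split is shown below. Everything in it is hypothetical: the class name AttachmentPipelineDemo, the uploadAndProcess helper, and the upper-casing of file names, which merely stands in for real attachment processing. The latch is there only so the demo can wait for the background work before returning.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo: "uploads" are enqueued immediately, a dedicated thread processes them later.
class AttachmentPipelineDemo {

    static List<String> uploadAndProcess(List<String> fileNames) {
        BlockingQueue<String> pending = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(fileNames.size());

        // Dedicated consumer thread: takes attachments off the queue one by one.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String name = pending.take();
                    processed.add(name.toUpperCase()); // stand-in for real processing
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: stop the worker
            }
        }, "attachment-worker");
        worker.setDaemon(true);
        worker.start();

        // Producer side: enqueue and return right away, as if replying "upload succeeded".
        for (String name : fileNames) {
            pending.offer(name);
        }

        try {
            done.await(); // demo only: wait until everything has been processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.interrupt();
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(uploadAndProcess(java.util.Arrays.asList("a.pdf", "b.png")));
    }
}
```

Because a single worker drains a FIFO queue, attachments are processed in upload order; adding more workers would raise throughput at the cost of that ordering.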

Advantages of the Producer-Consumer Pattern:

  • Decoupling: separating the producer and consumer classes removes dependencies between the two pieces of code and simplifies workload management.
  • Reuse: because they are separated, the producer and consumer classes can be reused and extended independently.
  • Adjusting concurrency: producers and consumers rarely work at the same speed, so more threads can be given to the slower side to raise overall throughput.
  • Asynchrony: producers and consumers each do their own job. The producer only needs to care about whether the buffer still has room and does not wait for the consumer to finish; the consumer only needs to look at the contents of the buffer and does not care about the producer. Splitting a time-consuming process into a production phase and a consumption phase supports high concurrency, because the put performed by the producer takes comparatively little time.
  • Distributed support: producers and consumers communicate through a queue, so they do not need to run on the same machine. In a distributed environment a Redis list can serve as the queue, and consumers only need to poll it for data. The pattern also supports cluster scaling, so one machine going down does not bring down the whole cluster.

About the Author

Written by Seven, a front-line programmer, from hands-on exploration and practice; continuously learning and iterating.

This article is included in my personal blog:https://

WeChat official account: seven97. Feel free to follow.