
HiJobQueue: a simple thread-safe task queue

Popularity:466 ℃/2025-01-23 06:54:26

Overview

HiJobQueue is a thread-safe task queue used to manage and execute asynchronous tasks in a multi-threaded environment. Its design is based on the JobQueue in the Cobalt project, with some simplifications. HiJobQueue provides task push (push), task pop (pop), queue exit (quit) and a few helper functions, and suits scenarios that require asynchronous task scheduling.


Core functions

  1. Thread safety

    • Uses std::mutex and std::condition_variable to implement a thread-safe task queue.
  2. Task scheduling

    • Supports asynchronous pushing and popping of tasks.
  3. Exit mechanism

    • Provides a quit() method for safely stopping the task queue.
  4. Cross-platform

    • Implemented using the C++ standard library and does not rely on platform-specific APIs.

Implementation

The following is the implementation of HiJobQueue:

#pragma once

 #include <mutex>
 #include <functional>
 #include <queue>
 #include <condition_variable>

 /**
  * @brief Thread-safe task queue, used to manage and execute asynchronous tasks.
  */
 class HiJobQueue final {
 public:
     using Job = std::function<void()>; // Task type

 public:
     HiJobQueue() : is_exit_(false) {}

     /**
      * @brief Push the task to the queue.
      * @param job The task to be performed.
      * @return If the queue has exited, return false; otherwise return true.
      */
     bool push(Job job);

     /**
      * @brief Pop a task from the queue, blocking until a task is available or the queue has exited.
      * @param job Receives the popped task.
      * @return false if the queue is empty and has exited; true otherwise.
      */
     bool pop(Job& job);

     /**
      * @brief Get the number of tasks in the queue.
      * @return The number of tasks in the queue.
      */
     size_t size();

     /**
      * @brief Exit the queue and stop task processing.
      */
     void quit();

     /**
      * @brief Check if the queue has exited.
      * @return true if the queue has exited; false otherwise.
      */
     bool is_quited();

     // Disable copy constructor and copy assignment operator
     HiJobQueue(const HiJobQueue&) = delete;
     HiJobQueue& operator=(const HiJobQueue&) = delete;

 private:
     bool is_exit_; //Queue exit flag
     std::mutex mutex_; // Mutex lock, protects queue access
     std::condition_variable cond_; // Condition variable, used for task notification
     std::queue<Job> queue_; // task queue
 };

 // Implementation

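 // inline: the definition lives in the header, which may be included by several source files
 inline bool HiJobQueue::push(Job job) {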
     std::lock_guard<std::mutex> locker(mutex_);
     if (is_exit_) {
         return false;
     }
     queue_.push(std::move(job));
     cond_.notify_one();
     return true;
 }

 inline bool HiJobQueue::pop(Job& job) {
     std::unique_lock<std::mutex> locker(mutex_);
     cond_.wait(locker, [this]() { return is_exit_ || !queue_.empty(); });
     if (is_exit_ && queue_.empty()) {
         return false;
     }
     job = std::move(queue_.front());
     queue_.pop();
     return true;
 }

 inline size_t HiJobQueue::size() {
     std::lock_guard<std::mutex> locker(mutex_);
     return queue_.size();
 }

 inline void HiJobQueue::quit() {
     std::lock_guard<std::mutex> locker(mutex_);
     is_exit_ = true;
     cond_.notify_all();
 }

 inline bool HiJobQueue::is_quited() {
     std::lock_guard<std::mutex> locker(mutex_);
     return is_exit_;
 }

Test cases

To verify the correctness and thread safety of HiJobQueue, we designed the following test cases:

Test code

#include <gtest/gtest.h>
 #include <cstdio>
 #include <functional>
 #include <future>
 #include <atomic>
 #include <thread>
 #include <chrono>
 #include "hi_job_queue.h"

 class TestCls {
 public:
     void test(const char* text, int i) {
         printf("%s-%d\n", text, i);
     }
 };

 TEST(HiJobQueueTest, ConcurrentPushPop) {
     HiJobQueue queue;
     TestCls cls;

     std::atomic<int> job_count{0}; // Used to count the number of tasks executed

     // Start two consumer threads
     auto f1 = std::async(std::launch::async, [&] {
         HiJobQueue::Job job;
         while (queue.pop(job)) {
             job();
             job_count++;
         }
     });

     auto f2 = std::async(std::launch::async, [&] {
         HiJobQueue::Job job;
         while (queue.pop(job)) {
             job();
             job_count++;
         }
     });

     // Start two producer threads
     auto f3 = std::async(std::launch::async, [&] {
         for (int i = 0; i < 200; i++) {
             queue.push(std::bind(&TestCls::test, &cls, "test1", i));
             std::this_thread::sleep_for(std::chrono::milliseconds(5)); // Cross-platform sleep
         }
     });

     auto f4 = std::async(std::launch::async, [&] {
         for (int i = 0; i < 200; i++) {
             queue.push(std::bind(&TestCls::test, &cls, "test2", i));
             std::this_thread::sleep_for(std::chrono::milliseconds(5)); // Cross-platform sleep
         }
     });

     // Wait for the producers to finish
     f3.wait();
     f4.wait();

     // Exit the queue; tasks that are already queued are still drained
     queue.quit();

     // Wait for the consumers to finish
     f1.wait();
     f2.wait();

     // Verify that all tasks are executed
     EXPECT_EQ(job_count.load(), 400); // 200 (test1) + 200 (test2)
 }

 TEST(HiJobQueueTest, QuitBehavior) {
     HiJobQueue queue;

     // Start one consumer thread
     auto consumer = std::async(std::launch::async, [&] {
         HiJobQueue::Job job;
         while (queue.pop(job)) {
             job();
         }
     });

     // Push some tasks
     for (int i = 0; i < 10; i++) {
         queue.push([]() {});
     }

     // Exit the queue
     queue.quit();

     // Wait for the consumer thread to end
     consumer.wait();

     // Verify that the queue has exited
     EXPECT_TRUE(queue.is_quited());

     // Verify that tasks cannot be pushed after exit
     EXPECT_FALSE(queue.push([]() {}));
 }

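 TEST(HiJobQueueTest, EmptyQueueBehavior) {
     HiJobQueue queue;
     HiJobQueue::Job job;

     // pop() blocks while the queue is empty and has not exited, so calling it
     // here would hang the test. Before quit(), only the size is checked.
     EXPECT_EQ(queue.size(), 0u);

     // Exit the queue
     queue.quit();

     // After quit(), pop() on an empty queue returns false immediately
     EXPECT_FALSE(queue.pop(job));
 }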

Test case description

  1. ConcurrentPushPop

    • Tests the concurrent behavior of push and pop in a multi-threaded environment.
    • Verify that all tasks are performed correctly.
  2. QuitBehavior

    • Test the behavior when the queue exits.
    • Verify that no new tasks will be accepted after exiting.
  3. EmptyQueueBehavior

    • Test behavior when the queue is empty.
    • Verify the behavior of pop after exit.
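
Because pop blocks while the queue is empty and has not exited, empty-queue behavior is easier to test with a non-blocking or timed pop. The following is only a sketch of such an extension, not part of HiJobQueue: the class name TimedJobQueue and the methods try_pop and pop_for are assumptions made for illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical queue with non-blocking and timed pops (not part of HiJobQueue).
class TimedJobQueue {
public:
    using Job = std::function<void()>;

    void push(Job job) {
        std::lock_guard<std::mutex> locker(mutex_);
        queue_.push(std::move(job));
        cond_.notify_one();
    }

    // Returns immediately: false if the queue is empty, true if a job was taken.
    bool try_pop(Job& job) {
        std::lock_guard<std::mutex> locker(mutex_);
        return take(job);
    }

    // Waits up to `timeout` for a job; returns false if none arrived in time.
    bool pop_for(Job& job, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> locker(mutex_);
        cond_.wait_for(locker, timeout, [this] { return !queue_.empty(); });
        return take(job);
    }

private:
    // Moves the front job into `job`. The caller must hold mutex_.
    bool take(Job& job) {
        if (queue_.empty()) {
            return false;
        }
        job = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<Job> queue_;
};
```

With a method like try_pop, a test can assert that an empty queue yields no task without risking a hang, and pop_for gives a consumer a bounded wait.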

Applicable scenarios

HiJobQueue is applicable to the following scenarios:

  1. Multi-threaded task scheduling

    • In scenarios where tasks need to be distributed to multiple worker threads for execution, HiJobQueue can be used as a task scheduler.
    • For example: task queue in thread pool.
  2. Event-driven architecture

    • In an event-driven system, HiJobQueue can be used to store and process events.
    • For example: event queue in GUI application.
  3. Asynchronous task processing

    • In scenarios where tasks must run asynchronously, HiJobQueue can store the tasks for background threads to process.
    • For example: asynchronous writing of the log system.
  4. Producer-consumer model

    • In the producer-consumer model, HiJobQueue can be used as a shared task buffer.
    • For example: distribution of multi-threaded download tasks.
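
The thread pool scenario can be sketched as follows. To keep the example self-contained, it uses MiniJobQueue, a condensed stand-in with the same push/pop/quit behavior as HiJobQueue. The names MiniJobQueue and run_worker_pool are assumptions made for illustration only.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Condensed stand-in with the same push/pop/quit semantics as HiJobQueue.
class MiniJobQueue {
public:
    using Job = std::function<void()>;

    bool push(Job job) {
        std::lock_guard<std::mutex> locker(mutex_);
        if (is_exit_) return false;
        queue_.push(std::move(job));
        cond_.notify_one();
        return true;
    }

    bool pop(Job& job) {
        std::unique_lock<std::mutex> locker(mutex_);
        cond_.wait(locker, [this] { return is_exit_ || !queue_.empty(); });
        if (queue_.empty()) return false;  // only reachable after quit()
        job = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    void quit() {
        std::lock_guard<std::mutex> locker(mutex_);
        is_exit_ = true;
        cond_.notify_all();
    }

private:
    bool is_exit_ = false;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<Job> queue_;
};

// Hypothetical helper: runs `job_count` counting jobs on `worker_count`
// threads and returns how many jobs were executed.
int run_worker_pool(int worker_count, int job_count) {
    MiniJobQueue queue;
    std::atomic<int> executed{0};

    std::vector<std::thread> workers;
    for (int i = 0; i < worker_count; ++i) {
        workers.emplace_back([&queue] {
            MiniJobQueue::Job job;
            while (queue.pop(job)) {  // false once the queue has quit and is drained
                job();
            }
        });
    }

    for (int i = 0; i < job_count; ++i) {
        queue.push([&executed] { ++executed; });
    }

    queue.quit();  // jobs that are already queued are still drained
    for (auto& worker : workers) {
        worker.join();
    }
    return executed.load();
}
```

Calling quit() before joining is what lets the workers leave their pop loop. Since pop keeps returning queued jobs after quit(), no submitted job is lost.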

Advantages and disadvantages

Advantages

  1. Thread safety

    • Uses std::mutex and std::condition_variable to ensure safety in multi-threaded environments.
  2. Simple and easy to use

    • Provides a simple interface (push, pop, quit) that is easy to integrate into existing projects.
  3. Cross-platform

    • Based on the C++ standard library, it does not rely on platform-specific APIs and has good portability.
  4. Exit mechanism

    • Provides a quit() method that stops the task queue safely: blocked consumers are woken up, so worker threads can exit instead of waiting forever.
  5. Lightweight

    • The code is concise and its overhead is limited to one mutex acquisition per operation, so it suits workloads where lock contention is low.

Disadvantages

  1. Limited functionality

    • Only basic task queue functions are supported, and priority scheduling or task cancellation is not supported.
  2. Performance bottleneck

    • In high-concurrency scenarios, std::mutex may become a performance bottleneck.
    • If you need higher performance, you can consider lock-free queues (such as boost::lockfree::queue).
  3. Task type restrictions

    • The task type is std::function<void()>, which does not directly support return values or parameter passing.
    • If you need more complex task types, you have to extend it yourself.
  4. Lack of task status management

    • Task status management (such as task completion notification or error handling) is not supported.
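
One common way around the void() task type and the missing completion notification is to wrap the callable in a std::packaged_task and keep the std::future on the caller's side. The sketch below only illustrates that idea and is not part of HiJobQueue. The helpers make_job and run_two_jobs are hypothetical, and a plain std::queue stands in for the thread-safe queue.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <utility>

using Job = std::function<void()>;  // same shape as HiJobQueue::Job

// Hypothetical helper: wraps a callable that returns a value into a Job and
// hands back a future for the result (or for the exception it throws).
template <typename F>
auto make_job(F func, Job& out) -> std::future<decltype(func())> {
    using Result = decltype(func());
    // std::function needs a copyable callable, but packaged_task is move-only,
    // so the task is held through a shared_ptr.
    auto task = std::make_shared<std::packaged_task<Result()>>(std::move(func));
    std::future<Result> result = task->get_future();
    out = [task] { (*task)(); };
    return result;
}

// Runs two wrapped jobs through a plain std::queue and combines the results.
int run_two_jobs(int a, int b) {
    std::queue<Job> jobs;  // stand-in for the thread-safe queue
    Job job;
    std::future<int> first = make_job([a] { return a * 2; }, job);
    jobs.push(std::move(job));
    std::future<int> second = make_job([b] { return b + 1; }, job);
    jobs.push(std::move(job));

    while (!jobs.empty()) {  // a worker thread would normally do this part
        jobs.front()();
        jobs.pop();
    }
    return first.get() + second.get();
}
```

Arguments are passed by capturing them in the lambda, and the future doubles as a completion signal: get() blocks until the job has run and rethrows any exception the job raised.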

Summary

HiJobQueue is a simple thread-safe task queue suited to asynchronous task scheduling in multi-threaded environments. Based on the JobQueue in the Cobalt project, it is a more lightweight version whose correctness and thread safety were checked with unit tests. We hope this article helps you understand and use HiJobQueue.