HiJobQueue: a simple thread-safe task queue
Overview
HiJobQueue
is a thread-safe task queue used to manage and execute asynchronous tasks in a multi-threaded environment. Its design refers toJobQueue in Cobalt project, and made appropriate simplifications.HiJobQueue
Provides task push (push
), task popup (pop
), queue exit (quit
) and other functions, suitable for scenarios that require asynchronous task scheduling.
Core functions
-
Thread safety:
- use
std::mutex
andstd::condition_variable
Implement a thread-safe task queue.
- use
-
Task scheduling:
- Supports asynchronous push and pop-up of tasks.
-
Exit mechanism:
- supply
quit()
Method for safely stopping the task queue.
- supply
-
Cross-platform:
- Implemented using the C++ standard library and does not rely on platform-specific APIs.
Implement code
The following isHiJobQueue
The implementation code:
#pragma once
#include <mutex>
#include <functional>
#include <queue>
#include <condition_variable>
/**
* @brief Thread-safe task queue, used to manage and execute asynchronous tasks.
*/
class HiJobQueue final {
public:
using Job = std::function<void()>; // Task type
public:
HiJobQueue() : is_exit_(false) {}
/**
* @brief Push the task to the queue.
* @param job The task to be performed.
* @return If the queue has exited, return false; otherwise return true.
*/
bool push(Job job);
/**
* @brief Pop tasks from the queue.
* @param job is used to store pop-up tasks.
* @return If the queue is empty and has exited, returns false; otherwise returns true.
*/
bool pop(Job& job);
/**
* @brief Get the number of tasks in the queue.
* @return The number of tasks in the queue.
*/
size_t size();
/**
* @brief Exit the queue and stop task processing.
*/
void quit();
/**
* @brief Check if the queue has exited.
* @return true if the queue has exited; false otherwise.
*/
bool is_quited();
// Disable copy constructor and assignment operator
HiJobQueue(HiJobQueue&) = delete;
HiJobQueue(const HiJobQueue&) = delete;
private:
bool is_exit_; //Queue exit flag
std::mutex mutex_; // Mutex lock, protects queue access
std::condition_variable cond_; // Condition variable, used for task notification
std::queue<Job> queue_; // task queue
};
// accomplish
bool HiJobQueue::push(Job job) {
std::lock_guard<std::mutex> locker(mutex_);
if (is_exit_) {
return false;
}
queue_.push(std::move(job));
cond_.notify_one();
return true;
}
bool HiJobQueue::pop(Job& job) {
std::unique_lock<std::mutex> locker(mutex_);
cond_.wait(locker, [this]() { return is_exit_ || !queue_.empty(); });
if (is_exit_ && queue_.empty()) {
return false;
}
job = std::move(queue_.front());
queue_.pop();
return true;
}
size_t HiJobQueue::size() {
std::lock_guard<std::mutex> locker(mutex_);
return queue_.size();
}
void HiJobQueue::quit() {
std::lock_guard<std::mutex> locker(mutex_);
is_exit_ = true;
cond_.notify_all();
}
bool HiJobQueue::is_quited() {
std::lock_guard<std::mutex> locker(mutex_);
return is_exit_;
}
test case
To verifyHiJobQueue
For the correctness and thread safety, we designed the following test cases:
test code
#include <gtest/>
#include <future>
#include <atomic>
#include <thread>
#include <chrono>
#include "hi_job_queue.h"
class TestCls {
public:
void test(const char* text, int i) {
printf("%s-%d\n", text, i);
}
};
TEST(HiJobQueueTest, ConcurrentPushPop) {
HiJobQueue queue;
TestCls cls;
std::atomic<int> job_count{0}; // Used to count the number of tasks executed
//Start two thread consumption tasks
auto f1 = std::async(std::launch::async, [&] {
HiJobQueue::Job job;
while ((job)) {
job();
job_count++;
}
});
auto f2 = std::async(std::launch::async, [&] {
HiJobQueue::Job job;
while ((job)) {
job();
job_count++;
}
});
//Start two thread production tasks
auto f3 = std::async(std::launch::async, [&] {
for (int i = 0; i < 200; i++) {
(std::bind(&TestCls::test, &cls, "test1", i));
std::this_thread::sleep_for(std::chrono::milliseconds(5)); // Cross-platform sleep
}
});
auto f4 = std::async(std::launch::async, [&] {
for (int i = 0; i < 200; i++) {
(std::bind(&TestCls::test, &cls, "test2", i));
std::this_thread::sleep_for(std::chrono::milliseconds(5)); // Cross-platform sleep
}
});
// Wait for the production task to complete
();
();
//Exit the queue
();
// Wait for the consumption task to complete
();
();
// Verify that all tasks are executed
EXPECT_EQ(job_count.load(), 400); // 200 (test1) + 200 (test2)
}
TEST(HiJobQueueTest, QuitBehavior) {
HiJobQueue queue;
//Start a thread consumption task
auto consumer = std::async(std::launch::async, [&] {
HiJobQueue::Job job;
while ((job)) {
job();
}
});
// Push some tasks
for (int i = 0; i < 10; i++) {
([]() {});
}
//Exit the queue
();
// Wait for the consumer thread to end
();
// Verify that the queue has exited
EXPECT_TRUE(queue.is_quited());
// Tasks cannot be pushed after verification exits
EXPECT_FALSE(([]() {}));
}
TEST(HiJobQueueTest, EmptyQueueBehavior) {
HiJobQueue queue;
// Verify the pop behavior when the queue is empty
HiJobQueue::Job job;
EXPECT_FALSE((job));
//Exit the queue
();
// Pop behavior after verification exits
EXPECT_FALSE((job));
}
Test case description
-
ConcurrentPushPop
:- Testing in a multi-threaded environment
push
andpop
of concurrent behavior. - Verify that all tasks are performed correctly.
- Testing in a multi-threaded environment
-
QuitBehavior
:- Test the behavior when the queue exits.
- Verify that no new tasks will be accepted after exiting.
-
EmptyQueueBehavior
:- Test behavior when the queue is empty.
- After verification and exit
pop
behavior.
Applicable scenarios
HiJobQueue
Applicable to the following scenarios:
-
Multi-threaded task scheduling:
- In scenarios where tasks need to be distributed to multiple worker threads for execution,
HiJobQueue
Can be used as a task scheduler. - For example: task queue in thread pool.
- In scenarios where tasks need to be distributed to multiple worker threads for execution,
-
event driven architecture:
- In an event-driven system,
HiJobQueue
Can be used to store and process events. - For example: event queue in GUI application.
- In an event-driven system,
-
Asynchronous task processing:
- In scenarios where asynchronous execution of tasks is required,
HiJobQueue
Can be used to store tasks and be processed by background threads. - For example: asynchronous writing of the log system.
- In scenarios where asynchronous execution of tasks is required,
-
producer-consumer model:
- In the producer-consumer model,
HiJobQueue
Can be used as a shared task buffer. - For example: distribution of multi-threaded download tasks.
- In the producer-consumer model,
Advantages and Disadvantages Analysis
advantage
-
Thread safety:
- use
std::mutex
andstd::condition_variable
Ensure security in multi-threaded environments.
- use
-
Simple and easy to use:
- Provides a simple interface (
push
、pop
、quit
), easy to integrate into existing projects.
- Provides a simple interface (
-
Cross-platform:
- Based on the C++ standard library, it does not rely on platform-specific APIs and has good portability.
-
Exit mechanism:
- supply
quit()
method to safely stop the task queue and avoid resource leaks.
- supply
-
lightweight:
- The code is concise, the performance overhead is small, and it is suitable for scenarios with high performance requirements.
shortcoming
-
Single function:
- Only basic task queue functions are supported, and priority scheduling or task cancellation is not supported.
-
Performance bottleneck:
- In high concurrency scenarios,
std::mutex
May become a performance bottleneck. - If you need higher performance, you can consider lock-free queues (such as
boost::lockfree::queue
)。
- In high concurrency scenarios,
-
Task type restrictions:
- The task type is
std::function<void()>
, does not support return value or parameter passing. - If you need more complex task types, you need to expand it yourself.
- The task type is
-
Lack of task status management:
- Task status management (such as task completion notification or error handling) is not supported.
Summarize
HiJobQueue
is a simple but powerful thread-safe task queue suitable for asynchronous task scheduling in multi-threaded environments. by referenceJobQueue in Cobalt project, we implemented a more lightweight version and verified its correctness and thread safety through unit tests. Hope this article can help you understand and useHiJobQueue
。