- Preamble
- Applicable Scenarios
- Functional Description
- Usage Example
- BufferQueue Internal Design Overview
  - Segregation of Topics
  - Partition Design
  - Concurrency Support
  - Dynamic Scaling of Partitions
  - Segment Recycling Mechanism
- Benchmark
  - Write Performance Testing
  - Consumer Performance Testing
Preamble
BufferQueue is a high-performance buffer queue implementation written in .NET that supports multi-threaded concurrent operations.
The project started as a standalone component of the mocha project and has been modified to provide more general buffer queue functionality.
Currently only in-memory buffers are supported; more buffer types may be supported in the future.
Applicable Scenarios
Scenarios where producers and consumers run at different speeds and data needs to be processed concurrently in batches.
Functional Description
- Supports creating multiple Topics; each Topic can carry multiple data types, and each pair of Topic and data type corresponds to an independent buffer.
- Supports creating multiple Consumer Groups; the consumption progress of each Consumer Group is independent, and multiple Consumer Groups can consume the same Topic concurrently.
- Supports creating multiple Consumers within the same Consumer Group to consume data in a load-balanced manner.
- Supports batch consumption, so multiple pieces of data can be retrieved at once.
- Supports two consumption modes: pull mode and push mode.
- Both pull mode and push mode support auto commit and manual commit. In auto commit mode, the consumer commits the consumption progress automatically after receiving the data and does not retry if consumption fails. In manual commit mode, the consumer must commit the consumption progress explicitly; if consumption fails, it can retry as long as the progress has not been committed.
Note that, to keep the implementation simple, the current version does not support dynamically scaling the number of consumers; the number of consumers must be specified when they are created.
Usage Example
Install the NuGet package:
dotnet add package BufferQueue
The project is based on Microsoft.Extensions.DependencyInjection; you need to register the services first.
BufferQueue supports two consumption modes: pull mode and push mode.
services.AddBufferQueue(options =>
{
    options.UseMemory(bufferOptions =>
        {
            // Each pair of Topic and data type has a separate buffer, whose partitionNumber can be set
            bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);
            bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);
        })
        // Add push mode consumers:
        // scan the specified assembly for classes marked with BufferPushCustomerAttribute
        // and register them as push mode consumers
        .AddPushCustomers(typeof(Program).Assembly);
});

// Consume data in a HostedService using the pull mode
services.AddHostedService<Foo1PullConsumerHostService>();
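The registration above only wires up consumers; data is written to a Topic through IBufferQueue. The sketch below shows one way a producer might look. The GetProducer<Foo> and ProduceAsync member names are assumptions made for illustration, so check the package's IBufferQueue and producer interfaces for the exact API.

// Hypothetical producer sketch: GetProducer<T>/ProduceAsync are assumed names,
// not confirmed against the package's public API.
public class FooProducerService(IBufferQueue bufferQueue)
{
    public async Task ProduceFooAsync(Foo foo)
    {
        var producer = bufferQueue.GetProducer<Foo>("topic-foo1");
        await producer.ProduceAsync(foo);
    }
}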
Example of a consumer in pull mode:
public class Foo1PullConsumerHostService(
    IBufferQueue bufferQueue,
    ILogger<Foo1PullConsumerHostService> logger) : IHostedService
{
    private readonly CancellationTokenSource _cancellationTokenSource = new();

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var token = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)
            .Token;

        var consumers = bufferQueue.CreatePullConsumers<Foo>(
            new BufferPullConsumerOptions
            {
                TopicName = "topic-foo1", GroupName = "group-foo1", AutoCommit = true, BatchSize = 100,
            }, consumerNumber: 4);

        foreach (var consumer in consumers)
        {
            _ = ConsumeAsync(consumer, token);
        }

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource.Cancel();
        return Task.CompletedTask;
    }

    private async Task ConsumeAsync(IBufferPullConsumer<Foo> consumer, CancellationToken cancellationToken)
    {
        await foreach (var buffer in consumer.ConsumeAsync(cancellationToken))
        {
            foreach (var foo in buffer)
            {
                // Process the foo
                logger.LogInformation("Foo: {Foo}", foo);
            }
        }
    }
}
Example of a consumer in push mode:
Push mode consumers are registered with the BufferPushCustomer attribute.
The push consumer is registered in the DI container, so other services can be injected into it through its constructor. Its lifetime can be controlled through the ServiceLifetime setting.
The concurrency parameter in BufferPushCustomerAttribute is used to set the concurrency of the push consumer, which corresponds to the consumerNumber of the pull consumer.
[BufferPushCustomer(
    topicName: "topic-foo2",
    groupName: "group-foo2",
    batchSize: 100,
    serviceLifetime: ServiceLifetime.Singleton,
    concurrency: 2)]
public class Foo2PushConsumer(ILogger<Foo2PushConsumer> logger) : IBufferAutoCommitPushConsumer<Foo>
{
    public Task ConsumeAsync(IEnumerable<Foo> buffer, CancellationToken cancellationToken)
    {
        foreach (var foo in buffer)
        {
            logger.LogInformation("Foo: {Foo}", foo);
        }

        return Task.CompletedTask;
    }
}
[BufferPushCustomer(
    "topic-bar",
    "group-bar",
    100,
    ServiceLifetime.Scoped,
    2)]
public class BarPushConsumer(ILogger<BarPushConsumer> logger) : IBufferManualCommitPushConsumer<Bar>
{
    public async Task ConsumeAsync(IEnumerable<Bar> buffer, IBufferConsumerCommitter committer,
        CancellationToken cancellationToken)
    {
        foreach (var bar in buffer)
        {
            logger.LogInformation("Bar: {Bar}", bar);
        }

        var commitTask = committer.CommitAsync();
        if (!commitTask.IsCompletedSuccessfully)
        {
            await commitTask.AsTask();
        }
    }
}
BufferQueue Internal Design Overview
Segregation of Topics
BufferQueue has the following characteristics:
- BufferQueues of different Topics with the same data type do not interfere with each other.
- BufferQueues of different data types under the same Topic do not interfere with each other.
This feature is realized through the following two-layer interface design:
- IBufferQueue: forwards requests to the specific IBufferQueue<T> implementation according to the TopicName and the type parameter T (implemented with the help of KeyedService), where T represents the type of the data entity carried by the buffer.
- IBufferQueue<T>: the concrete BufferQueue implementation responsible for managing the data under a Topic. It is part of the buffer module's internal implementation and is not exposed publicly.
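As a rough illustration of how a (TopicName, T) pair can be mapped to a dedicated queue instance with .NET keyed services, here is a minimal, self-contained sketch. It is not BufferQueue's actual internal code; the IBufferQueue<T> stand-in, the MemoryBufferQueue<T> type, and their members are invented for the example.

using System;
using Microsoft.Extensions.DependencyInjection;

// Stand-in types for this sketch only.
public interface IBufferQueue<T>
{
    string TopicName { get; }
}

public class MemoryBufferQueue<T>(string topicName, int partitionNumber) : IBufferQueue<T>
{
    public string TopicName { get; } = topicName;
    public int PartitionNumber { get; } = partitionNumber;
}

public static class KeyedDispatchSketch
{
    public static void Main()
    {
        var services = new ServiceCollection();

        // One keyed registration per (TopicName, data type) pair:
        // the topic name is the service key, T is the generic argument.
        services.AddKeyedSingleton<IBufferQueue<string>>(
            "topic-foo1", (_, _) => new MemoryBufferQueue<string>("topic-foo1", partitionNumber: 6));
        services.AddKeyedSingleton<IBufferQueue<string>>(
            "topic-foo2", (_, _) => new MemoryBufferQueue<string>("topic-foo2", partitionNumber: 4));

        var provider = services.BuildServiceProvider();

        // A non-generic facade can forward a request for ("topic-foo1", string)
        // to the matching IBufferQueue<string> instance:
        var queue = provider.GetRequiredKeyedService<IBufferQueue<string>>("topic-foo1");
        Console.WriteLine(queue.TopicName); // prints "topic-foo1"
    }
}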
Partition Design
To ensure consumption speed, BufferQueue divides the data into multiple Partitions; each Partition is an independent queue and has its own corresponding consumer.
The Producer writes data to the Partitions in a round-robin fashion.
The number of Consumers in a group must not exceed the number of Partitions, and the Partitions are distributed evenly among the Consumers in the group.
When a Consumer is assigned more than one Partition, it consumes them in a round-robin fashion.
Each Partition records the consumption progress of every Consumer Group separately, so the progress of different groups does not interfere with each other.
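A minimal sketch of these two round-robin rules (write routing and partition assignment), assuming a 0-based consumer index; it is illustrative rather than the library's actual implementation:

using System.Collections.Generic;
using System.Threading;

// Illustrative sketch of the round-robin rules described above.
public class PartitionRouter(int partitionNumber)
{
    private long _nextWrite = -1;

    // Producers pick the target Partition for each write in round-robin order.
    public int NextPartitionForWrite()
        => (int)(Interlocked.Increment(ref _nextWrite) % partitionNumber);

    // Partitions are spread evenly over the Consumers of one group
    // (consumerNumber must not exceed partitionNumber, consumerIndex is 0-based).
    public IEnumerable<int> PartitionsForConsumer(int consumerIndex, int consumerNumber)
    {
        for (var partition = consumerIndex; partition < partitionNumber; partition += consumerNumber)
        {
            // A Consumer that owns several Partitions reads from them in round-robin order.
            yield return partition;
        }
    }
}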
Concurrency Support
Producer supports concurrent writes.
Consumption is bound to Partitions; to ensure that Partition consumption progress is managed correctly, an individual Consumer does not support concurrent consumption.
To increase the consumption rate, create multiple Consumers.
Dynamic Scaling of Partitions
The basic unit of a Partition is the Segment, which holds an array of data; multiple Segments are linked together to form a Partition.
When a Segment is full, the Partition is expanded by appending a new Segment after it.
Each element of a Segment's data array is called a Slot, and each Slot has a unique, monotonically increasing Offset within the Partition.
Segment Recycling Mechanism
Whenever a new Segment needs to be appended to a Partition, the Partition first checks, starting from the head, whether the earlier Segment has already been consumed by all Consumer Groups; if so, that fully consumed Segment is recycled and appended to the end of the Partition as the new Segment instead of allocating a fresh one.
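To make the segment layout and the recycling rule above concrete, here is a simplified, self-contained sketch; the type and member names are assumptions, and the check that every Consumer Group has finished the head Segment is reduced to a single boolean flag.

// Illustrative sketch only: a Partition as a linked list of Segments,
// with the expand-or-recycle step described above.
public class Segment<T>(long startOffset, int slotCount)
{
    public T[] Slots { get; } = new T[slotCount];         // each array element is a Slot
    public long StartOffset { get; set; } = startOffset;  // Offset of the first Slot
    public int WriteCount { get; set; }                   // how many Slots are filled
    public Segment<T>? Next { get; set; }                 // Segments form a singly linked list
}

public class Partition<T>
{
    private readonly int _slotCountPerSegment;
    private Segment<T> _head;
    private Segment<T> _tail;

    public Partition(int slotCountPerSegment)
    {
        _slotCountPerSegment = slotCountPerSegment;
        _head = _tail = new Segment<T>(startOffset: 0, slotCountPerSegment);
    }

    // Called when the tail Segment is full and the next write needs a new Segment.
    // "headConsumedByAllGroups" stands in for the real check that every Consumer
    // Group has moved its progress past the head Segment.
    private void ExpandOrRecycle(long nextOffset, bool headConsumedByAllGroups)
    {
        Segment<T> segment;

        if (headConsumedByAllGroups && _head.Next is not null)
        {
            // Recycle the fully consumed head Segment instead of allocating a new one.
            segment = _head;
            _head = _head.Next;
            segment.Next = null;
            segment.StartOffset = nextOffset;
            segment.WriteCount = 0;
        }
        else
        {
            // Otherwise grow the Partition by allocating a fresh Segment.
            segment = new Segment<T>(nextOffset, _slotCountPerSegment);
        }

        _tail.Next = segment;
        _tail = segment;
    }
}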
Benchmark
Test environment: Apple M2 Max 64GB
Write Performance Testing
Concurrent write performance is compared with BlockingCollection. The number of concurrent threads equals the number of logical CPU cores (12), and partitionNumber is set to 1 and 12.
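The sketch below shows the general shape of such a write benchmark using BenchmarkDotNet; it is not the project's actual benchmark code, and only the BlockingCollection baseline is spelled out (the BufferQueue case would push the same number of messages through the memory buffer's producer with partitionNumber set to 1 and to 12).

using System.Collections.Concurrent;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

// Sketch of the concurrent-write comparison described above (not the project's benchmark code).
[MemoryDiagnoser]
public class WriteBenchmark
{
    private const int ThreadCount = 12;           // logical CPU cores of the test machine above
    private const int MessagesPerThread = 10_000; // arbitrary message count for the sketch

    [Benchmark(Baseline = true)]
    public void BlockingCollectionConcurrentWrite()
    {
        var collection = new BlockingCollection<int>();
        Parallel.For(0, ThreadCount, _ =>
        {
            for (var i = 0; i < MessagesPerThread; i++)
            {
                collection.Add(i);
            }
        });
    }

    // The BufferQueue case would produce the same number of messages through the
    // memory buffer's producer, once with partitionNumber = 1 and once with 12.
}

public static class BenchmarkProgram
{
    public static void Main() => BenchmarkRunner.Run<WriteBenchmark>();
}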
Test results
When writing concurrently, BufferQueue's write performance is significantly better than BlockingCollection's.
Consumer Performance Testing
Concurrent read performance of the pull mode consumer is compared with BlockingCollection. The number of concurrent threads equals the number of logical CPU cores (12), and partitionNumber is set to 12.
Test results
When consuming in batches, the consumption performance advantage of BufferQueue becomes more obvious as the batch size increases.