Introduction to Redis Stream
Redis Stream is a new Redis data type released with version 5.0:
Efficient Consumer Group: Allows multiple consumer groups to consume data from different parts of the same data stream, with each consumer group processing messages independently, which allows for parallel processing and efficiency.
blocking operation: Consumers can set up blocking operations so that they are woken up and start processing when new data is added to the stream, which helps to reduce resource consumption and improve responsiveness.
Data persistence: It can persist data into memory (which is written to a storage device when configured for local persistence) for storage, waiting to be consumed.
many producers, many consumers: Redis Streams can create a data channel between multiple producers and consumers, making the flow and processing of data more flexible.
Scalability and asynchronous communication: The number of consumers can be easily scaled by the user through the application and the communication between producers and consumers can be asynchronous, which helps to improve the overall performance of the system.
Meeting diverse needs: Redis Streams meets the needs of everything from real-time data processing to historical data access, while remaining easy to manage.
What Redis Stream can do
message queue: Redis Stream can be used as a reliable message queuing system that supports a publish/subscribe model where producers and consumers can send and receive messages asynchronously.
task scheduling: Redis Stream can be used to implement a distributed task scheduling system that distributes tasks to multiple consumers for processing, thereby increasing processing speed and system scalability.
event-driven architecture: Redis Stream can be used as a core component in an event-driven architecture for processing events from different services, enabling decoupling and flexibility.
Introduction to FreeRedis
FreeRedis is named from "free", "free", it and the name and FreeSql is a concept, simplicity is their consistent pursuit of the direction , the minimum support for . NET Framework 4.0 and Redis-server 7.2.
github MIT Open Source Protocol
Author's Blog Park Address
Official Introduction
NET-based Redis client supporting .NET Core 2.1+, .NET Framework 4.0+, Xamarin, and AOT.
- 🌈 All method names are consistent with redis-cli
- 🌌 Support for Redis clustering (server-side requirements 3.2 and above)
- ⛳ Support for Redis Sentinel Mode
- 🎣 Support for Master-Slave separation (Master-Slave)
- 📡 Support for Publishing Subscriptions (Pub-Sub)
- 📃 Support for Redis Lua Scripts
- 💻 Support for pipelines (Pipeline)
- 📰 Support Services
- 🌴 Support for GEO commands (server-side requirements 3.2 and above)
- 🌲 Support for STREAM-type commands (server-side requires version 5.0 and above)
- ⚡ Support for local caching (Client-side-caching, server-side requires version 6.0 and above)
- 🌳 Support for the RESP3 protocol for Redis 6
desired functionality
1. Producer production data
2. Post-consumer consumption data confirmation
3. Non-confirmation after consumer consumption data
4、Message monitoring for messages that have been consumed but not confirmed by timeout
5, has been consumed but the timeout is not confirmed message secondary consumption
Project dependencies
WPF
FreeRedis
NLog
redis-windows-7.2.5
Business Scenario Code
Redis commands involved
Creating Consumer GroupsXGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
Query consumer group informationXINFO STREAM key [FULL [COUNT count]]
Number of message queues (length)XLEN key
Add a message to the end of the queueXADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
Consumer group members read messagesXREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
acknowledgement messageXACK key group id [id ...]
Delete MessageXDEL key id [id ...]
Get information about the queue that consumes unacknowledged messagesXPENDING key group [[IDLE min-idle-time] start end count [consumer]]
Transferring Unconsumed Message Consumers to the Current Consumer NameXAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]
coding
public partial class App : Application
{
public static Logger Logger = ();
public static RedisClient RedisHelper;
public static MainViewModel MainViewModel;
private void App_OnStartup(object sender, StartupEventArgs e)
{
+= Current_DispatcherUnhandledException;
try
{
//redis6The above version enables theACLUser management mechanism,The default username isdefault,Password can be ignored
RedisHelper = new RedisClient("127.0.0.1:6379,user=defualt,defaultDatabase=13");
= ;//serialize
= ;//反serialize
+= (s, ee) => (); //Printing the Command Log
MainViewModel = new MainViewModel();
}
catch (Exception exception)
{
();
(-100);
}
}
private void Current_DispatcherUnhandledException(object sender, e)
{
= true;
($"uncaught error:source (of information etc):{sender},incorrect:{e}");
}
private void App_OnExit(object sender, ExitEventArgs e)
{
();
}
}
Key elements of the
<StackPanel>
<WrapPanel ItemHeight="40" Margin="10,10,0,0" VerticalAlignment="Center">
<TextBlock Text="Number of production data:" VerticalAlignment="Center"></TextBlock>
<TextBox Text="{Binding RecordCount}" Width="100" MaxLength="9" VerticalAlignment="Center"></TextBox>
<TextBlock Text="Number of producers:" Margin="10,0,0,0" VerticalAlignment="Center"></TextBlock>
<TextBox Text="{Binding TaskCount}" Width="100" MaxLength="6" VerticalAlignment="Center"></TextBox>
<TextBlock Margin="10,0,0,0" VerticalAlignment="Center">
<Run Text="In the process of generating the first:"></Run>
<Run Text="{Binding ProducerIndex}"></Run>
<Run Text="data entry"></Run>
</TextBlock>
<Button Content="Generate data" HorizontalAlignment="Left" Margin="10,0,0,0" VerticalAlignment="Center" Width="103" Command="{Binding ProducerRelayCommand}" />
</WrapPanel>
<WrapPanel ItemHeight="40" Margin="0,10,0,0" VerticalAlignment="Center">
<TextBlock Text="Number of consumers:" Margin="10,0,0,0" VerticalAlignment="Center"></TextBlock>
<TextBox Text="{Binding ConsumerCount}" Width="100" MaxLength="6" VerticalAlignment="Center"></TextBox>
<TextBlock Margin="10,0,0,0" VerticalAlignment="Center">
<Run Text="Remaining unconsumed:"></Run>
<Run Text="{Binding ConsumeIndex}"></Run>
<Run Text="data entry。"></Run>
</TextBlock>
<CheckBox IsChecked="{Binding IsAutoAck,Mode=TwoWay}" Content="autoconfirmation" VerticalAlignment="Center" ></CheckBox>
<Button Content="Starting to consume" HorizontalAlignment="Left" Margin="10,0,0,0" VerticalAlignment="Center" Width="103" Command="{Binding ConsumeRelayCommand}" />
<Button Content="Consumption Unconfirmed Consumption Queue" HorizontalAlignment="Left" Margin="10,0,0,0" VerticalAlignment="Center" Width="103" Command="{Binding PendingRelayCommand}" />
</WrapPanel>
<WrapPanel>
<StackPanel>
<TextBlock Margin="10,0,0,0" Text="Queue Information:" VerticalAlignment="Center"></TextBlock>
<TextBox Margin="10,0,0,0" VerticalAlignment="Center" Height="310" Width="250" Text="{Binding StreamInfo}" VerticalScrollBarVisibility="Auto"></TextBox>
</StackPanel>
<StackPanel>
<TextBlock Margin="13,0,0,0" Text="Consumer Information:" VerticalAlignment="Center"></TextBlock>
<TextBox Margin="13,0,0,0" VerticalAlignment="Center" Height="310" Width="250" Text="{Binding ConsumeInfo}" VerticalScrollBarVisibility="Auto" TextWrapping="WrapWithOverflow"></TextBox>
</StackPanel>
<StackPanel>
<TextBlock Margin="13,0,0,0" Text="未确认Consumer Information:" VerticalAlignment="Center"></TextBlock>
<TextBox Margin="13,0,0,0" VerticalAlignment="Center" Height="310" Width="250" Text="{Binding PendingInfo}" VerticalScrollBarVisibility="Auto" TextWrapping="WrapWithOverflow"></TextBox>
</StackPanel>
<StackPanel>
<TextBlock Margin="13,0,0,0" Text="Consumption of unconfirmed information:" VerticalAlignment="Center"></TextBlock>
<TextBox Margin="13,0,0,0" VerticalAlignment="Center" Height="310" Width="250" Text="{Binding PendingConsumeInfo}" VerticalScrollBarVisibility="Visible" HorizontalScrollBarVisibility="Auto" TextWrapping="WrapWithOverflow"></TextBox>
</StackPanel>
</WrapPanel>
</StackPanel>
</Grid>
public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
DataContext = ;
}
private void MainWindow_OnLoaded(object sender, RoutedEventArgs e)
{
= 1000;
= 5;
= 1;
= "Waiting for consumer information……";
= "loading……";
= "loading……";
}
}
MainViewModel declarations and variable definitions
public class MainViewModel : ObservableObject
{
#region Variable Definition
private readonly string _streamKey = "redisstream";
private readonly string _consumeGroupName = "counsumeGroup";
private DateTime _utcTime = new DateTime(1970, 1, 1, 0, 0, 0);
/// <summary>
/// Number of messages generated
/// </summary>
private static int _exchangeValue;
/// <summary>
/// Remaining unconsumed articles
/// </summary>
private static int _consumeValue;
/// <summary>
/// Consumer Information Display Queue
/// </summary>
private static ConcurrentQueue<string> _consumedQueue = new ConcurrentQueue<string>();
/// <summary>
/// Consumption Unconfirmed Display Queue
/// </summary>
private static ConcurrentQueue<string> _pendingConsumedQueue = new ConcurrentQueue<string>();
/// <summary>
/// exit token
/// </summary>
private CancellationTokenSource _cancellationTokenSource;
/// <summary>
/// Generate Message
/// </summary>
public RelayCommand ProducerRelayCommand { get; }
/// <summary>
/// consumer goods
/// </summary>
public RelayCommand ConsumeRelayCommand { get; }
/// <summary>
/// Consumption Unconfirmed Message Queue Consumption
/// </summary>
public RelayCommand PendingRelayCommand { get; }
private int _recordCount;
/// <summary>
/// data entry
/// </summary>
public int RecordCount
{
get => _recordCount;
set => SetProperty(ref _recordCount, value);
}
private int _taskCount;
/// <summary>
/// Turn on the number of backend producers
/// </summary>
public int TaskCount
{
get => _taskCount;
set => SetProperty(ref _taskCount, value);
}
private int _consumerCount;
/// <summary>
/// Number of consumers
/// </summary>
public int ConsumerCount
{
get => _consumerCount;
set => SetProperty(ref _consumerCount, value);
}
private int _producerIndex;
/// <summary>
/// Serial number in production
/// </summary>
public int ProducerIndex
{
get => (ref _producerIndex, _exchangeValue);
set
{
SetProperty(ref _producerIndex, _exchangeValue);
}
}
private long _consumeIndex;
/// <summary>
/// Serial number being consumed
/// </summary>
public long ConsumeIndex
{
get => (ref _consumeIndex);
set => SetProperty(ref _consumeIndex, value);
}
private string _streamInfo;
/// <summary>
/// Queue Information Display
/// </summary>
public string StreamInfo
{
get => _streamInfo;
set => SetProperty(ref _streamInfo, value);
}
private string _consumeInfo;
/// <summary>
/// Consumer Information Display
/// </summary>
public string ConsumeInfo
{
get => _consumeInfo;
set
{
value = $"暂无consumer goods[{:yyyy-MM-dd HH:mm:ss}]";
if (_consumedQueue.TryDequeue(out var message))
{
value = _consumeInfo + + message;
}
SetProperty(ref _consumeInfo, value);
}
}
private string _pendingInfo;
/// <summary>
/// 消费未确认Queue Information Display
/// </summary>
public string PendingInfo
{
get => _pendingInfo;
set => SetProperty(ref _pendingInfo, value);
}
private string _pedingConsumeInfo;
/// <summary>
/// Consumption of unconfirmed queue displays
/// </summary>
public string PendingConsumeInfo
{
get => _pedingConsumeInfo;
set
{
value = $"No unconfirmed consumption information at this time[{:yyyy-MM-dd HH:mm:ss}]";
if (_pendingConsumedQueue.TryDequeue(out var message))
{
value = _pedingConsumeInfo + + message;
}
SetProperty(ref _pedingConsumeInfo, value);
}
}
private bool _isProduceCanExec;
/// <summary>
/// Whether the generation task can be executed
/// </summary>
public bool IsProduceCanExec
{
get => _isProduceCanExec;
set
{
SetProperty(ref _isProduceCanExec, value);
();
}
}
private bool _isAutoAck;
/// <summary>
/// Whether to automatically confirm consumption information
/// </summary>
public bool IsAutoAck
{
get => _isAutoAck;
set
{
SetProperty(ref _isAutoAck, value);
();
}
}
#endregion
public MainViewModel()
{
ProducerRelayCommand = new RelayCommand(async () => await DoProduce(), () => !_isProduceCanExec);
ConsumeRelayCommand = new RelayCommand(async () => await DoConsume());
PendingRelayCommand = new RelayCommand(async () => await DoPendingConsume());
_cancellationTokenSource = new CancellationTokenSource();
var exist = (_streamKey);
if (!exist)
{
//Creating Consumer Groups,There can be multiple consumers in the same consumer group,They directly do not read the same message over and over again
(_streamKey, _consumeGroupName, MkStream: true);
}
else
{
var groups = (_streamKey);
if (groups == null || !())
{
(_streamKey, _consumeGroupName);
}
}
ConsumeIndex = (_streamKey);
DoLoadStreamInfo();
DoLoadPendingInfo();
}
}
message generator
private async Task DoProduce()
{
if (IsProduceCanExec)
{
return;
}
IsProduceCanExec = true;
if (TaskCount > RecordCount)
{
TaskCount = RecordCount;
}
_exchangeValue = 0;
ProducerIndex = 0;
if (TaskCount > 0)
{
var pageSize = RecordCount / TaskCount;
var tasks = (1, TaskCount).Select(x =>
{
return (() =>
{
var internalPageSize = pageSize;
if (TaskCount > 1 && x == TaskCount)
{
if (x * internalPageSize < RecordCount)
{
internalPageSize = RecordCount - (TaskCount - 1) * internalPageSize;
}
}
for (var i = 1; i <= internalPageSize; i++)
{
ProducerIndex = (ref _exchangeValue);
ConsumeIndex = (ref _consumeValue);
var dic = new Dictionary<string, MessageModel> { { $"user_{x}", new MessageModel { Age = 16, Description = $"descriptive:{ProducerIndex}", Id = 1, Name = "wang", Status = 1 } } };
(_streamKey, 0, "*", dic);
}
return ;
});
});
await (tasks);
}
IsProduceCanExec = false;
}
#endregion
message consumer
private Task DoConsume()
{
var groups = (_streamKey);
if (groups == null || !())
{
(_streamKey, _consumeGroupName);
}
//Add Consumer
var tasks = (1, _consumerCount).Select(c =>
{
var task = (async () =>
{
//Reading a message from a consumer group,Members in the same group do not get the same message repeatedly。
var streamRead = (_consumeGroupName, $"consumer{c}", 0, _streamKey, ">");
if (streamRead != null)
{
//get a message
var id = ;
var model = new Dictionary<string, MessageModel>(1)
{
{ [0].ToString(), <MessageModel>([1].ToString()) }
};
_consumedQueue.Enqueue($"consumer{c}Got the message.{id},{:yyyy-MM-dd HH:mm:ss}");
ConsumeInfo = "";
await (100);//Simulate business logic time consuming
if (IsAutoAck)
{
//ACK
var success = (_streamKey, _consumeGroupName, id);
if (success > 0)
{
//xdel
(_streamKey, id);
_consumedQueue.Enqueue($"consumer{c}Successfully consumed the message{id},{:yyyy-MM-dd HH:mm:ss}");
}
else
{
_consumedQueue.Enqueue($"consumer{c}messages{id}Added to the unconfirmed queue,{:yyyy-MM-dd HH:mm:ss}");
}
ConsumeInfo = "";
}
}
});
return task;
});
(tasks);
return ;
}
#endregion
Unconsumed message queue monitoring
private Task DoLoadStreamInfo()
{
(async () =>
{
while (! _cancellationTokenSource.IsCancellationRequested)
{
StreamInfo = "Querying queue information ......";
try
{
if (! (_streamKey))
{
StreamInfo = "Queue has not been created yet"; await (3000); {
await (3000); continue; { if (!
continue; }
}
var info = (_streamKey);
StreamInfo = info?.first_entry == null ? "Queue data is empty." : $"Queue length: {}{}First number: {info.first_entry.id}{}Last number: {info.last_entry.id}{}Update time: {:yyyy-MM-dd HH:mm:ss}";
ConsumeIndex = info?.first_entry == null ? 0 : (int);
}
catch (Exception e)
{
StreamInfo = $"Failed to get queue info: {}";
}
await (3000); }
}
}, _cancellationTokenSource.Token, , ); }
return ;
}
#endregion
Display of message queue messages that were not consumed successfully (timeout or business logic execution failure)
private Task DoLoadPendingInfo()
{
(async () =>
{
while (! _cancellationTokenSource.IsCancellationRequested)
{
PendingInfo = "Querying the unconfirmed queue at ......";
if (! (_streamKey))
{
PendingInfo = "Queue has not been created yet"; await (3000); { PendingInfo = "Queue has not been created yet"; if (!
await (3000); continue; { PendingInfo = "Queue has not been created yet"; await (3000)
continue; }
}
continue
} try
var info = (_streamKey, _consumeGroupName); if (info == null || == 0); } try {
if (info == null || == 0)
{
PendingInfo = $"No unconfirmed information. [{:yyyy-MM-dd HH:mm:ss}]";"; await (3000); }
await (3000);
continue; }
}
var infoTxt = $"Unconsumed number: {}{}Involving {}consumers {}Minimum number: {}{}Maximum number: {}{}Update time: [{:yyyy-MM-dd HH:mm:ss}]"; await (3000); continue
PendingInfo = infoTxt;
}
catch (Exception e)
{
PendingInfo = $"Failed to get queue information: {}";
}
await (3000); }
}
}, _cancellationTokenSource.Token); }
return ;
}
#endregion
Unconsumed messages are reconsumed
private Task DoPendingConsume()
{
(async () =>
{
while (!_cancellationTokenSource.IsCancellationRequested)
{
_pendingConsumedQueue.Enqueue($"Querying unconfirmed queue……[{:yyyy-MM-dd HH:mm:ss}]");
PendingConsumeInfo = "";
if (!(_streamKey))
{
_pendingConsumedQueue.Enqueue($"Queue not yet created……[{:yyyy-MM-dd HH:mm:ss}]");
PendingConsumeInfo = "";
await (3000);
continue;
}
try
{
//through (a gap)streamThe head of the queue(0-0location)gain2An article that has been read for longer than2Minute and unconfirmed message,Modify the owner topendingUserRe-consumption and confirmation。
var info = (_streamKey, _consumeGroupName, "pendingUser", 120000, "0-0", 2);
if (info == null || == null || == 0)
{
_pendingConsumedQueue.Enqueue("No information in the unconfirmed queue。[{:yyyy-MM-dd HH:mm:ss}]");
await (3000);
continue;
}
foreach (var entry in )
{
if (entry == null) continue;
_pendingConsumedQueue.Enqueue($"Unconfirmed consumption information:{}[{:yyyy-MM-dd HH:mm:ss}]");
(<MessageModel>([1].ToString()));
PendingConsumeInfo = "";
//ACK
await (100);//Simulate business logic execution time
var success = (_streamKey, _consumeGroupName, );
if (success > 0)
{
//xdel
if ((_streamKey, ) > 0)
{
_pendingConsumedQueue.Enqueue($"consumer[pendingUser]Successfully consumed the message{},{:yyyy-MM-dd HH:mm:ss}");
}
else
{
_pendingConsumedQueue.Enqueue($"[pendingUser]removing{}[fail (e.g. experiments)],{:yyyy-MM-dd HH:mm:ss}");
}
}
else
{
_pendingConsumedQueue.Enqueue($"[pendingUser]consumers{}[fail (e.g. experiments)],{:yyyy-MM-dd HH:mm:ss}");
}
}
}
catch (Exception e)
{
PendingConsumeInfo = $"gain队列信息fail (e.g. experiments):{}";
}
PendingConsumeInfo = "";
await (3000);
}
}, _cancellationTokenSource.Token);
return ;
}
#endregion
summarize
This time we have implemented a simple to deploy, high performance, high availability message queue with Redis's Stream data type, which can be applied on small to medium sized projects in scenarios where you need to handle the flow of data.
bibliography
① Redis Streams
②Redis Commands