Location>code7788 >text

Redis Series: Using Stream to Implement Message Queues (Graphic Summary + Go Case)

Popularity:266 ℃/2024-08-07 08:57:46

Redis 24-part collection

1 Pilot

We are in the process ofRedis Series 14: Implementing Message Queues with ListsThis article discusses in detail how to implement message queues using Lists, but at the same time sees many limitations, such as:

  • No support for message acknowledgment mechanismNo good ACK response.
  • Message backtracking is not supportedIt is not possible to troubleshoot the problem and do message analysis
  • List is executed according to the FIFO mechanism, so there is a risk of message stacking.
  • Inefficient searching, as a linear structure, locating a piece of data in a List requires traversal, O(N) time complexity
  • The concept of a Consumer Group does not exist., unable to realize multiple consumers to form a group for consumption

2 About Stream

Redis Stream is a new data structure introduced in Redis version 5.0, which is mainly used toEfficient processing of streaming data, especially for message queuing, logging and real-time data analysis scenarios
The following is a summary of the Redis StreamKey Features:
1. Data structure: A Redis Stream is a log data structure consisting of ordered messages.Each message has a globally unique ID, ensuring that messages are sequential and traceable.
2. Message ID: The ID of a message consists of two parts, a millisecond timestamp and a sequence number. This designEnsures monotonic incremental message IDs, i.e., the ID of the new message is always greater than the ID of the old message.
3. Consumer groupsRedis Stream supports the concept of consumer groups, allowing multiple consumers to subscribe to Stream as a group, and each message will only be processed by one consumer within the group, avoiding duplicate consumption of messages.

And the main advantages.

1. Persistent storage: Messages in the Stream can bePersistent storage ensures no data loss, messages can be restored even after a Redis server reboot.
2. Orderliness: Messages are added to the Stream by generating message IDs in the order in which they are generated, and can beRetrieving messages according to specified conditions ensures message ordering
3. Multicast and packet consumption: Supports multiple consumers consuming messages in the same stream at the same time, and consumers can be organized into consumption groups to implement theGroup consumption of messages
4. Message Acknowledgement Mechanism: Consumers can get the best of both worlds through theXACK command to confirm successful consumptionMessages are guaranteed to be back-consumed at least once to ensure that they are not processed repeatedly.
5. Blocking Read: The consumer can choose the blocking read mode, when there is no new message, the consumer will wait until a new message arrives.
6. message traceability: Facilitates complementary counting, special data handling, and backtracking of problems

3 Main commands

1. XADD: Add a message to the Stream. If the specified Stream does not exist, it will be created automatically.
2. XREAD: Get the list of messages in the Stream in a blocking/non-blocking manner.
3. XREADGROUP: Reads messages from a consumer group and supports blocking reads.
4. XACK: Confirms that the consumer has successfully processed the message.
5. XGROUP: Used to manage consumer groups, including operations such as creating, setting IDs, and destroying consumer groups.
6. XPENDING: Query pending messages in the consumer group.

3.1 XADD Message Logging

The XADD command is used to add messages to the Redis Stream data structure.

3.1.1 Basic syntax of the XADD command

XADD stream_name [MAXLEN maxlen] [ID id] field1 value1 [field2 value2 ...]

1. stream_name: Specifies the name of the Stream to which the message is to be added.
2. MAXLEN maxlen: Optional parameter to limit the maximum length of the Stream. When the length of Stream reaches maxlen, old messages will be deleted automatically.
3. ID id: Optional parameter to specify the ID of the message. if this parameter is not specified, Redis automatically generates a unique ID.
4. field1 value1 [field2 value2 ...]: The fields and values of the message, the content of the message in the form of key-value.

An important use of the XADD command is to implement the message publishing feature, which allows publishers to add messages to the Stream using the XADD command.

3.1.2 XADD example

Suppose we have a file nameduserinfo_streamStream and would like to add to it a Stream containing asensor_idcap (a poem)temperaturefield of the message, we can use the following command:

XADD userinfo_stream * user_name brand age 18

In this example, the*Indicates to have Redis automatically generate a unique message ID. the message contains two fields:usernamecap (a poem)ageand their values arebrandcap (a poem)18. So this side records a user information with the name ofbrandAge18Age.

3.1.3 What to look for?

  • If the specified Streamnon-existentThe XADD command willCreate a new Stream
  • The ID of the message is unique, and Redis ensures that theThe IDs of messages in Stream are monotonically increasing. If ID is specified, the ID of the new message must be greater than the IDs of all existing messages in the Stream.
  • utilizationThe MAXLEN parameter allows you to limit the size of the Stream, which is very useful when dealing with a large number of messages and prevents Stream from taking up too much memory or disk space.

3.2 XREAD Message Consumption

i.e., read the message from the queue (consume it)

3.2.1 Basic Syntax of the XREAD Command

The basic syntax of the XREAD command is as follows:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

1. COUNT count: This is an optional parameter to specify the maximum number of messages to be read at one time. If not specified, the default is 1.
2. BLOCK milliseconds: This is also an optional parameter to specify the blocking time (in milliseconds). If a blocking time is specified and there are currently no consumable messages, the client will block for the specified time and wait. If this parameter is not set or is set to 0, the command will return immediately, regardless of whether there are consumable messages.
3. STREAMS key [key ...] ID [ID ...]: This section specifies the streams (Streams) to be consumed and the corresponding starting message ID. multiple streams and corresponding starting IDs can be specified at a time.

Working mechanism of the XREAD command
1. Read the message after the specified IDThe : XREAD command returns the message after the specified ID (not the message itself with the specified ID). If no ID is specified, or if the specified ID does not exist in the stream, then the command will read the message from the beginning or end of the stream, depending on the value of the ID (e.g., "0-0" means read from the beginning of the stream, and "$" means read from the stream's current max. ID of the stream).
2. Blocking of readings: When the BLOCK parameter is set, if there are no currently consumable messages, the client will enter a blocking state until a new message arrives or the blocking timeout expires. This mechanism is very suitable for realizing the consumer waiting for the producer to produce a new message scenario.
3. Support for multiple streams: The XREAD command supports reading messages from multiple streams at the same time, simply specify multiple streams and corresponding start IDs in the command.

3.2.2 XREAD example

Suppose we have a file nameduserinfo_streamstream and want to read messages from that stream. Here are some examples:
1. Non-blocking reading of the latest news

XREAD COUNT 1 STREAMS userinfo_stream $

This command will try to retrieve the data from theuserinfo_streamReads the latest message (if any) from the stream.$is a special ID indicating the current maximum ID of the stream.

2. Blocking the reading of the latest news

XREAD COUNT 1 BLOCK 1000 STREAMS userinfo_stream $

This command blocks for 1000 milliseconds, waiting for theuserinfo_streamA new message appears in the stream. If a new message arrives within 1000 milliseconds, the command returns that message; otherwise, the command times out and returns nil.

3. Reading from a specific ID

XREAD COUNT 2 STREAMS userinfo_stream 1722159931000-0
1) 1) "userinfo_stream"
    2)  1) 1) "1722159931000-0"
         2) 1) "user_name"
             2) "brand"
             3) "age"
             4) "18"

This command will take a page from theuserinfo_streamRead in a stream with an ID greater than or equal to1722159931000-0message that returns at most data.

3.2.3 What to look for?

1. Uniqueness of the message ID: In Redis Streams, each message has a globally unique message ID.This message ID consists of two parts: a timestamp and a sequence number. The timestamp indicates when the message was added to the stream, and the sequence number indicates the order of messages added within the same timestamp.
2. Consumer groups: While the XREAD command itself does not directly address the concept of consumer groups, Redis Streams also supports the consumer group pattern, which allows a group of consumers to collaborate on consuming messages in the same stream.In consumer group mode, the XREADGROUP command is usually used instead of the XREAD command to read messages.
3. Performance considerations: The XREAD command may consume more CPU and memory resources when reading a large number of messages. Therefore, in practical applications, you need to set the value of the COUNT parameter reasonably according to the actual situation, so as to avoid performance problems caused by reading too many messages at once.

3.3 Consumer Group Consumer Group Model

Typical multicast pattern, in scenarios with high real-time requirements, if you want to speed up the processing of messages. Then this is a good choice. we let the queue be logically partitioned with different consumer groups to isolate the consumption. So:

image

Consumer groups allow multiple consumers (clients or processes) to work together to process messages in the same stream. Each consumer group maintains its own consumption offset (i.e., the location of processed messages) to support load balancing and fault tolerance among consumers.

3.3.1 Creating consumer groups

Use the XGROUP CREATE command to create a consumer group.

# stream_name: queue name
# consumer_group: consumer group
# msgIdStartIndex: start position of message Id
# msgIdStartIndex: the end of the message id.
# $ Indicates that the consumer group is created from the current end of the stream (i.e., the latest message). If the stream does not exist, the MKSTREAM option will automatically create the stream
XGROUP CREATE stream_name consumer_group msgIdStartIndex-msgIdStartIndex
# or
XGROUP CREATE stream_name consumer_group $ MKSTREAM

The following is an example implementation with consumer group 1 (consumer_group1) and consumer group 2 (consumer_group2) created for the queue userinfo_stream:

> xgroup create userinfo_stream consumer_group1 0-0
OK
> xgroup create userinfo_stream consumer_group2 0-0
OK

3.3.2 Reading messages

Consumers can be reached through theXREADGROUP command reads a message from a consumer group.XREADGROUP command not only reads the messages, but also updates the status of the consumers in the consumer group, i.e., marks which messages have been read.

# group_name: consumer group name
# consumer_name: consumer name
# COUNT number: count number of consumers
# BLOCK ms: Indicates that the command will block for up to xx milliseconds if there are no new messages in the stream, 0 means infinite blocking.
# stream_name: queue name
# id: message consumption ID
# []: for optional parameters
# `>`: placed at the end of the command parameters, means start reading from messages that have not been consumed yet;

XREADGROUP GROUP group_name consumer_name [COUNT number] [BLOCK ms] STREAMS stream_name [stream ...] id [id ...]
# or
XREADGROUP GROUP group_name consumer_name COUNT 1 BLOCK 2000 STREAMS stream_name >

The following is an example implementation where consumer consumer1 of consumer_group1 reads a message from userinfo_stream in a blocking fashion:

XREADGROUP GROUP consumer_group1 consumer1 COUNT 1 BLOCK 0 STREAMS userinfo_stream >
1) 1) "userinfo_stream"
   2) 1) 1) "1722159931000-0"
         2) 1) "user_name"
            2) "brand"
            3) "age"
            4) "18"

3.3.3 Acknowledgement messages

After processing the message, the consumer needs to send an XACK command to acknowledge the message. This tells Redis that the message has been successfully processed and can be removed from the consumer group's list of pending messages

# stream_name: queue name
# group_name: Consumer group name.
# <message-id> is the ID of the message to be acknowledged.

XACK stream_name group_name <message-id>.
# ACK acknowledges two messages
XACK userinfo_stream consumer_group1 1722159931000-0 1722159932000-0
(integer) 2

3.3.4 PLE: message reliability assurance

The PEL (Pending Entries List) records messages that are currently being read by the consumer but have not yet been acknowledged (ACK). These messages remain in the PEL until they are successfully processed by the consumer and an ACK command is sent. If a consumer crashes or fails to send an ACK command in time, Redis ensures that these messages are reassigned to other consumers for processing, thus enabling reliable message delivery.

XPENDING stream_name group_name

In the following example, we look at theuserinfo_stream Consumption group inconsumer_group1 The message information that has been read but not acknowledged by individual consumers in the

XPENDING userinfo_stream consumer_group1
1) (integer) 2 # number of unacknowledged messages
2) "1722159931000-0"
3) "1722159932000-0"

See the official website documentation for detailed stream operations:/docs/data-types/streams-tutorial/

4 Implementing Stream Queue Capabilities with Golang

4.1 First install go-redis/redis library

> go get /go-redis/redis/v8
go: downloading /go-redis/redis v6.15.9+incompatible
go: downloading /go-redis/redis/v8 v8.11.5
go: downloading /dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
go: downloading /cespare/xxhash/v2 v2.1.2
go: added /cespare/xxhash/v2 v2.1.2
go: added /dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
go: added /go-redis/redis/v8 v8.11.5

Note: v8 here is the version number of the library, you can adjust it accordingly

logical implementation

package main
  
import (
	"context"
	"fmt"
	"log"
	"time"
  
	"/go-redis/redis/v8"
)
  
func main() {
	// connect toRedis
	rdb := (&{
		Addr: "localhost:6379", // Redisaddress
		Password: "", // cryptographic(If there is one.)  
		DB: 0, // Use the defaultDB
	})
  
	ctx := ()
  
	// establishStream
	_, err := (ctx, &{
		Stream: "mystream",
		Values: map[string]interface{}{
			"field1": "value1",
			"field2": "value2",
		},
	}).Result()
	if err != nil {
		("Failed to add message to stream: %v", err)
	}  
  
	// establishConsumer Group
	_, err = (ctx, "mystream", "mygroup", "$").Result()
	if err != nil && err != {
		("Failed to create consumer group: %v", err)
	}  
  
	// Consumer reads the message
	go func() {
		for {
			msgs, err := (ctx, &{
				Group:    "mygroup",
				Consumer: "myconsumer",
				Streams:  []string{"mystream", ">"},
				Count:    1,
				Block: 1000, // blockage1000millisecond
			}).Result()
			if err != nil {
				if err == {
					// overtime pay,Nothing new.
					continue
				}  
				("Failed to read from stream: %v", err)
			}  
  
			for _, msg := range msgs[0].Messages {
				("Received: %s %s\n", , )
  
				// acknowledgement message
				_, err = (ctx, "mystream", "mygroup", ).Result()
				if err != nil {
					("Failed to ack message: %v", err)
				}  
			}  
		}  
	}()
  
	// Simulate the producer to continue sending messages
	for i := 0; i < 5; i++ {
		_, err := (ctx, &{
			Stream: "mystream",
			Values: map[string]interface{}{
				"field1": ("value%d", i+1),
				"field2": "another value",
			},
			MaxLen: 100,
			Approximate: true,
		}).Result()
		if err != nil {
			("Failed to add message to stream: %v", err)
		}  
		(2 * ) // Simulated production intervals
	}  
  
	// take note of:In practice,trump card (in card games)goroutineUsually does not exit immediately,Instead, it will wait for certain triggers

5 Application Scenarios

1. Message queues: Redis Stream can be used as a message queue, supporting publish, subscribe and consume messages.
2. Logging: Write log information to Redis Stream for subsequent query and analysis.
3. Real-time data analysis: Combine with other data structures of Redis (e.g. Sorted Set, Hash, etc.) to analyze the data in Stream in real time.

6 Summary

Redis Stream is a great addition to Redis in the area of message queuing and streaming data processing. It provides simple but powerful data streaming capabilities, providing developers with more options and flexibility. The advantages of Stream over List are as follows:

  • Support for message acknowledgement mechanism (ACK answer acknowledgement)
  • Supports message backtracking for troubleshooting and message analysis.
  • There exists the concept of Consumer Group, which allows for group consumption and batch consumption, and can load multiple consumption instances.