
(F) Redis message queues List, Streams

2024-07-29 15:37:39

Is Redis suitable for use as a message queue, and if so, which data types can implement one? To answer that, we first need to understand what a message queue requires when messages are stored and retrieved.

1. Message queue

We generally call the component that sends messages to a message queue the producer and the component that receives messages from it the consumer. The following figure shows the architectural model of a generic message queue:
A message queue has to meet three requirements when storing and retrieving messages: preserving message order, handling duplicate messages, and ensuring message reliability.

(1) Message order preservation

Although the consumer processes messages asynchronously, the consumer still needs to process messages in the order in which they are sent by the producer, to avoid messages sent later being processed first.

(2) Handling duplicate messages

When a consumer reads from a message queue, a message is sometimes retransmitted, for example because of network congestion. If the consumer processes the duplicate as well, the same business logic runs more than once, which can corrupt data.

(3) Message reliability assurance

A consumer may also fail or go down while it is processing a message, leaving that message unfinished. The message queue must therefore guarantee reliability: when the consumer restarts, it can read the message again and reprocess it. Otherwise the message is lost without ever being processed.

2. List-based solution

A List stores and returns data in first-in-first-out order, so using a List as a message queue already satisfies the requirement of preserving message order. Specifically, the producer uses the LPUSH command to write messages to the List one after another, and the consumer uses the RPOP command to read them from the other end of the List in the order in which they were written.
A List does not notify the consumer when a new message is written. If the consumer calls RPOP in a polling loop, it wastes CPU whenever the List is empty. Redis therefore provides the BRPOP command, a blocking read: when the queue is empty, the client blocks until new data is written to the queue and then reads it. This saves CPU compared with a consumer program that keeps calling RPOP itself.
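To make the blocking read concrete, here is a minimal in-memory sketch in Python. It does not talk to Redis: `queue.Queue` stands in for the List, `put` plays the role of LPUSH, and the hypothetical helper `blocking_pop` imitates BRPOP with a timeout. All names are assumptions chosen for illustration.

```python
import queue
import threading

mq = queue.Queue()  # stand-in for the Redis List (FIFO)


def blocking_pop(timeout):
    """Imitates BRPOP: wait up to `timeout` seconds, return None on timeout."""
    try:
        return mq.get(timeout=timeout)
    except queue.Empty:
        return None


received = []


def consumer():
    # The consumer sleeps inside blocking_pop instead of spinning on an empty queue.
    while True:
        message = blocking_pop(timeout=0.5)
        if message is None:  # nothing arrived within the timeout: stop the demo
            break
        received.append(message)


worker = threading.Thread(target=consumer)
worker.start()

for message in ["m1", "m2", "m3"]:  # producer side, plays the role of LPUSH
    mq.put(message)

worker.join()
print(received)
```

The consumer receives the messages in the order the producer wrote them, and it consumes no CPU while waiting.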

Handling duplicate messages requires two things. First, the message queue must give every message a globally unique ID. Second, the consumer program must record the IDs of the messages it has already processed and skip any message whose ID it has seen before. This property is known as idempotence: processing the same message once or several times produces the same result. A List does not generate message IDs itself, so the producer program has to generate a globally unique ID before sending each message and include it in the message.
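The idea can be sketched in a few lines of Python. This is an illustration under assumptions, not a prescribed implementation: the message layout (`id` plus `payload`), the `IdempotentConsumer` class, and the use of `uuid4` for the unique ID are all hypothetical choices.

```python
import uuid


def make_message(payload):
    """Producer side: attach a globally unique ID before sending."""
    return {"id": str(uuid.uuid4()), "payload": payload}


class IdempotentConsumer:
    """Consumer side: remember processed IDs and skip duplicates."""

    def __init__(self):
        self.processed_ids = set()
        self.total = 0  # state changed by the business logic

    def handle(self, message):
        if message["id"] in self.processed_ids:
            return False  # duplicate: do nothing
        self.total += message["payload"]  # the business logic runs once
        self.processed_ids.add(message["id"])
        return True


consumer = IdempotentConsumer()
msg = make_message(5)
first = consumer.handle(msg)
second = consumer.handle(msg)  # simulated retransmission of the same message
print(first, second, consumer.total)
```

In a real deployment the set of processed IDs would have to survive a consumer restart, for example by being stored alongside the business data.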

To ensure message reliability, the List type provides the BRPOPLPUSH command. It lets the consumer program read a message from one List while Redis inserts the same message into another List (a backup List). If the consumer reads a message but fails to process it, it can read the message again from the backup List after it restarts. (Since Redis 6.2, BRPOPLPUSH is deprecated in favor of the equivalent BLMOVE command.)
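The backup-List pattern can be modeled in memory as follows. This is a sketch under assumptions: two `deque` objects stand in for the two Lists, the helper `rpop_lpush` imitates the non-blocking move, and removing the message from the backup after success is an assumed cleanup step (against Redis it would typically be an LREM).

```python
from collections import deque

main_list = deque()    # stand-in for the message List
backup_list = deque()  # stand-in for the backup List


def rpop_lpush(src, dst):
    """Move the tail of src to the head of dst and return it (None if empty)."""
    if not src:
        return None
    message = src.pop()
    dst.appendleft(message)
    return message


def consume(process):
    message = rpop_lpush(main_list, backup_list)
    if message is None:
        return None
    process(message)             # may raise: the message then stays in the backup
    backup_list.remove(message)  # success: drop it from the backup
    return message


def crashing(message):
    raise RuntimeError("consumer crashed while processing " + message)


for m in ["a", "b"]:
    main_list.appendleft(m)  # plays the role of LPUSH

try:
    consume(crashing)        # "a" is read but never finished
except RuntimeError:
    pass
after_crash = list(backup_list)

recovered = []
while backup_list:           # after a restart: reprocess what the backup holds
    recovered.append(backup_list.pop())

done = []
consume(done.append)         # normal processing continues with "b"
print(after_crash, recovered, done)
```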

3. Streams-based solution

If the producer sends messages much faster than the consumer can process them, messages pile up in the List and put pressure on Redis memory. We would then like several consumers to share the work as a group, but List has no consumer-group mechanism. This is where the Streams data type, available since Redis 5.0, comes in. Streams is designed specifically for message queues and provides a rich set of commands for them:

  • XADD: inserts a message, preserves order, and can automatically generate a globally unique ID
  • XREAD: reads messages, optionally starting after a given ID
  • XREADGROUP: reads messages as a member of a consumer group
  • XPENDING: lists the messages in a consumer group that have been read by its consumers but not yet acknowledged
  • XACK: confirms to the message queue that a message has been processed

XADD inserts a new message whose content is given as key-value pairs. For example, to insert a message with the key repo and the value 5 into a message queue named mqstream:

XADD mqstream * repo 5
"1599203861727-0"

Here * means that Redis automatically generates a globally unique ID for the inserted data, such as "1599203861727-0". You can also specify an ID yourself in place of *, as long as it is greater than every ID already in the stream.

The automatically generated ID consists of two parts. The first part, "1599203861727", is the server time in milliseconds when the data was inserted. The second part is the sequence number of the message within that millisecond, starting from zero. For example, "1599203861727-0" is the first message inserted in millisecond "1599203861727".
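The structure of these IDs can be illustrated with a short Python sketch. The helper names (`parse_stream_id`, `id_to_datetime`, `IdGeneratorSketch`) are hypothetical, and the generator is only a model of the millisecond-plus-sequence scheme described above, not Redis's actual implementation.

```python
from datetime import datetime, timedelta, timezone


def parse_stream_id(stream_id):
    """Split '<milliseconds>-<sequence>' into two integers."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


def id_to_datetime(stream_id):
    """UTC time encoded in the first part of the ID (integer arithmetic only)."""
    ms, _ = parse_stream_id(stream_id)
    seconds = datetime.fromtimestamp(ms // 1000, tz=timezone.utc)
    return seconds + timedelta(milliseconds=ms % 1000)


class IdGeneratorSketch:
    """Model of ID generation: same millisecond -> next sequence number."""

    def __init__(self):
        self.last_ms = -1
        self.seq = 0

    def next_id(self, now_ms):
        if now_ms <= self.last_ms:
            # Assumption of this sketch: if the clock has not advanced,
            # keep the last millisecond and bump the sequence number.
            self.seq += 1
        else:
            self.last_ms = now_ms
            self.seq = 0
        return f"{self.last_ms}-{self.seq}"


gen = IdGeneratorSketch()
ids = [gen.next_id(1599203861727), gen.next_id(1599203861727), gen.next_id(1599203861728)]
print(ids)
print(parse_stream_id("1599203861727-0"))
print(id_to_datetime("1599203861727-0").isoformat())
```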

When reading messages with XREAD, you can specify a message ID, and reading starts from the message after that ID. The BLOCK option, given in milliseconds, enables a blocking read similar to BRPOP. For example, the following command reads all messages after ID 1599203861727-0 (3 in total), blocking for up to 100 milliseconds if none are available:

XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      2) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      3) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"
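The "read everything after this ID" rule can be modeled in Python as below. This is an in-memory sketch, not a Redis client call: the `stream` list reuses the entries shown in this article, and `read_after` is a hypothetical helper. IDs are compared as pairs of integers because plain string comparison would misorder IDs of different lengths.

```python
def parse_id(stream_id):
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


# Entries of mqstream as (ID, fields) pairs, in insertion order.
stream = [
    ("1599203861727-0", {"repo": "5"}),
    ("1599274912765-0", {"repo": "3"}),
    ("1599274925823-0", {"repo": "2"}),
    ("1599274927910-0", {"repo": "1"}),
]


def read_after(entries, last_id, count=None):
    """Return entries whose ID is strictly greater than last_id."""
    start = parse_id(last_id)
    result = [entry for entry in entries if parse_id(entry[0]) > start]
    return result if count is None else result[:count]


later = read_after(stream, "1599203861727-0")
print([entry_id for entry_id, _ in later])
```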

In another example, the command ends with $, which means that only messages arriving after the command is issued are read. The BLOCK 10000 option tells XREAD to wait up to 10000 milliseconds (10 seconds) for a message and then return. Because no new message is written to mqstream during that time, XREAD returns nil after 10 seconds.

XREAD block 10000 streams mqstream $
(nil)
(10.00s)

Unlike List, Streams supports consumer groups. The XGROUP command creates a consumer group, and once the group exists, the XREADGROUP command lets the consumers in that group read messages.
For example, the following command creates a consumer group named group1 that consumes the message queue mqstream, starting from ID 0:

XGROUP create mqstream group1 0
OK

The next command lets consumer1 in group1 read all messages from mqstream. The last parameter, ">", means that reading starts from the first message that has not yet been delivered to any consumer in the group. No other consumer in group1 has read anything before consumer1 does, so consumer1 gets all 4 messages in mqstream.

XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
      2) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      3) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      4) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

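Once a message has been read by one consumer in a consumer group, it cannot be read by the other consumers in that group. For example, when consumer2 in group1 then tries to read from mqstream, it gets an empty result. (The ID 0 used here asks for messages that were delivered to consumer2 but not yet acknowledged, and there are none.)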

XREADGROUP group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
   2) (empty list or set)

The purpose of a consumer group is to balance load by letting the consumers in the group share the reads. For example, assuming a second group named group2 has been created in the same way as group1, consumer1, consumer2, and consumer3 in group2 each read one message:

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
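The sharing behavior shown above can be modeled with a single cursor per group. The following Python sketch is an in-memory illustration only: the class name `ConsumerGroupSketch` and its `read_new` method are hypothetical, and the cursor is a list index here, whereas a real group tracks the last delivered ID.

```python
class ConsumerGroupSketch:
    """Toy model of a consumer group: one shared cursor over an ordered stream."""

    def __init__(self, entries):
        self.entries = list(entries)  # (ID, fields) pairs in insertion order
        self.cursor = 0               # index of the first entry not yet delivered

    def read_new(self, consumer, count=1):
        """Hand out up to `count` entries that no group member has received."""
        batch = self.entries[self.cursor:self.cursor + count]
        self.cursor += len(batch)
        return [(consumer, entry_id, fields) for entry_id, fields in batch]


stream = [
    ("1599203861727-0", {"repo": "5"}),
    ("1599274912765-0", {"repo": "3"}),
    ("1599274925823-0", {"repo": "2"}),
    ("1599274927910-0", {"repo": "1"}),
]

group2 = ConsumerGroupSketch(stream)
deliveries = []
for name in ["consumer1", "consumer2", "consumer3"]:
    deliveries.extend(group2.read_new(name, count=1))

delivered_ids = [entry_id for _, entry_id, _ in deliveries]
print(deliveries)
```

Because every read advances the shared cursor, each message goes to exactly one consumer in the group, which is what spreads the load.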

To ensure that consumers can still read unprocessed messages after a failure or restart, Streams automatically keeps an internal queue (the pending list) of the messages read by each consumer in a consumer group. A message stays there until the consumer uses the XACK command to tell Streams that it has been processed. If the consumer fails to process the message, it never sends XACK, so the message remains in the pending list. After restarting, the consumer can use the XPENDING command to see which messages have been read but not yet acknowledged.

For example, the following command shows how many messages each consumer in group2 has read but not yet acknowledged. The first element of the XPENDING result is the total number of pending messages, and the second and third elements are the minimum and maximum IDs among the pending messages of all consumers in group2.

XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

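To see in more detail which messages a particular consumer has read, you can execute the following command. It shows that the message read by consumer2 has the ID 1599274912765-0. The third and fourth fields are the time in milliseconds since the message was last delivered and the number of times it has been delivered.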

XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"
   2) "consumer2"
   3) (integer) 513336
   4) (integer) 1

Once consumer2 has processed message 1599274912765-0, it can use the XACK command to notify Streams, which then removes the message from the pending list. When we run XPENDING again, we see that consumer2 no longer has any messages that have been read but not yet acknowledged.


XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)
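The life cycle of a pending message can be modeled in a few lines of Python. This is an in-memory sketch under assumptions, not Redis's implementation: `PendingListSketch` and its methods are hypothetical names, and the data mirrors the group2 example above.

```python
def parse_id(stream_id):
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


class PendingListSketch:
    """Toy model of a group's pending list: delivered but unacknowledged IDs."""

    def __init__(self):
        self.pending = {}  # message ID -> name of the consumer that read it

    def deliver(self, entry_id, consumer):
        self.pending[entry_id] = consumer

    def summary(self):
        """Total count, smallest ID, largest ID, and count per consumer."""
        if not self.pending:
            return 0, None, None, {}
        ids = sorted(self.pending, key=parse_id)
        per_consumer = {}
        for consumer in self.pending.values():
            per_consumer[consumer] = per_consumer.get(consumer, 0) + 1
        return len(ids), ids[0], ids[-1], per_consumer

    def pending_for(self, consumer):
        return sorted((i for i, c in self.pending.items() if c == consumer), key=parse_id)

    def ack(self, entry_id):
        """Remove an ID from the pending list; return how many were removed."""
        return 1 if self.pending.pop(entry_id, None) is not None else 0


pel = PendingListSketch()
pel.deliver("1599203861727-0", "consumer1")
pel.deliver("1599274912765-0", "consumer2")
pel.deliver("1599274925823-0", "consumer3")

before = pel.summary()
acked = pel.ack("1599274912765-0")
after = pel.pending_for("consumer2")
print(before, acked, after)
```

Acknowledging only changes the pending list. The message itself is a separate piece of data in the stream, which this sketch does not model.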

A table summarizing the features and differences between message queues implemented as Lists and Streams.
Redis is a very lightweight key-value database, whereas Kafka and RabbitMQ are heavyweight software built specifically for message queuing. Running Kafka, for example, has traditionally also required deploying ZooKeeper. If the components of a distributed system do not exchange a large volume of messages, Redis needs only a limited amount of memory to store them, and its high performance supports fast message reads and writes, which makes it a good solution for message queues.