Location>code7788 >text

PHP to Go Series | ThinkPHP and Gin framework of Redis delayed message queuing technology practice

Popularity:340 ℃/2024-09-02 09:25:40

Hi everyone, I'm Codemaster Pioneer.

We are in a treasure or a multi-buy goods, if only placed an order but did not pay the actual payment, then in the order page will have a payment countdown, if after this point in time then the order will be automatically canceled. In such business scenarios, the general situation will be used to delay the queue.

Usually after the customer orders, the order data will be pushed to the delay queue and will set a delay time for the message, such as setting up five minutes, ten minutes, or fifteen minutes, etc., the specific length of time should still be measured in conjunction with the current business, and then the consumer side will be in the specified time arrives at the message for the payment of the payment status judgment, if the payment has been made is not dealt with, to or not paid! If it has been paid, it will not be processed, or unpaid, the order will be canceled, and release the inventory of goods.

We share this content, mainly based on Redis delayed queue implementation, of course, in addition to Redis can also use other technologies, such as RabbitMQ, Kafka, RocketMQ and other professional message queues. But the reason I use Redis is that its application scenarios are more extensive, we usually have more contact, and relative to the professional message queue it is not too complex configuration, easy to learn to get started, problems solved quickly, the path of learning things are easy to difficult.

In addition, if you are very skilled in the use of professional message queues mentioned above, you can also replace Redis with them, here is just a different storage medium, the logic of the implementation of the technology is not too different, the important thing is the design of the idea, we all take what you need.

Well, I will first introduce the logic of this delay queue implementation. Mainly divided into three parts, one is: the message is sent, if you set the delay time will be stored in the Redis delay queue, otherwise the message will be pushed directly to the Redis ready queue waiting to be consumed. Second: the expired message is taken out of the Redis delay queue and pushed to the Redis ready queue for consumption. Third: the consuming end will read out the messages from the Redis ready queue in order, and execute the corresponding business processing logic, if the processing fails, then the message will be pushed to the Redis delay queue again for the next retry.

The delayed queue mentioned here is the use of Redis ordered collection to achieve, it will be polled every second interval, if there is an expiration of the message, the message will be pushed to the Redis ready queue, and from the collection to remove the expired message, so you can wait for the consumer to consume. Next, we'll look at how to implement a Redis-based deferred queue from actual code.

Without further ado, let's get started! Let's take a look at the overall project directory structure, which is divided into two main sections, PHP and Go.

[manongsen@root php_to_go]$ tree -L 2
.
├── go_delay
│   ├── app
│   │   ├── controller
│   │   │   └── 
│   │   ├── config
│   │   │   └── 
│   │   ├── extend
│   │   │   └── 
│   │   └── 
│   ├── 
│   ├── 
│   └── 
└── php_delay
│   ├── app
│   │   ├── controller
│   │   │   └── 
│   ├── 
│   ├── 
│   ├── command
│   │   └── 
│   ├── route
│   │   └── 
│   ├── extend
│   │   └── 
│   ├── think
│   ├── vendor
│   └── .env

ThinkPHP

Use composer to create a php_delay project based on the ThinkPHP framework.

## current catalog
[manongsen@root ~]$ pwd
/home/manongsen/workspace/php_to_go/php_delay

## mounting ThinkPHP organizing plan
[manongsen@root php_delay]$ composer create-project topthink/think php_delay
[manongsen@root php_delay]$ cp . .env

## mounting Composer dependency package
[manongsen@root php_delay]$ composer require predis/predis
## Create a consumer script
[manongsen@root php_delay]$ php think make:command Consumer
## Create a producer script,For testing
[manongsen@root php_delay]$ php think make:command Producer

This is the core class of the delayed queue implementation, defining the ready, delayed, and failed message queues.send() method is used to send a message, where you can specify the$delay The parameter sets the delay time in seconds.wait() method is used to listen for messages on the consumer side. As you can see from the code below, multi-processing is also utilized here. The role of the parent process is to read the expiring message from the Redis ordered collection every second and push the message to the Redis ready queue, while the child process blocks and listens for messages from the ready queue, and callbacks the incoming messages to the user-defined business functions.

<?php
declare (strict_types = 1);

class Queue
{
    // The queue where ready messages are stored
    const QUEUE_READY = 'redis:queue:ready'; 

    // Queue where delayed messages are stored(The actual data structure is an ordered set)
    const QUEUE_DELAY = 'redis:queue:delay'; 

    // Queue where failure messages are stored
    const QUEUE_FAILED = 'redis:queue:failed'; 

    protected $_client;
    protected $_options = [
        'retry_seconds' => 5, // retry delay5unit of angle or arc equivalent one sixtieth of a degree
        'max_attempts'  => 5, // Maximum number of retries
    ];

    public function __construct()
    {
        // together with Redis establish a connection
        $this->_client = new \think\cache\driver\Redis(config(''));
        $this->_client->get("ping");
    }

    // send a message
    public function send($data, $delay = 0)
    {
        static $_id = 0;
        $id = \microtime(true) . '.' . (++$_id);
        $now = time();
        $package_str = \json_encode([
            'id'       => $id,    // messagesID
            'time'     => $now,   // current time
            'delay'    => $delay, // latency(unit of angle or arc equivalent one sixtieth of a degree)
            'attempts' => 0,      // Retries
            'data'     => $data   // messages内容
        ]);

        // 如果不是延时messages,则直接将messages推送到就绪队列
        if ($delay == 0) {
            $this->_client->lpush(static::QUEUE_READY, $package_str);
        } else {
            // 否则将messages写入到有序集合中
            $this->_client->zadd(static::QUEUE_DELAY, $now + $delay, $package_str);
        }
    }

    // Push data from an ordered collection to a ready queue
    public function tryToPullDelayQueue()
    {
        while (true) {
            try {
                $now = time(); // current time
                $options = ['LIMIT', 0, 128]; // each time you take 128 data entry
                $items = $this->_client->zrevrangebyscore(static::QUEUE_DELAY, $now, '-inf', $options);
                foreach ($items as $package_str) {
                    // Remove this data from the ordered set
                    $result = $this->_client->zrem(static::QUEUE_DELAY, $package_str);
                    if ($result !== 1) {
                        continue;
                    }
                    // connect dataJSONDeserialization parsing
                    $package = \json_decode($package_str, true);
                    if (!$package) {
                        // Failure to parse pushes to failure queue
                        $this->_client->lpush(static::QUEUE_FAILED, $package_str);
                        continue;
                    }
                    // connect data推送到就绪队列
                    $this->_client->lpush(static::QUEUE_READY, $package_str);
                }
            } catch (\Throwable $e) {
                echo $e->getMessage() . PHP_EOL;
            }

            // intervals1sPolling again after that
            sleep(1);
        }
    }

    // 监听messages
    public function wait($success_callback, $failure_callback)
    {
        echo "开始监听messages..." . PHP_EOL;
        // Creating a process
        // 父进程用于轮询有序集合messages
        // 子进程监听就绪队列messages
        $pid = pcntl_fork();
        if ($pid < 0) {
            exit('fork error');
        } else if($pid > 0) {
            // 轮询有序集合messages并推送到就绪队列
            (new \Queue())->tryToPullDelayQueue();
            pcntl_wait($status);
            exit();
        }

        while (true) {
            try {            
                // 阻塞监听就绪队列messages
                $data = $this->_client->brpop(static::QUEUE_READY, 0);
                if ($data) {
                    $package_str = $data[1];
                    // connect dataJSONDeserialization parsing
                    $package = json_decode($package_str, true);
                    if (!$package) {
                        // Failure to parse pushes to failure queue
                        $this->_client->lpush(static::QUEUE_FAILED, $package_str);
                    } else {
                        try {
                            // 将messages回调到我们在业务层面写的回调函数中
                            \call_user_func($success_callback, $package['data']);
                        } catch (\Throwable $e) {
                            $package['max_attempts'] = $this->_options['max_attempts'];
                            $package['error'] = $e->getMessage();
                            $package_modified = null;
                            // If an exception occurs and we set the failure callback function
                            if ($failure_callback) {
                                try {
                                    // then it will call back to the callback function we wrote at the business level
                                    $package_modified = \call_user_func($failure_callback, $e, $package);
                                } catch (\Throwable $ta) {
                                }
                            }
                            // 如果修改了messages内容,则重新构造messages
                            if (is_array($package_modified)) {
                                $package['data'] = $package_modified['data'] ?? $package['data'];
                                $package['attempts'] = $package_modified['attempts'] ?? $package['attempts'];
                                $package['max_attempts'] = $package_modified['max_attempts'] ?? $package['max_attempts'];
                                $package['error'] = $package_modified['error'] ?? $package['error'];
                            }
                            // 如果已经超过了Maximum number of retries,则将messages推送到失败队列
                            if (++$package['attempts'] > $package['max_attempts']) {
                                $this->fail($package);
                            } else {
                                // Otherwise into the ordered set,Waiting for the next round of inquiries
                                $this->retry($package);
                            }
                        }
                    }
                }
            } catch (\Throwable $e) {
                echo $e->getMessage() . PHP_EOL;
            }
        }
    }

    // Re-add to ordered set
    protected function retry($package)
    {
        // The delay time increases exponentially with the number of retries
        $delay = time() + $this->_options['retry_seconds'] * ($package['attempts']);
        $this->_client->zadd(static::QUEUE_DELAY, $delay, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
    }

    // Push to Failed Queue
    protected function fail($package)
    {
        $this->_client->lpush(static::QUEUE_FAILED, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
    }
}

This is the consumer side script, mainly to realize the specific business logic processing after receiving the message.

<?php
declare (strict_types = 1);

namespace app\command;

use think\facade\Cache;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class Consumer extends Command
{
    protected function configure()
    {
        // command configuration
        $this->setName('app\command\consumer')
            ->setDescription('the app\command\consumer command');
    }

    protected function execute(Input $input, Output $output)
    {
        (new \Queue())->wait(function($data){
            // Here is the normal logic for receiving messages
            var_dump($data);
        }, function($e, $package){
            // Here is the logic for handling message exceptions
            return $package;
        });
    }
}

This one pushes the message, via the API interface, to the delayed queue.

<?php

namespace app\controller;

use app\BaseController;

class Notify extends BaseController
{
    public function sendMsg()
    {
        // reception (of transmitted signal) GET parameters
        $params = $this->request->param();
        if (empty($params["content"])) {
            return json(["code" => -1, "msg" => "Content cannot be empty"]);
        }
        $content = $params["content"];

        // push to a delayed queue 15 will be executed after a few seconds.
        (new \Queue())->send($content, 15);

        return json(["code" => 0, "msg" => "success"]);
    }
}

Let's actually test this by first executingphp think consumer Start the consumer and then execute thephp think run Start the service and finally invoke it using the Postman tool.

Gin

Initialize the go_delay project with the go mod.

## Current directory
[manongsen@root ~]$ pwd
/home/manongsen/workspace/php_to_go/go_delay

## Initialize the project
[manongsen@root go_delay]$ go mod init go_delay

## Install third-party dependencies
[manongsen@root go_delay]$ go get /gin-gonic/gin
[manongsen@root go_delay]$ /go-redis/redis

The logic here is similar to the PHP implementation above, with the caveat that in Go you're using concurrent processes to asynchronously poll for expiring messages from the Redis ordered collection, whereas PHP utilizes multiprocessing.

package extend

import (
"encoding/json"
"fmt"
"go_delay/app/config"
"time"

"/go-redis/redis"
)

var comId int

const (
// The queue where ready messages are stored
QUEUE_READY = "redis:queue:ready"

// The queue where delay messages are stored (the actual data structure is an ordered collection)
QUEUE_DELAY = "redis:queue:delay"

// Queue for failure messages
QUEUE_FAILED = "redis:queue:failed" // Queue where failed messages are stored.
)

type PackageData struct {
Id string `json: "id"` // The message ID.
Time int64 `json: "time"` // Current time.
Delay int `json: "delay"` // Delay in seconds.
Attempts int `json: "attempts"` // Number of retries.
MaxAttempts int `json: "max_attempts"` // Maximum number of retries.
Data string `json: "data"` // The content of the message.
Error string `json: "error"` // Error message.
}

type Queue struct {
RetrySeconds int
MaxAttempts int
MaxAttempts int }

func NewQueue() *Queue {
return &Queue{
RetrySeconds: 5, // retry delay 5 seconds
MaxAttempts: 5, // maximum number of retries
}
}

// Send the message
func (q *Queue) Send(data string, delay int) {
now := ().UnixMilli() / 1000
now := ().UnixMilli() / 1000
msgId := ("%d.%d", now, comId)
packageData := &PackageData{
Id: msgId, // messageId
Time: int64(now), // current time
Delay: delay, // delay time (in seconds)
Attempts: 0, // number of retries
Data: data, // message content
}
packageStr, err := (packageData)
if err ! = nil {
(" fail, err: %v\n", err)
return
}

// If it's not a delayed message, push the message directly to the ready queue
if delay == 0 {
(QUEUE_READY, packageStr)
} else {
// otherwise write the message to the ordered collection
z := {
Score: float64(int(now) + delay),
Member: packageStr,
}
(QUEUE_DELAY, z)
}
}

// Push data from the ordered collection to the ready queue
func (q *Queue) tryToPullDelayQueue() {
for {
// Current time
now := ().UnixMilli() / 1000
// Fetch 128 items at a time
z := {
Max: ("%d", now), Min: "-inf", now), z := { // current time now := (.
Min: "-inf", Offset: 0, z := {
Offset: 0,
Count: 128, }
}
cmd := (QUEUE_DELAY, z)
items, err := ()
if err ! = nil {
("ZRevRangeByScore fail, err: %v\n", err)
continue
}
for _, item := range items {
// Remove the data from the ordered set
intCmd := (QUEUE_DELAY, item)
if () ! = nil {
continue
}
var packageData *PackageData
// Deserialize and parse the JSON data.
err = ([]byte(item), &packageData)
if err ! = nil {
// If parsing fails, push to failure queue
(" fail, err: %v\n", err)
(QUEUE_FAILED, item)
continue
}
// Push the data to the ready queue.
(QUEUE_READY, item)
}

// Poll again after 1s interval
()
}
}

func (q *Queue) Wait(successCallback func(string) error, failureCallback func(error, *PackageData) *PackageData) {
// Start a thread that polls the ordered collection and pushes it to the ready queue.
go ()

for {
// Block listening on the ready queue
stringSliceCmd := (0, QUEUE_READY)
if () ! = nil {
(" fail, err: %v\n", ().Error()))
Error()))
}
data, err := ()
data, err := () if err ! = nil {
(" fail, err: %v\n", err)
continue
}
// Deserialize and parse the data JSON
var packageData *PackageData
packageStr := data[1]
err = ([]byte(packageStr), &packageData)
if err ! = nil {
(" fail, err: %v\n", err)
// Push to failure queue if parsing fails
(QUEUE_FAILED, packageStr)
continue
}

// Callback the message to the callback function we wrote at the business level
err = successCallback()
if err ! = nil {
("successCallback fail, err: %v\n", err)

// If an exception occurs and we set the failure callback function
= ()
= ()
if failureCallback ! = nil {
// then it will call back to the callback function we wrote at the business level
packageModified := failureCallback(err, packageData)
// Reconstruct the message
packageModified := failureCallback(err, packageData) // reconstruct the message
=
= =
= = = = = =
}
continue
}

// If the maximum number of retries has been exceeded, push the message to the failure queue
+= 1
if > {
(packageData)
} else {
// Otherwise go into the ordered collection and wait for the next round of polling
(packageData)
}
}
}

// Re-add to the ordered collection
func (q *Queue) retry(packageData *PackageData) {
// The delay increases exponentially with the number of retries
delay := ().Second() + *
packageStr, err := (packageData)
if err ! = nil {
(" fail, err: %v\n", err)
return
}
z := {
Score: float64(delay),
Member: packageStr, }
}
(QUEUE_DELAY, z)
}

// Push to the failed queue
func (q *Queue) fail(packageData *PackageData) {
packageStr, err := (packageData)
if err ! = nil {
(" fail, err: %v\n", err)
if err != nil { (" fail, err: %v\n", err)
}
(QUEUE_FAILED, packageStr)
}

func InitQueue() {
queue := NewQueue()
(func(data string) error {
// Receive the message correctly
("Message received: %s\n", data)
return nil
}, func(err error, packageData *PackageData) *PackageData {
// Add processing logic here if the message is abnormal
return packageData
})
}

utilizationgo () Starts a consumer. As you can see here, you don't need to start a separate consumer script process in Go, you just need to start an asynchronous coprocess that listens for messages, so it's a lot easier to implement Redis delayed queuing in Go than it is in PHP.

package main

import (
	"go_delay/app"
	"go_delay/app/config"
	"go_delay/app/extend"

	"/gin-gonic/gin"
)

func main() {
	r := ()
	(r)
	()
	go ()
	(":8001")
}

This one pushes the message, via the API interface, to the delayed queue.

package controller

import (
"go_delay/app/extend"
"net/http"

"/gin-gonic/gin"
)

func SendMsg(c *) {
// Receive GET parameters
content := ("content")
if len(content) == 0 {
(, {
"msg": "Content can't be empty",
"code": -1, })
})
return
}

// Push to delayed queue and execute after 15 seconds.
queue := ()
(content, 15)

// Directly return
(, {
"code": 0,
"msg": "success",
})
}

We directly executego run Start the service and then invoke it using the Postman utility.

concluding remarks

I believe that you have been based on Redis delayed queue implementation, some understanding of the way. As you can see from the example above, the core data structures used in this delayed queue are Redis lists and ordered collections. The ordered collection is used to store messages with a set delay duration, while the list stores ready messages, i.e., messages waiting to be consumed by consumers.

The difference between PHP and Go is that in PHP, you need to start a separate consumer script, and you need to poll an ordered collection for messages that are due to expire in an additional process, otherwise the message consumption logic will be blocked. In Go, on the other hand, you only need to asynchronously open a thread to wait for messages to arrive, and open another thread to poll for messages that are due to expire, so you can see that Go's advantage over PHP is greater.

In addition, in the Go language can also be used to replace Redis channel Channel, the same can be achieved delayed queuing, but Channel can not be persisted to disk, once the service hangs the message will be lost, so it is better to honestly use Redis good. Then good technical knowledge, you need to personally practice in order to absorb, so it is recommended that you manually practice, if you want to get a complete case code friends, you can reply to the public number "8392" can be, the content of this share ends here, I hope you can help.

Thanks for reading, personal opinion is for reference only, welcome to express different views in the comment section.

Welcome to follow, share, like, favorite, in the watch, I'm the author of WeChat public number "code farmer forefather".