您好,欢迎来到锐游网。
搜索
您的当前位置:首页Laravel消息队列源码分析

Laravel消息队列源码分析

来源:锐游网

Laravel5.7
队列驱动为redis
缓存驱动 CACHE_DRIVER=redis

config/queue.php
    retry_after: 90
    block_for: null
// 配置赋值方式
return new RedisQueue(
    $this->redis, $config['queue'],
    $config['connection'] ?? $this->connection,
    $config['retry_after'] ?? 60,
    $config['block_for'] ?? null
);

可见如果retry_after设置null,将会重置为60。
block_for可以是null。

一,投递队列
1、普通队列

$data = ['data' => Str::random(10), 'time' => date('Y-m-d H:i:s')];
dispatch(new TestJob($data))->onConnection('redis')->onQueue('testqueue');

普通队列会存储在一个list里面:queues:testqueue
每个元素就是任务的具体内容:

{"displayName":"App\\Jobs\\TestJob","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"timeout":null,"timeoutAt":null,"data":{"commandName":"App\\Jobs\\TestJob","command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"WKSmRQJsbs\";s:4:\"time\";s:19:\"2020-02-09 11:40:51\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqueue\";s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}"},"id":"0NB0RK9CKQRVbbuskTWUSs8Lp91XqYzW","attempts":0}

2、延迟队列

$data = ['data' => Str::random(10), 'time' => date('Y-m-d H:i:s')];
dispatch(new TestJob($data))->delay(now()->addMinute(20))->onConnection('redis')->onQueue('testqueue');

延迟队列会存储在一个zset里面:queues:testqueue:delayed
每个元素的value为任务的具体内容,score为下次被执行的时间。

value:{"displayName":"App\\Jobs\\TestJob","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"timeout":null,"timeoutAt":null,"data":{"commandName":"App\\Jobs\\TestJob","command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"lIRuVmQsdK\";s:4:\"time\";s:19:\"2020-02-09 11:40:59\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqueue\";s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";O:25:\"Illuminate\\Support\\Carbon\":3:{s:4:\"date\";s:26:\"2020-02-09 12:00:59.123314\";s:13:\"timezone_type\";i:3;s:8:\"timezone\";s:3:\"UTC\";}s:7:\"chained\";a:0:{}}"},"id":"8utPrwDVunKYZpol2DgQJdaalZy2szMc","attempts":0}
score: 1581249659

3、retry队列
config/queue.php 设置了 retry_after 的值,当进程每次从主队列pop一个任务,同时会备份此任务到retry队列,后面会讲到。
retry队列存储在zset里面:queues:testqueue:reserved
每个元素的value为任务的具体内容,score为下次被执行的时间,也是 retry_after 的值。

value: {"maxTries":null,"attempts":1,"timeout":null,"job":"Illuminate\\Queue\\CallQueuedHandler@call","id":"0NB0RK9CKQRVbbuskTWUSs8Lp91XqYzW","timeoutAt":null,"data":{"command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"WKSmRQJsbs\";s:4:\"time\";s:19:\"2020-02-09 11:40:51\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqueue\";s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}","commandName":"App\\Jobs\\TestJob"},"displayName":"App\\Jobs\\TestJob"}
score: 1581309060

二、执行队列

找到queue:work命令对应的文件 vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php

命令行参数及注释:

'queue:work
    {connection? : The name of the queue connection to work}
    {--queue= : The names of the queues to work}
    {--daemon : Run the worker in daemon mode (Deprecated)}
    {--once : Only process the next job on the queue}
    {--stop-when-empty : Stop when the queue is empty}
    {--delay=0 : The number of seconds to delay failed jobs}
    {--force : Force the worker to run even in maintenance mode}
    {--memory=128 : The memory limit in megabytes}
    {--sleep=3 : Number of seconds to sleep when no job is available}
    {--timeout=60 : The number of seconds a child process can run}
    {--tries=0 : Number of times to attempt a job before logging it failed}';
php artisan queue:work redis --queue=testqueue

关键代码:

protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel['cache']->driver());

    return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}

只要没设置 --once 参数,那么就是daemon运行。

vendor/laravel/framework/src/Illuminate/Queue/Worker.php
public function daemon($connectionName, $queue, WorkerOptions $options)
{
    if ($this->supportsAsyncSignals()) {
        $this->listenForSignals();
    }

    $lastRestart = $this->getTimestampOfLastQueueRestart();

    while (true) {
        // Before reserving any jobs, we will make sure this queue is not paused and
        // if it is we will just pause this worker for a given amount of time and
        // make sure we do not need to kill this worker process off completely.
        if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
            $this->pauseWorker($options, $lastRestart);

            continue;
        }

        // First, we will attempt to get the next job off of the queue. We will also
        // register the timeout handler and reset the alarm for this job so it is
        // not stuck in a frozen state forever. Then, we can fire off this job.
        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue
        );

        if ($this->supportsAsyncSignals()) {
            $this->registerTimeoutHandler($job, $options);
        }

        // If the daemon should run (not in maintenance mode, etc.), then we can run
        // fire off this job for processing. Otherwise, we will need to sleep the
        // worker so no more jobs are processed until they should be processed.
        if ($job) {
            $this->runJob($job, $connectionName, $options);
        } else {
            $this->sleep($options->sleep);
        }

        // Finally, we will check to see if we have exceeded our memory limits or if
        // the queue should restart based on other indications. If so, we'll stop
        // this worker and let whatever is "monitoring" it restart the process.
        $this->stopIfNecessary($options, $lastRestart, $job);
    }
}
// 判断是否要终止程序
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
{
    if ($this->shouldQuit) {
        $this->stop();
    } elseif ($this->memoryExceeded($options->memory)) {
        $this->stop(12);
    } elseif ($this->queueShouldRestart($lastRestart)) {
        $this->stop();
    } elseif ($options->stopWhenEmpty && is_null($job)) {
        $this->stop();
    }
}

// 获取任务
protected function getNextJob($connection, $queue)
{
    try {
        foreach (explode(',', $queue) as $queue) {
            if (! is_null($job = $connection->pop($queue))) {
                return $job;
            }
        }
    } catch (Exception $e) {
        $this->exceptions->report($e);

        $this->stopWorkerIfLostConnection($e);

        $this->sleep(1);
    } catch (Throwable $e) {
        $this->exceptions->report($e = new FatalThrowableError($e));

        $this->stopWorkerIfLostConnection($e);

        $this->sleep(1);
    }
}

我们知道 --queue 参数可以设置多个队列,以逗号隔开,并且排在前面的先被消费完,才开始消费后面的队列,getNextJob方法可以说明这一点。

pop方法,我们定位到 vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php

public function pop($queue = null)
{
    $this->migrate($prefixed = $this->getQueue($queue));

    if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
        return;
    }

    [$job, $reserved] = $nextJob;

    if ($reserved) {
        return new RedisJob(
            $this->container, $this, $job,
            $reserved, $this->connectionName, $queue ?: $this->default
        );
    }
}

$prefixed 为队列名 queues:testqueue

migrate 方法的作用是将延迟队列和retry队列中符合时间要求的任务迁移到主队列,并且是放在队列头部,优先消费。
我设置的 retry_after = 90,根据retry_after的赋值可以知道$this->retryAfter永远是非null的,那么delayed和reserved永远会被migrate。

protected function migrate($queue)
{
    $this->migrateExpiredJobs($queue.':delayed', $queue);

    if (! is_null($this->retryAfter)) {
        $this->migrateExpiredJobs($queue.':reserved', $queue);
    }
}

public function migrateExpiredJobs($from, $to)
{
    return $this->getConnection()->eval(
        LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
    );
}

public static function migrateExpiredJobs()
{
    return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

for i = 1, #val, 100 do
    redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end

return val
LUA;
}

迁移完之后,就开始获取主队列中的任务。

protected function retrieveNextJob($queue)
{
    if (! is_null($this->blockFor)) {
        return $this->blockingPop($queue);
    }

    return $this->getConnection()->eval(
        LuaScripts::pop(), 2, $queue, $queue.':reserved',
        $this->availableAt($this->retryAfter)
    );
}

我设置的 block_for 为 null

由此可见,pop出来的任务都会被备份到retry队列中,成功执行完后就被删除。

那么,为什么要备份呢呢?试想一下,如果程序pop出一个任务,然后程序在处理过程中意外挂掉了,那么这个任务是不是就丢失了,显然这是不允许出现的,所以需要先备份。

LuaScripts::pop() 方法:
public static function pop()
{
    return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false

if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
end

return {job, reserved}
LUA;
}

使用lua脚本可实现原子化操作,此方法确保了pop和zadd操作几乎同时执行,最大限度确保了数据不会丢失,到期时间为 retry_after ,因此可以避免任务丢失的情况。

retrieveNextJob() 方法返回一个数组:

$nextJob = array(2) {
 [0]=>
 string(550) "{"maxTries":null,"attempts":1,"timeout":null,"job":"Illuminate\\Queue\\CallQueuedHandler@call","id":"oe3DnfK5dUZ2alEfJjOb7R5L
YIzqMxFX","timeoutAt":null,"data":{"command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"tNZ0DTq39w\";s:4:\"time\"
;s:19:\"2020-02-10 04:29:02\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqueue\";s:15:\"chainConn
ection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}","commandName":"App\\Jobs\\TestJob"},"displayName":"App\\Jobs\\Test
Job"}"
 [1]=>
 string(550) "{"maxTries":null,"displayName":"App\\Jobs\\TestJob","timeout":null,"job":"Illuminate\\Queue\\CallQueuedHandler@call","id":"oe
3DnfK5dUZ2alEfJjOb7R5LYIzqMxFX","timeoutAt":null,"data":{"command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"tNZ
0DTq39w\";s:4:\"time\";s:19:\"2020-02-10 04:29:02\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqu
eue\";s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}","commandName":"App\\Jobs\\TestJob"},"attempt
s":2}"

[$job, $reserved] = $nextJob;

$job = $nextJob[0];
$reserved = $nextJob[1];

$job 和 r e s e r v e d 是 同 一 个 任 务 , 只 是 reserved 是同一个任务,只是 reservedreserved中attempts自增了1

最终将 j o b 和 job和 jobreserved合起来作为job返回到Worker。传递$reserved的目的是成功执行完之后方便将其删除。

接着就是处理job,回到Worker.php中

public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        // First we will raise the before job event and determine if the job has already ran
        // over its maximum attempt limits, which could primarily happen when this job is
        // continually timing out and not actually throwing any exceptions from itself.
        $this->raiseBeforeJobEvent($connectionName, $job);

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        // Here we will fire off the job and let it process. We will catch any exceptions so
        // they can be reported to the developers logs, etc. Once the job is finished the
        // proper events will be fired to let any listeners know this job has finished.
        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    } catch (Throwable $e) {
        $this->handleJobException(
            $connectionName, $job, $options, new FatalThrowableError($e)
        );
    }
}

当任务执行完或抛出异常就会删掉备份,如果是抛出异常就会写入失败记录。

基于上面的分析,虽然解决了数据安全问题,却暴露了多进程并发的问题。

retry_after = 90
TestJob任务执行时间是100秒
此时,我们添加一个任务到队列。

分别开启两个进程执行消费命令,你会发现,同一个任务,将会被这两个进程无限循环的消费下去,原因很好分析,就不多说了。鉴于此,你
应该将retry_after的值设置为大于任务的最大执行时间,才能保证这个备份在被删除之前不会被其他进程migrate。

三、终止消费进程

php artisan queue:restart

命令文件:vendor/laravel/framework/src/Illuminate/Queue/Console/RestartCommand.php

protected $name = 'queue:restart';

public function handle()
{
    $this->laravel['cache']->forever('illuminate:queue:restart', $this->currentTime());

    $this->info('Broadcasting queue restart signal.');
}

该命令没有参数,可见是全部终止!

该命令会在cache中增加一条记录,由于我设置的cache为redis,所以

key: laravel_cache:illuminate:queue:restart
value: 1581252433

每次执行该命令,value值将会被替换为当前时间戳。

程序在启动的时候会读取这个key作为$lastRestart,然后在消费任务前后都会判断是否有发起新的restart指令,如果有,则终止程序。因此restart命令是安全的,不是暴力的。当然key不会被清除。参考

vendor/laravel/framework/src/Illuminate/Queue/Worker.php
protected function queueShouldRestart($lastRestart)
{
    return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}

/**
 * Get the last queue restart timestamp, or null.
 *
 * @return int|null
 */
protected function getTimestampOfLastQueueRestart()
{
    if ($this->cache) {
        return $this->cache->get('illuminate:queue:restart');
    }
}

四、失败任务

失败任务可以存储在数据库中,可以查看,重试,删除。

重试就是将失败任务重新放回主队列进行消费的过程。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- ryyc.cn 版权所有 湘ICP备2023022495号-3

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务