Laravel5.7
队列驱动为redis
缓存驱动 CACHE_DRIVER=redis
config/queue.php
retry_after: 90
block_for: null
// 配置赋值方式
return new RedisQueue(
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60,
$config['block_for'] ?? null
);
可见如果retry_after设置null,将会重置为60。
block_for可以是null。
一,投递队列
1、普通队列
$data = ['data' => Str::random(10), 'time' => date('Y-m-d H:i:s')];
dispatch(new TestJob($data))->onConnection('redis')->onQueue('testqueue');
普通队列会存储在一个list里面:queues:testqueue
每个元素就是任务的具体内容:
{"displayName":"App\\Jobs\\TestJob","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"timeout":null,"timeoutAt":null,"data":{"commandName":"App\\Jobs\\TestJob","command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"WKSmRQJsbs\";s:4:\"time\";s:19:\"2020-02-09 11:40:51\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqueue\";s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}"},"id":"0NB0RK9CKQRVbbuskTWUSs8Lp91XqYzW","attempts":0}
2、延迟队列
$data = ['data' => Str::random(10), 'time' => date('Y-m-d H:i:s')];
dispatch(new TestJob($data))->delay(now()->addMinute(20))->onConnection('redis')->onQueue('testqueue');
延迟队列会存储在一个zset里面:queues:testqueue:delayed
每个元素的value为任务的具体内容,score为下次被执行的时间。
value:{"displayName":"App\\Jobs\\TestJob","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"timeout":null,"timeoutAt":null,"data":{"commandName":"App\\Jobs\\TestJob","command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"lIRuVmQsdK\";s:4:\"time\";s:19:\"2020-02-09 11:40:59\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqueue\";s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";O:25:\"Illuminate\\Support\\Carbon\":3:{s:4:\"date\";s:26:\"2020-02-09 12:00:59.123314\";s:13:\"timezone_type\";i:3;s:8:\"timezone\";s:3:\"UTC\";}s:7:\"chained\";a:0:{}}"},"id":"8utPrwDVunKYZpol2DgQJdaalZy2szMc","attempts":0}
score: 1581249659
3、retry队列
在 config/queue.php
设置了 retry_after 的值,当进程每次从主队列pop一个任务,同时会备份此任务到retry队列,后面会讲到。
retry队列存储在zset里面:queues:testqueue:reserved
每个元素的value为任务的具体内容,score为下次被执行的时间,也是 retry_after 的值。
value: {"maxTries":null,"attempts":1,"timeout":null,"job":"Illuminate\\Queue\\CallQueuedHandler@call","id":"0NB0RK9CKQRVbbuskTWUSs8Lp91XqYzW","timeoutAt":null,"data":{"command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"WKSmRQJsbs\";s:4:\"time\";s:19:\"2020-02-09 11:40:51\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqueue\";s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}","commandName":"App\\Jobs\\TestJob"},"displayName":"App\\Jobs\\TestJob"}
score: 1581309060
二、执行队列
找到queue:work
命令对应的文件 vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php
命令行参数及注释:
'queue:work
{connection? : The name of the queue connection to work}
{--queue= : The names of the queues to work}
{--daemon : Run the worker in daemon mode (Deprecated)}
{--once : Only process the next job on the queue}
{--stop-when-empty : Stop when the queue is empty}
{--delay=0 : The number of seconds to delay failed jobs}
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}';
php artisan queue:work redis --queue=testqueue
关键代码:
protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->laravel['cache']->driver());
return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);
}
只要没设置 --once 参数,那么就是daemon运行。
vendor/laravel/framework/src/Illuminate/Queue/Worker.php
public function daemon($connectionName, $queue, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
$this->listenForSignals();
}
$lastRestart = $this->getTimestampOfLastQueueRestart();
while (true) {
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
continue;
}
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
if ($this->supportsAsyncSignals()) {
$this->registerTimeoutHandler($job, $options);
}
// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart, $job);
}
}
// 判断是否要终止程序
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
{
if ($this->shouldQuit) {
$this->stop();
} elseif ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
} elseif ($options->stopWhenEmpty && is_null($job)) {
$this->stop();
}
}
// 获取任务
protected function getNextJob($connection, $queue)
{
try {
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
$this->sleep(1);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));
$this->stopWorkerIfLostConnection($e);
$this->sleep(1);
}
}
我们知道 --queue 参数可以设置多个队列,以逗号隔开,并且排在前面的先被消费完,才开始消费后面的队列,getNextJob方法可以说明这一点。
pop方法,我们定位到 vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php
public function pop($queue = null)
{
$this->migrate($prefixed = $this->getQueue($queue));
if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
return;
}
[$job, $reserved] = $nextJob;
if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}
$prefixed 为队列名 queues:testqueue
migrate 方法的作用是将延迟队列和retry队列中符合时间要求的任务迁移到主队列,并且是放在队列头部,优先消费。
我设置的 retry_after = 90,根据retry_after的赋值可以知道$this->retryAfter
永远是非null的,那么delayed和reserved永远会被migrate。
protected function migrate($queue)
{
$this->migrateExpiredJobs($queue.':delayed', $queue);
if (! is_null($this->retryAfter)) {
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
}
public function migrateExpiredJobs($from, $to)
{
return $this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
);
}
public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return val
LUA;
}
迁移完之后,就开始获取主队列中的任务。
protected function retrieveNextJob($queue)
{
if (! is_null($this->blockFor)) {
return $this->blockingPop($queue);
}
return $this->getConnection()->eval(
LuaScripts::pop(), 2, $queue, $queue.':reserved',
$this->availableAt($this->retryAfter)
);
}
我设置的 block_for 为 null
由此可见,pop出来的任务都会被备份到retry队列中,成功执行完后就被删除。
那么,为什么要备份呢呢?试想一下,如果程序pop出一个任务,然后程序在处理过程中意外挂掉了,那么这个任务是不是就丢失了,显然这是不允许出现的,所以需要先备份。
LuaScripts::pop() 方法:
public static function pop()
{
return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}
LUA;
}
使用lua脚本可实现原子化操作,此方法确保了pop和zadd操作几乎同时执行,最大限度确保了数据不会丢失,到期时间为 retry_after ,因此可以避免任务丢失的情况。
retrieveNextJob() 方法返回一个数组:
$nextJob = array(2) {
[0]=>
string(550) "{"maxTries":null,"attempts":1,"timeout":null,"job":"Illuminate\\Queue\\CallQueuedHandler@call","id":"oe3DnfK5dUZ2alEfJjOb7R5L
YIzqMxFX","timeoutAt":null,"data":{"command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"tNZ0DTq39w\";s:4:\"time\"
;s:19:\"2020-02-10 04:29:02\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqueue\";s:15:\"chainConn
ection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}","commandName":"App\\Jobs\\TestJob"},"displayName":"App\\Jobs\\Test
Job"}"
[1]=>
string(550) "{"maxTries":null,"displayName":"App\\Jobs\\TestJob","timeout":null,"job":"Illuminate\\Queue\\CallQueuedHandler@call","id":"oe
3DnfK5dUZ2alEfJjOb7R5LYIzqMxFX","timeoutAt":null,"data":{"command":"O:16:\"App\\Jobs\\TestJob\":8:{s:4:\"data\";a:2:{s:4:\"data\";s:10:\"tNZ
0DTq39w\";s:4:\"time\";s:19:\"2020-02-10 04:29:02\";}s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";s:5:\"redis\";s:5:\"queue\";s:9:\"testqu
eue\";s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}","commandName":"App\\Jobs\\TestJob"},"attempt
s":2}"
[$job, $reserved] = $nextJob;
$job = $nextJob[0];
$reserved = $nextJob[1];
$job 和 r e s e r v e d 是 同 一 个 任 务 , 只 是 reserved 是同一个任务,只是 reserved是同一个任务,只是reserved中attempts自增了1
最终将 j o b 和 job和 job和reserved合起来作为job返回到Worker。传递$reserved的目的是成功执行完之后方便将其删除。
接着就是处理job,回到Worker.php中
public function process($connectionName, $job, WorkerOptions $options)
{
try {
// First we will raise the before job event and determine if the job has already ran
// over its maximum attempt limits, which could primarily happen when this job is
// continually timing out and not actually throwing any exceptions from itself.
$this->raiseBeforeJobEvent($connectionName, $job);
$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName, $job, (int) $options->maxTries
);
// Here we will fire off the job and let it process. We will catch any exceptions so
// they can be reported to the developers logs, etc. Once the job is finished the
// proper events will be fired to let any listeners know this job has finished.
$job->fire();
$this->raiseAfterJobEvent($connectionName, $job);
} catch (Exception $e) {
$this->handleJobException($connectionName, $job, $options, $e);
} catch (Throwable $e) {
$this->handleJobException(
$connectionName, $job, $options, new FatalThrowableError($e)
);
}
}
当任务执行完或抛出异常就会删掉备份,如果是抛出异常就会写入失败记录。
基于上面的分析,虽然解决了数据安全问题,却暴露了多进程并发的问题。
retry_after = 90
TestJob任务执行时间是100秒
此时,我们添加一个任务到队列。
分别开启两个进程执行消费命令,你会发现,同一个任务,将会被这两个进程无限循环的消费下去,原因很好分析,就不多说了。鉴于此,你
应该将retry_after的值设置为大于任务的最大执行时间,才能保证这个备份在被删除之前不会被其他进程migrate。
三、终止消费进程
php artisan queue:restart
命令文件:vendor/laravel/framework/src/Illuminate/Queue/Console/RestartCommand.php
protected $name = 'queue:restart';
public function handle()
{
$this->laravel['cache']->forever('illuminate:queue:restart', $this->currentTime());
$this->info('Broadcasting queue restart signal.');
}
该命令没有参数,可见是全部终止!
该命令会在cache中增加一条记录,由于我设置的cache为redis,所以
key: laravel_cache:illuminate:queue:restart
value: 1581252433
每次执行该命令,value值将会被替换为当前时间戳。
程序在启动的时候会读取这个key作为$lastRestart,然后在消费任务前后都会判断是否有发起新的restart指令,如果有,则终止程序。因此restart命令是安全的,不是暴力的。当然key不会被清除。参考
vendor/laravel/framework/src/Illuminate/Queue/Worker.php
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
}
}
四、失败任务
失败任务可以存储在数据库中,可以查看,重试,删除。
重试就是将失败任务重新放回主队列进行消费的过程。
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- ryyc.cn 版权所有 湘ICP备2023022495号-3
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务