3 * Redis-backed job queue code.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
21 * @author Aaron Schulz
25 * Class to handle job queues stored in Redis
30 class JobQueueRedis
extends JobQueue
{
/** @var RedisConnectionPool Pool that hands out connections to $server */
protected $redisPool;

/** @var string Redis server address (host, host:port, or UNIX socket path) */
protected $server;

const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
/**
 * @param array $params Queue parameters (also passed to JobQueue::__construct()). Includes:
 *   - redisConf : An array of parameters to RedisConnectionPool::__construct().
 *                 If it has a 'server' entry, that is used as the server address.
 *   - server    : A hostname/port combination or the absolute path of a UNIX socket.
 *                 If a hostname is specified but no port, the standard port number
 *                 6379 will be used. Required unless redisConf['server'] is set.
 * @throws MWException If no server address is configured
 */
public function __construct( array $params ) {
	parent::__construct( $params );
	// Historically the address was read from redisConf['server'], so that keeps
	// precedence; the documented top-level 'server' parameter is the fallback.
	if ( isset( $params['redisConf']['server'] ) ) {
		$this->server = $params['redisConf']['server'];
	} elseif ( isset( $params['server'] ) ) {
		$this->server = $params['server'];
	} else {
		throw new MWException( "Cannot create Redis job queue; missing 'server' parameter." );
	}
	$this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] );
}
/**
 * @see JobQueue::doIsEmpty()
 * @return bool Whether the "unclaimed" list has no entries
 * @throws MWException On connection failure or Redis errors
 */
protected function doIsEmpty() {
	// Piggy-back queue maintenance on roughly 1 in 100 calls
	if ( mt_rand( 0, 99 ) === 0 ) {
		$this->doInternalMaintenance();
	}

	$conn = $this->getConnection();
	try {
		$unclaimedCount = $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
		return ( $unclaimedCount == 0 );
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}
}
/**
 * @see JobQueue::doGetSize()
 * @return integer Length of the "unclaimed" list
 * @throws MWException On connection failure or Redis errors
 */
protected function doGetSize() {
	$runMaintenance = ( mt_rand( 0, 99 ) === 0 ); // ~1% of calls
	if ( $runMaintenance ) {
		$this->doInternalMaintenance();
	}

	$conn = $this->getConnection();
	try {
		$unclaimedKey = $this->getQueueKey( 'l-unclaimed' );
		return $conn->lSize( $unclaimedKey );
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}
}
/**
 * @see JobQueue::doGetAcquiredCount()
 * @return integer Length of the "claimed" list, or 0 when there is no claim TTL
 * @throws MWException On connection failure or Redis errors
 */
protected function doGetAcquiredCount() {
	if ( mt_rand( 0, 99 ) === 0 ) { // occasional maintenance
		$this->doInternalMaintenance();
	}

	$conn = $this->getConnection();
	try {
		// Without a claim TTL, doPop() discards job data at pop time and doAck()
		// has nothing to do, so no job is reported as acquired.
		return ( $this->claimTTL > 0 )
			? $conn->lSize( $this->getQueueKey( 'l-claimed' ) )
			: 0;
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}
}
/**
 * @see JobQueue::doBatchPush()
 * @param array $jobs List of Job objects to enqueue
 * @param $flags (not used by this backend)
 * @return bool False if the insert transaction reported a failed command
 * @throws MWException
 */
protected function doBatchPush( array $jobs, $flags ) {
	if ( !count( $jobs ) ) {
		return true;
	}

	// Map of (uid => job field map); jobs sharing a uid collapse into one entry
	$items = array();
	foreach ( $jobs as $job ) {
		$fields = $this->getNewJobFields( $job );
		$items[$fields['uid']] = $fields;
	}

	// Only hash-based identifiers can match a job that is already queued
	$dedupUids = array();
	foreach ( $items as $fields ) {
		if ( $this->isHashUid( $fields['uid'] ) ) {
			$dedupUids[] = $fields['uid'];
		}
	}

	$conn = $this->getConnection();
	try {
		// Drop jobs that duplicate unclaimed jobs already in the queue...
		if ( count( $dedupUids ) ) {
			foreach ( $this->findUnclaimedDuplicateUids( $conn, $dedupUids ) as $uid ) {
				unset( $items[$uid] );
			}
		}

		// ...then store and enqueue whatever is left in one atomic transaction
		if ( count( $items ) ) {
			$conn->multi( Redis::MULTI ); // begin (atomic trx)
			$conn->mSet( $this->prefixKeysWithQueueKey( 'data', $items ) );
			$lPushArgs = array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), array_keys( $items ) );
			call_user_func_array( array( $conn, 'lPush' ), $lPushArgs );
			$res = $conn->exec(); // commit (atomic trx)
			if ( in_array( false, $res, true ) ) {
				wfDebugLog( 'JobQueueRedis', "Could not insert {$this->type} job(s)." );
				return false;
			}
		}

		wfIncrStats( 'job-insert', count( $items ) );
		wfIncrStats( 'job-insert-duplicate', count( $jobs ) - count( $items ) );
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	return true;
}

/**
 * Find which of the given hash-based uids belong to unclaimed jobs already in the queue
 *
 * A uid counts as a duplicate when its job data exists and (if claims are
 * tracked) its metadata has no "ctime" claim timestamp.
 *
 * @param RedisConnRef $conn
 * @param array $uids List of hash-based job uids
 * @return array Subset of $uids that should not be pushed again
 */
private function findUnclaimedDuplicateUids( $conn, array $uids ) {
	$conn->multi( Redis::PIPELINE );
	foreach ( $uids as $uid ) { // does the job data exist?
		$conn->exists( $this->prefixWithQueueKey( 'data', $uid ) );
	}
	if ( $this->claimTTL > 0 ) {
		foreach ( $uids as $uid ) { // has the job been claimed?
			$conn->hExists( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime' );
		}
		list( $exists, $claimed ) = array_chunk( $conn->exec(), count( $uids ) );
	} else {
		$exists = $conn->exec();
		$claimed = array(); // no claim system
	}

	$duplicates = array();
	foreach ( $uids as $k => $uid ) {
		if ( $exists[$k] && empty( $claimed[$k] ) ) {
			$duplicates[] = $uid;
		}
	}
	return $duplicates;
}
/**
 * @see JobQueue::doPop()
 * @return Job|bool A Job (possibly converted to a DuplicateJob no-op), or false if none
 * @throws MWException
 */
protected function doPop() {
	$job = false;

	if ( mt_rand( 0, 99 ) == 0 ) {
		$this->doInternalMaintenance();
	}

	$conn = $this->getConnection();
	try {
		do {
			// Atomically pop an item off the queue and onto the "claimed" list
			$uid = $conn->rpoplpush(
				$this->getQueueKey( 'l-unclaimed' ),
				$this->getQueueKey( 'l-claimed' )
			);
			if ( $uid === false ) {
				break; // no jobs; nothing to do
			}

			wfIncrStats( 'job-pop' );
			$conn->multi( Redis::PIPELINE );
			$conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
			if ( $this->claimTTL > 0 ) {
				// Set the claim timestamp metadata. If this step fails, then
				// the timestamp will be assumed to be the current timestamp by
				// recycleAndDeleteStaleJobs() as of the next time that it runs.
				// If two runners claim duplicate jobs, one will abort here.
				$conn->hSetNx( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', time() );
			} else {
				// If this fails, the message key will be deleted in cleanupClaimedJobs().
				// If two runners claim duplicate jobs, one of them will abort here.
				$conn->delete(
					$this->prefixWithQueueKey( 'h-meta', $uid ),
					$this->prefixWithQueueKey( 'data', $uid ) );
			}
			// $ok is the hSetNx() result (false if "ctime" was already set) or the
			// delete() count (0 if the keys were already gone). Either way a falsy
			// value means another runner got this uid first. This used to be ignored
			// for claimTTL == 0, letting both runners execute a duplicate job.
			list( $item, $ok ) = $conn->exec();
			if ( $item === false || !$ok ) {
				wfDebug( "Could not find or delete job $uid; probably was a duplicate." );
				continue; // job was probably a duplicate
			}

			// If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
			$job = $this->getJobFromFields( $item ); // may be false
		} while ( !$job ); // job may be false if invalid
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	// Flag this job as an old duplicate based on its "root" job...
	try {
		if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
			wfIncrStats( 'job-pop-duplicate' );
			return DuplicateJob::newFromJob( $job ); // convert to a no-op
		}
	} catch ( MWException $e ) {} // don't lose jobs over this

	return $job;
}
/**
 * @see JobQueue::doAck()
 * @param Job $job A job previously returned by doPop()
 * @return bool False if the acknowledgement transaction reported a failed command
 * @throws MWException
 */
protected function doAck( Job $job ) {
	if ( $this->claimTTL <= 0 ) {
		// No claim TTL: doPop() already removed the job data, nothing to clear
		return true;
	}

	$conn = $this->getConnection();
	try {
		// Use the exact field map this Job came from, regardless of whether
		// the job was transformed into a DuplicateJob or anything of the sort.
		$uid = $job->metadata['sourceFields']['uid'];

		$conn->multi( Redis::MULTI ); // begin (atomic trx)
		// Remove one occurrence of the uid, scanning from the right (negative count).
		// This is O(N) in the worst case, but jobs are pushed on the left, so the
		// longest-running claims sit on the right and are usually acked first.
		$conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
		// Drop the job data together with its claim metadata
		$conn->delete(
			$this->prefixWithQueueKey( 'h-meta', $uid ),
			$this->prefixWithQueueKey( 'data', $uid )
		);
		$res = $conn->exec(); // commit (atomic trx)

		if ( in_array( false, $res, true ) ) {
			wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
			return false;
		}
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	return true;
}
/**
 * @see JobQueue::doDeduplicateRootJob()
 * @param Job $job Job carrying 'rootJobSignature' and 'rootJobTimestamp' params
 * @return bool True if an equal-or-newer root job was already registered,
 *   otherwise the result of storing this job's timestamp
 * @throws MWException If a root job param is missing, or on Redis errors
 */
protected function doDeduplicateRootJob( Job $job ) {
	$params = $job->getParams();
	foreach ( array( 'rootJobSignature', 'rootJobTimestamp' ) as $required ) {
		if ( !isset( $params[$required] ) ) {
			throw new MWException( "Cannot register root job; missing '$required'." );
		}
	}

	$key = $this->getRootJobKey( $params['rootJobSignature'] );
	$conn = $this->getConnection();
	try {
		$lastTimestamp = $conn->get( $key ); // current last timestamp of this job
		if ( $lastTimestamp && $lastTimestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}
		// Remember when the latest root job at this location started (2 weeks)
		return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}
}
/**
 * Check if the "root" job of a given job has been superseded by a newer one
 *
 * @param Job $job
 * @return bool True if the stored root job timestamp for this job's signature
 *   is later than the job's own 'rootJobTimestamp'
 * @throws MWException
 */
protected function isRootJobOldDuplicate( Job $job ) {
	$params = $job->getParams();
	if ( !isset( $params['rootJobSignature'] ) ) {
		return false; // job has no de-duplication info
	}
	if ( !isset( $params['rootJobTimestamp'] ) ) {
		wfDebugLog( 'JobQueueRedis', "Cannot check root job; missing 'rootJobTimestamp'." );
		return false;
	}

	$conn = $this->getConnection();
	try {
		// When was a root job with this signature last enqueued?
		$latest = $conn->get( $this->getRootJobKey( $params['rootJobSignature'] ) );
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	// Superseded if a newer root job was started at the location after this one
	return ( $latest && $latest > $params['rootJobTimestamp'] );
}
/**
 * Do any job recycling or queue cleanup as needed
 *
 * @return integer Number of jobs recycled/deleted
 * @throws MWException
 */
protected function doInternalMaintenance() {
	if ( $this->claimTTL > 0 ) {
		// Claims can expire: retry or prune jobs that were held too long
		return $this->recycleAndDeleteStaleJobs();
	}
	// No claim TTL: popped jobs are never acknowledged, so just purge them
	return $this->cleanupClaimedJobs();
}
/**
 * Recycle or destroy any jobs that have been claimed for too long
 *
 * Jobs claimed for longer than the claim TTL are moved back onto the
 * "unclaimed" list while they have fewer than MAX_ATTEMPTS recorded attempts.
 * Jobs out of attempts are deleted once claimed longer than MAX_AGE_PRUNE.
 * Claimed jobs with no claim timestamp get an "rctime" so later runs can age them.
 *
 * @return integer Number of jobs recycled/deleted
 * @throws MWException
 */
protected function recycleAndDeleteStaleJobs() {
	$count = 0;
	// For each job item that can be retried, we need to add it back to the
	// main queue and remove it from the list of currently claimed job items.
	$conn = $this->getConnection();
	try {
		// Avoid duplicate insertions of items to be re-enqueued
		$conn->multi( Redis::MULTI );
		$conn->setnx( $this->getQueueKey( 'lock' ), 1 );
		$conn->expire( $this->getQueueKey( 'lock' ), 3600 );
		// NOTE(review): EXPIRE also runs when SETNX fails, so every failed attempt
		// re-arms the 1 hour TTL of a lock held (or abandoned) by another process.
		if ( $conn->exec() !== array( true, true ) ) { // lock
			return $count; // already in progress
		}

		$now = time();
		$claimCutoff = $now - $this->claimTTL;
		$pruneCutoff = $now - self::MAX_AGE_PRUNE;

		// Get the list of all claimed jobs
		$claimedUids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 );
		// Get the claim metadata of each claimed job (same order as $claimedUids).
		// "h-meta" keys are hashes (written with hSetNx/hSet/hIncrBy), so they must
		// be read with HGETALL; MGET returns no value for non-string keys.
		$metadata = array();
		if ( count( $claimedUids ) ) {
			$conn->multi( Redis::PIPELINE );
			foreach ( $claimedUids as $uid ) {
				$conn->hGetAll( $this->prefixWithQueueKey( 'h-meta', $uid ) );
			}
			$metadata = $conn->exec();
		}

		$uidsPush = array(); // items IDs to move to the "unclaimed" queue
		$uidsRemove = array(); // item IDs to remove from "claimed" queue
		foreach ( $claimedUids as $i => $uid ) { // all claimed items
			$info = ( isset( $metadata[$i] ) && is_array( $metadata[$i] ) ) ? $metadata[$i] : array();
			if ( isset( $info['ctime'] ) || isset( $info['rctime'] ) ) {
				// Prefer "ctime" (set by pop()) over "rctime" (set by this function)
				$ctime = isset( $info['ctime'] ) ? $info['ctime'] : $info['rctime'];
				// Claimed job claimed for too long?
				if ( $ctime < $claimCutoff ) {
					// Get the number of failed attempts
					$attempts = isset( $info['attempts'] ) ? $info['attempts'] : 0;
					if ( $attempts < self::MAX_ATTEMPTS ) {
						$uidsPush[] = $uid; // retry it
					} elseif ( $ctime < $pruneCutoff ) {
						$uidsRemove[] = $uid; // just remove it
					}
				}
			} else {
				// If pop() failed to set the claim timestamp, set it to the current time.
				// Since that function sets this non-atomically *after* moving the job to
				// the "claimed" queue, it may be the case that it just didn't set it yet.
				$conn->hSet( $this->prefixWithQueueKey( 'h-meta', $uid ), 'rctime', $now );
			}
		}

		$conn->multi( Redis::MULTI ); // begin (atomic trx)
		if ( count( $uidsPush ) ) { // move from "l-claimed" to "l-unclaimed"
			call_user_func_array(
				array( $conn, 'lPush' ),
				array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uidsPush )
			);
			foreach ( $uidsPush as $uid ) {
				$conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
				$conn->hDel( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', 'rctime' );
				$conn->hIncrBy( $this->prefixWithQueueKey( 'h-meta', $uid ), 'attempts', 1 );
			}
		}
		foreach ( $uidsRemove as $uid ) { // remove from "l-claimed"
			$conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
			$conn->delete( // delete job data and metadata
				$this->prefixWithQueueKey( 'h-meta', $uid ),
				$this->prefixWithQueueKey( 'data', $uid ) );
		}
		$res = $conn->exec(); // commit (atomic trx)

		if ( in_array( false, $res, true ) ) {
			wfDebugLog( 'JobQueueRedis', "Could not recycle {$this->type} job(s)." );
		} else {
			// Only count jobs whose recycle/delete transaction went through
			$count += ( count( $uidsPush ) + count( $uidsRemove ) );
			wfIncrStats( 'job-recycle', count( $uidsPush ) );
		}

		$conn->delete( $this->getQueueKey( 'lock' ) ); // unlock
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	return $count;
}
/**
 * Destroy any jobs that have been claimed
 *
 * Used by doInternalMaintenance() when there is no claim TTL. It strips the
 * claimed IDs read here off the "claimed" list and deletes their data and
 * metadata keys.
 *
 * @return integer Number of jobs deleted
 * @throws MWException
 */
protected function cleanupClaimedJobs() {
	$count = 0;
	$conn = $this->getConnection();
	try {
		$lockKey = $this->getQueueKey( 'lock' );
		// Avoid races and duplicate effort
		// NOTE(review): EXPIRE also runs when SETNX fails, re-arming another holder's lock TTL.
		$conn->multi( Redis::MULTI );
		$conn->setnx( $lockKey, 1 );
		$conn->expire( $lockKey, 3600 );
		if ( $conn->exec() !== array( true, true ) ) { // lock
			return $count; // already in progress
		}

		$claimedKey = $this->getQueueKey( 'l-claimed' );
		$uids = $conn->lRange( $claimedKey, 0, -1 );
		if ( count( $uids ) ) {
			// Other writers only left-push onto "l-claimed", so the IDs read above are
			// exactly the rightmost count( $uids ) entries; trim those off atomically
			// while deleting their data and metadata keys.
			$conn->multi( Redis::MULTI ); // begin (atomic trx)
			$conn->lTrim( $claimedKey, 0, -count( $uids ) - 1 );
			$conn->delete( array_merge(
				$this->prefixValuesWithQueueKey( 'h-meta', $uids ),
				$this->prefixValuesWithQueueKey( 'data', $uids )
			) );
			$res = $conn->exec(); // commit (atomic trx)

			if ( in_array( false, $res, true ) ) {
				wfDebugLog( 'JobQueueRedis', "Could not purge {$this->type} job(s)." );
			} else {
				$count += count( $uids );
			}
		}

		$conn->delete( $lockKey ); // unlock
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	return $count;
}
/**
 * Build the field map stored in Redis for a newly pushed job
 *
 * @param Job $job
 * @return array Map with keys: type, namespace, title, params, uid, timestamp
 */
protected function getNewJobFields( Job $job ) {
	$title = $job->getTitle();
	if ( $job->ignoreDuplicates() ) {
		// Base-36 SHA-1 of the de-duplication info, padded to 31 chars
		// (the length isHashUid() looks for)
		$uid = wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 );
	} else {
		$uid = wfRandomString( 32 ); // 32 chars, so never mistaken for a hash uid
	}

	return array(
		// Fields that describe the nature of the job
		'type' => $job->getType(),
		'namespace' => $title->getNamespace(),
		'title' => $title->getDBkey(),
		'params' => $job->getParams(),
		// Additional metadata
		'uid' => $uid,
		'timestamp' => time() // UNIX timestamp
	);
}
/**
 * Reconstruct a Job from a field map built by getNewJobFields()
 *
 * doPop() treats a false return as "invalid item" and moves on to the next job.
 *
 * @param array $fields
 * @return Job|bool The job, or false if required fields are missing or the title is invalid
 */
protected function getJobFromFields( array $fields ) {
	// Stored data may be incomplete; 'uid' is also needed later by doAck()
	foreach ( array( 'type', 'namespace', 'title', 'params', 'uid' ) as $field ) {
		if ( !array_key_exists( $field, $fields ) ) {
			return false;
		}
	}

	$title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
	if ( !$title ) {
		return false; // invalid title; cannot build a job for it
	}

	$job = Job::factory( $fields['type'], $title, $fields['params'] );
	$job->metadata['sourceFields'] = $fields;
	return $job;
}
/**
 * @param string $uid Job UID
 * @return bool Whether $uid is a SHA-1 hash based identifier for de-duplication
 */
protected function isHashUid( $uid ) {
	// Hash uids are 31 characters long; random uids (wfRandomString( 32 )) are 32
	$length = strlen( $uid );
	return ( $length === 31 );
}
/**
 * Get a connection to the server that handles all sub-queues for this queue
 *
 * @return RedisConnRef Connection handle from the pool (passed to throwRedisException())
 * @throws MWException If the pool could not provide a connection
 */
protected function getConnection() {
	$conn = $this->redisPool->getConnection( $this->server );
	if ( $conn ) {
		return $conn;
	}
	throw new MWException( "Unable to connect to redis server." );
}
/**
 * Let the connection pool handle a Redis error, then rethrow it as an MWException
 *
 * @param string $server Server address the error came from
 * @param RedisConnRef $conn Connection the error occurred on
 * @param RedisException $e
 * @throws MWException Always
 */
protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
	$pool = $this->redisPool;
	$pool->handleException( $server, $conn, $e );
	$reason = $e->getMessage();
	throw new MWException( "Redis server error: {$reason}\n" );
}
/**
 * Get the Redis key of one of this queue's structures (e.g. 'l-unclaimed')
 *
 * @param string $prop
 * @return string
 */
private function getQueueKey( $prop ) {
	$wikiParts = wfSplitWikiID( $this->wiki ); // (db, prefix)
	return wfForeignMemcKey( $wikiParts[0], $wikiParts[1], 'jobqueue', $this->type, $prop );
}
/**
 * Get the Redis key storing the last timestamp of a root job
 *
 * @param string $signature Hash identifier of the root job
 * @return string
 */
private function getRootJobKey( $signature ) {
	$wikiParts = wfSplitWikiID( $this->wiki ); // (db, prefix)
	return wfForeignMemcKey( $wikiParts[0], $wikiParts[1], 'jobqueue', $this->type, 'rootjob', $signature );
}
/**
 * Build a per-job key: the queue key for $prop followed by ":" and $string
 *
 * @param string $prop
 * @param string $string
 * @return string
 */
private function prefixWithQueueKey( $prop, $string ) {
	return sprintf( '%s:%s', $this->getQueueKey( $prop ), $string );
}
/**
 * Prefix each value of a list with the queue key for $prop
 *
 * @param string $prop
 * @param array $items
 * @return array List of prefixed keys, in the same order as $items
 */
private function prefixValuesWithQueueKey( $prop, array $items ) {
	$prefixed = array();
	foreach ( $items as $value ) {
		$prefixed[] = $this->prefixWithQueueKey( $prop, $value );
	}
	return $prefixed;
}
598 * @param $prop string
599 * @param $items array
602 private function prefixKeysWithQueueKey( $prop, array $items ) {
604 foreach ( $items as $key => $item ) {
605 $res[$this->prefixWithQueueKey( $prop, $key )] = $item;