[JobQueue] Improved job recycle rate for small queues.
author: Aaron Schulz <aschulz@wikimedia.org>
Sat, 12 Jan 2013 00:13:29 +0000 (16:13 -0800)
committer: Aaron Schulz <aschulz@wikimedia.org>
Wed, 6 Feb 2013 17:49:34 +0000 (09:49 -0800)
* Run the recycling on a periodic schedule rather than tying it,
  in effect, to how often pop() gets called. This works better if
  a queue does not have jobs inserted very often.

Change-Id: I64fbc8afbb1cf096717ba4bfc6fe7b7715abdb72

includes/job/JobQueue.php
includes/job/JobQueueDB.php
includes/job/JobQueueGroup.php
maintenance/nextJobDB.php
maintenance/runJobs.php

index 92beb2c..4e0acd2 100644 (file)
@@ -324,4 +324,32 @@ abstract class JobQueue {
         * @return void
         */
        protected function doWaitForBackups() {}
+
+       /**
+        * Return a map of task names to task definition maps.
+        * A "task" is a fast periodic queue maintenance action.
+        * Mutually exclusive tasks must implement their own locking in the callback.
+        *
+        * Each task value is an associative array with:
+        *   - name     : the name of the task (filled in here from the map key)
+        *   - callback : a PHP callable that performs the task
+        *   - period   : the period in seconds corresponding to the task frequency
+        *
+        * @return Array
+        */
+       final public function getPeriodicTasks() {
+               $tasks = $this->doGetPeriodicTasks();
+               // Write through the key instead of iterating by reference, so that no
+               // dangling reference to the last element is left behind after the loop
+               foreach ( array_keys( $tasks ) as $name ) {
+                       $tasks[$name]['name'] = $name;
+               }
+               return $tasks;
+       }
+
+       /**
+        * Subclass hook that supplies the task definitions for getPeriodicTasks().
+        *
+        * The returned map is keyed by task name; each value is an associative
+        * array with 'callback' and 'period' entries (getPeriodicTasks() adds the
+        * 'name' entry itself). This base implementation defines no tasks.
+        *
+        * @see JobQueue::getPeriodicTasks()
+        * @return Array
+        */
+       protected function doGetPeriodicTasks() {
+               return array();
+       }
 }
index 51b35fd..4df5c07 100644 (file)
@@ -28,6 +28,7 @@
  * @since 1.21
  */
 class JobQueueDB extends JobQueue {
+       const ROOTJOB_TTL     = 1209600; // integer; seconds to remember root jobs (14 days)
        const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
        const CACHE_TTL_LONG  = 300; // integer; seconds to cache info that is kept up to date
        const MAX_AGE_PRUNE   = 604800; // integer; seconds a job can live once claimed
@@ -215,10 +216,6 @@ class JobQueueDB extends JobQueue {
 
                $uuid = wfRandomString( 32 ); // pop attempt
                $job = false; // job popped off
-               // Occasionally recycle jobs back into the queue that have been claimed too long
-               if ( mt_rand( 0, 99 ) == 0 ) {
-                       $this->recycleStaleJobs();
-               }
                do { // retry when our row is invalid or deleted as a duplicate
                        // Try to reserve a row in the DB...
                        if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
@@ -401,10 +398,10 @@ class JobQueueDB extends JobQueue {
         *
         * @return integer Number of jobs recycled/deleted
         */
-       protected function recycleStaleJobs() {
+       public function recycleAndDeleteStaleJobs() {
                global $wgMemc;
 
-               $now   = time();
+               $now = time();
                list( $dbw, $scope ) = $this->getMasterDB();
                $count = 0; // affected rows
 
@@ -519,7 +516,7 @@ class JobQueueDB extends JobQueue {
                        }
 
                        // Update the timestamp of the last root job started at the location...
-                       return $wgMemc->set( $key, $params['rootJobTimestamp'], 14*86400 ); // 2 weeks
+                       return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
                } );
 
                return true;
@@ -557,6 +554,18 @@ class JobQueueDB extends JobQueue {
                wfWaitForSlaves();
        }
 
+       /**
+        * Define the periodic maintenance tasks for this queue.
+        *
+        * The only task is recycleAndDeleteStaleJobs(), scheduled with a period
+        * of half the claim TTL (rounded up).
+        *
+        * @see JobQueue::doGetPeriodicTasks()
+        * @return Array
+        */
+       protected function doGetPeriodicTasks() {
+               $recycleTask = array(
+                       'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+                       'period'   => ceil( $this->claimTTL / 2 )
+               );
+
+               return array( 'recycleAndDeleteStaleJobs' => $recycleTask );
+       }
+
        /**
         * @return Array (DatabaseBase, ScopedCallback)
         */
index cf0215b..6d9d590 100644 (file)
@@ -228,4 +228,58 @@ class JobQueueGroup {
                }
                return $types;
        }
+
+       /**
+        * Execute any due periodic queue maintenance tasks for all queues.
+        *
+        * A task is "due" if the time elapsed since the last run is greater than
+        * the defined run period. Tasks with a period of zero or less are treated
+        * as disabled. Concurrent calls to this function will cause tasks
+        * to be attempted twice, so they may need their own methods of mutual exclusion.
+        *
+        * A task only counts as run (and gets its last-run time recorded) when its
+        * callback returns something other than null.
+        *
+        * @return integer Number of tasks run
+        */
+       public function executeReadyPeriodicTasks() {
+               global $wgMemc;
+
+               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+               $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
+               $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
+
+               $count = 0;
+               $tasksRun = array(); // (queue => task => UNIX timestamp)
+               foreach ( $this->getQueueTypes() as $type ) {
+                       $queue = $this->get( $type );
+                       foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
+                               if ( $definition['period'] <= 0 ) {
+                                       continue; // disabled
+                               } elseif ( !isset( $lastRuns[$type][$task] )
+                                       || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
+                               {
+                                       if ( call_user_func( $definition['callback'] ) !== null ) {
+                                               $tasksRun[$type][$task] = time();
+                                               ++$count;
+                                       }
+                               }
+                       }
+               }
+
+               if ( $count === 0 ) {
+                       return 0; // nothing ran, so there is nothing to record in the cache
+               }
+
+               // Fold our run times into the shared map, keeping the newer timestamp
+               // for each task when the stored map already has an entry for it
+               $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
+                       if ( is_array( $lastRuns ) ) {
+                               foreach ( $tasksRun as $type => $tasks ) {
+                                       foreach ( $tasks as $task => $timestamp ) {
+                                               if ( !isset( $lastRuns[$type][$task] )
+                                                       || $timestamp > $lastRuns[$type][$task] )
+                                               {
+                                                       $lastRuns[$type][$task] = $timestamp;
+                                               }
+                                       }
+                               }
+                       } else {
+                               $lastRuns = $tasksRun;
+                       }
+                       return $lastRuns;
+               } );
+
+               return $count;
+       }
 }
index 032d6f9..6606375 100644 (file)
@@ -48,6 +48,9 @@ class nextJobDB extends Maintenance {
                        $types = JobQueueGroup::singleton()->getDefaultQueueTypes();
                }
 
+               // Handle any required periodic queue maintenance
+               $this->executeReadyPeriodicTasks();
+
                $memcKey = 'jobqueue:dbs:v3';
                $pendingDbInfo = $wgMemc->get( $memcKey );
 
@@ -78,6 +81,7 @@ class nextJobDB extends Maintenance {
                        return; // no DBs with jobs or cache is both empty and locked
                }
 
+               $type = $this->getOption( 'type', false );
                $pendingDBs = $pendingDbInfo['pendingDBs']; // convenience
                do {
                        $again = false;
@@ -156,14 +160,36 @@ class nextJobDB extends Maintenance {
 
                $pendingDBs = array(); // (job type => (db list))
                foreach ( $wgLocalDatabases as $db ) {
-                       $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs();
-                       foreach ( $types as $type ) {
+                       foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) {
                                $pendingDBs[$type][] = $db;
                        }
                }
 
                return $pendingDBs;
        }
+
+       /**
+        * Run the ready periodic queue tasks for every local database.
+        *
+        * The work is attempted when more than 5 minutes have passed since the last
+        * completed run, or otherwise on a random 0.1% of calls. A cache add() on a
+        * ":rebuild" key (30 minute TTL) is used as a lock around the run.
+        *
+        * NOTE(review): the lock is only deleted after the loop finishes normally; if
+        * a task throws, it presumably stays held until its TTL expires -- confirm
+        * that this is the intended behavior.
+        *
+        * @return integer Number of tasks run (0 if not due or the lock was not acquired)
+        */
+       private function executeReadyPeriodicTasks() {
+               global $wgLocalDatabases, $wgMemc;
+
+               $memcKey = 'jobqueue:periodic:lasttime';
+               $lockKey = "$memcKey:rebuild";
+
+               $lastTime = (int)$wgMemc->get( $memcKey ); // UNIX timestamp or 0
+               $isDue = ( time() - $lastTime ) > 300 || mt_rand( 0, 999 ) == 0; // 5 minutes
+               if ( !$isDue || !$wgMemc->add( $lockKey, 1, 1800 ) ) { // lock
+                       return 0;
+               }
+
+               $tasksRun = 0;
+               foreach ( $wgLocalDatabases as $db ) {
+                       $tasksRun += JobQueueGroup::singleton( $db )->executeReadyPeriodicTasks();
+               }
+               $wgMemc->set( $memcKey, time() );
+               $wgMemc->delete( $lockKey ); // unlock
+
+               return $tasksRun;
+       }
 }
 
 $maintClass = "nextJobDb";
index 0cf0217..a78acd5 100644 (file)
@@ -76,6 +76,12 @@ class RunJobs extends Maintenance {
                $n = 0;
 
                $group = JobQueueGroup::singleton();
+               // Handle any required periodic queue maintenance
+               $count = $group->executeReadyPeriodicTasks();
+               if ( $count > 0 ) {
+                       $this->runJobsLog( "Executed $count periodic queue task(s)." );
+               }
+
                do {
                        $job = ( $type === false )
                                ? $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE )