Removed executeReadyPeriodicTasks() method
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 7 May 2015 03:06:01 +0000 (20:06 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Thu, 7 May 2015 03:20:40 +0000 (20:20 -0700)
* Moved all these hacks to JobQueueDB, which is the only queue that
  should need this (for stock installs). Newer queues should always
  have the queue store manage stuff like this, not MediaWiki.
* This also avoids expensive object construction that does nothing
  when non-DB queues are used.

Change-Id: Id718cda25750be73044a049b39958cca55aa3172

includes/jobqueue/JobQueue.php
includes/jobqueue/JobQueueDB.php
includes/jobqueue/JobQueueFederated.php
includes/jobqueue/JobQueueGroup.php
includes/jobqueue/JobRunner.php

index 7df85ff..fd4234d 100644 (file)
@@ -548,35 +548,6 @@ abstract class JobQueue {
        protected function doWaitForBackups() {
        }
 
-       /**
-        * Return a map of task names to task definition maps.
-        * A "task" is a fast periodic queue maintenance action.
-        * Mutually exclusive tasks must implement their own locking in the callback.
-        *
-        * Each task value is an associative array with:
-        *   - name     : the name of the task
-        *   - callback : a PHP callable that performs the task
-        *   - period   : the period in seconds corresponding to the task frequency
-        *
-        * @return array
-        */
-       final public function getPeriodicTasks() {
-               $tasks = $this->doGetPeriodicTasks();
-               foreach ( $tasks as $name => &$def ) {
-                       $def['name'] = $name;
-               }
-
-               return $tasks;
-       }
-
-       /**
-        * @see JobQueue::getPeriodicTasks()
-        * @return array
-        */
-       protected function doGetPeriodicTasks() {
-               return array();
-       }
-
        /**
         * Clear any process and persistent caches
         *
index b1b650b..49de4b4 100644 (file)
@@ -301,6 +301,12 @@ class JobQueueDB extends JobQueue {
                                $job->metadata['id'] = $row->job_id;
                                break; // done
                        } while ( true );
+
+                       if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+                               // Handled jobs that need to be recycled/deleted;
+                               // any recycled jobs will be picked up next attempt
+                               $this->recycleAndDeleteStaleJobs();
+                       }
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -535,18 +541,6 @@ class JobQueueDB extends JobQueue {
                wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
        }
 
-       /**
-        * @return array
-        */
-       protected function doGetPeriodicTasks() {
-               return array(
-                       'recycleAndDeleteStaleJobs' => array(
-                               'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
-                               'period' => ceil( $this->claimTTL / 2 )
-                       )
-               );
-       }
-
        /**
         * @return void
         */
@@ -589,6 +583,10 @@ class JobQueueDB extends JobQueue {
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $dbr = $this->getSlaveDB();
+               // @note: this does not check whether the jobs are claimed or not.
+               // This is useful so JobQueueGroup::pop() also sees queues that only
+               // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+               // failed jobs so that they can be popped again for that edge case.
                $res = $dbr->select( 'job', 'DISTINCT job_cmd',
                        array( 'job_cmd' => $types ), __METHOD__ );
 
index 178ce8a..b6bb301 100644 (file)
@@ -373,18 +373,6 @@ class JobQueueFederated extends JobQueue {
                $this->throwErrorIfAllPartitionsDown( $failed );
        }
 
-       protected function doGetPeriodicTasks() {
-               $tasks = array();
-               /** @var JobQueue $queue */
-               foreach ( $this->partitionQueues as $partition => $queue ) {
-                       foreach ( $queue->getPeriodicTasks() as $task => $def ) {
-                               $tasks["{$partition}:{$task}"] = $def;
-                       }
-               }
-
-               return $tasks;
-       }
-
        protected function doFlushCaches() {
                static $types = array(
                        'empty',
index ebd547a..fdf7b87 100644 (file)
@@ -341,69 +341,6 @@ class JobQueueGroup {
                return $this->coalescedQueues;
        }
 
-       /**
-        * Execute any due periodic queue maintenance tasks for all queues.
-        *
-        * A task is "due" if the time ellapsed since the last run is greater than
-        * the defined run period. Concurrent calls to this function will cause tasks
-        * to be attempted twice, so they may need their own methods of mutual exclusion.
-        *
-        * @return int Number of tasks run
-        */
-       public function executeReadyPeriodicTasks() {
-               global $wgMemc;
-
-               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
-               $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
-
-               $count = 0;
-               $tasksRun = array(); // (queue => task => UNIX timestamp)
-               foreach ( $this->getQueueTypes() as $type ) {
-                       $queue = $this->get( $type );
-                       foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
-                               if ( $definition['period'] <= 0 ) {
-                                       continue; // disabled
-                               } elseif ( !isset( $lastRuns[$type][$task] )
-                                       || $lastRuns[$type][$task] < ( time() - $definition['period'] )
-                               ) {
-                                       try {
-                                               if ( call_user_func( $definition['callback'] ) !== null ) {
-                                                       $tasksRun[$type][$task] = time();
-                                                       ++$count;
-                                               }
-                                       } catch ( JobQueueError $e ) {
-                                               MWExceptionHandler::logException( $e );
-                                       }
-                               }
-                       }
-               }
-
-               if ( $count === 0 ) {
-                       return $count; // nothing to update
-               }
-
-               $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
-                       if ( is_array( $lastRuns ) ) {
-                               foreach ( $tasksRun as $type => $tasks ) {
-                                       foreach ( $tasks as $task => $timestamp ) {
-                                               if ( !isset( $lastRuns[$type][$task] )
-                                                       || $timestamp > $lastRuns[$type][$task]
-                                               ) {
-                                                       $lastRuns[$type][$task] = $timestamp;
-                                               }
-                                       }
-                               }
-                       } else {
-                               $lastRuns = $tasksRun;
-                       }
-
-                       return $lastRuns;
-               } );
-
-               return $count;
-       }
-
        /**
         * @param string $name
         * @return mixed
index 0948092..5c68d90 100644 (file)
@@ -103,15 +103,6 @@ class JobRunner implements LoggerAwareInterface {
                        return $response;
                }
 
-               $group = JobQueueGroup::singleton();
-               // Handle any required periodic queue maintenance
-               $count = $group->executeReadyPeriodicTasks();
-               if ( $count > 0 ) {
-                       $msg = "Executed $count periodic queue task(s).";
-                       $this->logger->debug( $msg );
-                       $this->debugCallback( $msg );
-               }
-
                // Bail out if in read-only mode
                if ( wfReadOnly() ) {
                        $response['reached'] = 'read-only';
@@ -132,6 +123,8 @@ class JobRunner implements LoggerAwareInterface {
                        return $response;
                }
 
+               $group = JobQueueGroup::singleton();
+               
                // Flush any pending DB writes for sanity
                wfGetLBFactory()->commitMasterChanges();