Merge "clone is not a function"
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueGroup.php
index dbb85d7..5bd1cc9 100644 (file)
@@ -28,7 +28,7 @@
  * @since 1.21
  */
 class JobQueueGroup {
-       /** @var array */
+       /** @var JobQueueGroup[] */
        protected static $instances = array();
 
        /** @var ProcessCacheLRU */
@@ -40,6 +40,9 @@ class JobQueueGroup {
        /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
        protected $coalescedQueues;
 
+       /** @var Job[] */
+       protected $bufferedJobs = array();
+
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY = 2; // integer; any job
 
@@ -94,18 +97,19 @@ class JobQueueGroup {
                } else {
                        $conf = $conf + $wgJobTypeConf['default'];
                }
+               $conf['aggregator'] = JobQueueAggregator::singleton();
 
                return JobQueue::factory( $conf );
        }
 
        /**
-        * Insert jobs into the respective queues of with the belong.
+        * Insert jobs into the respective queues of which they belong
         *
         * This inserts the jobs into the queue specified by $wgJobTypeConf
         * and updates the aggregate job queue information cache as needed.
         *
-        * @param Job|Job[] $jobs A single Job or a list of Jobs
-        * @throws MWException
+        * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+        * @throws InvalidArgumentException
         * @return void
         */
        public function push( $jobs ) {
@@ -114,18 +118,15 @@ class JobQueueGroup {
                        return;
                }
 
+               $this->assertValidJobs( $jobs );
+
                $jobsByType = array(); // (job type => list of jobs)
                foreach ( $jobs as $job ) {
-                       if ( $job instanceof IJobSpecification ) {
-                               $jobsByType[$job->getType()][] = $job;
-                       } else {
-                               throw new MWException( "Attempted to push a non-Job object into a queue." );
-                       }
+                       $jobsByType[$job->getType()][] = $job;
                }
 
                foreach ( $jobsByType as $type => $jobs ) {
                        $this->get( $type )->push( $jobs );
-                       JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
                }
 
                if ( $this->cache->has( 'queues-ready', 'list' ) ) {
@@ -136,6 +137,42 @@ class JobQueueGroup {
                }
        }
 
+       /**
+        * Buffer jobs for insertion via push() or call it now if in CLI mode
+        *
+        * Note that MediaWiki::restInPeace() calls pushLazyJobs()
+        *
+        * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+        * @return void
+        * @since 1.26
+        */
+       public function lazyPush( $jobs ) {
+               if ( PHP_SAPI === 'cli' ) {
+                       $this->push( $jobs );
+                       return;
+               }
+
+               $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+               // Throw errors now instead of on push(), when other jobs may be buffered
+               $this->assertValidJobs( $jobs );
+
+               $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
+       }
+
+       /**
+        * Push all jobs buffered via lazyPush() into their respective queues
+        *
+        * @return void
+        * @since 1.26
+        */
+       public static function pushLazyJobs() {
+               foreach ( self::$instances as $group ) {
+                       $group->push( $group->bufferedJobs );
+                       $group->bufferedJobs = array();
+               }
+       }
+
        /**
         * Pop a job off one of the job queues
         *
@@ -153,9 +190,6 @@ class JobQueueGroup {
                if ( is_string( $qtype ) ) { // specific job type
                        if ( !in_array( $qtype, $blacklist ) ) {
                                $job = $this->get( $qtype )->pop();
-                               if ( !$job ) {
-                                       JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
-                               }
                        }
                } else { // any job in the "default" jobs types
                        if ( $flags & self::USE_CACHE ) {
@@ -179,7 +213,6 @@ class JobQueueGroup {
                                if ( $job ) { // found
                                        break;
                                } else { // not found
-                                       JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
                                        $this->cache->clear( 'queues-ready' );
                                }
                        }
@@ -192,10 +225,10 @@ class JobQueueGroup {
         * Acknowledge that a job was completed
         *
         * @param Job $job
-        * @return bool
+        * @return void
         */
        public function ack( Job $job ) {
-               return $this->get( $job->getType() )->ack( $job );
+               $this->get( $job->getType() )->ack( $job );
        }
 
        /**
@@ -215,7 +248,6 @@ class JobQueueGroup {
         * This does nothing for certain queue classes.
         *
         * @return void
-        * @throws MWException
         */
        public function waitForBackups() {
                global $wgJobTypeConf;
@@ -345,73 +377,6 @@ class JobQueueGroup {
                return $this->coalescedQueues;
        }
 
-       /**
-        * Execute any due periodic queue maintenance tasks for all queues.
-        *
-        * A task is "due" if the time ellapsed since the last run is greater than
-        * the defined run period. Concurrent calls to this function will cause tasks
-        * to be attempted twice, so they may need their own methods of mutual exclusion.
-        *
-        * @return int Number of tasks run
-        */
-       public function executeReadyPeriodicTasks() {
-               global $wgMemc;
-
-               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
-               $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
-
-               $count = 0;
-               $tasksRun = array(); // (queue => task => UNIX timestamp)
-               foreach ( $this->getQueueTypes() as $type ) {
-                       $queue = $this->get( $type );
-                       foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
-                               if ( $definition['period'] <= 0 ) {
-                                       continue; // disabled
-                               } elseif ( !isset( $lastRuns[$type][$task] )
-                                       || $lastRuns[$type][$task] < ( time() - $definition['period'] )
-                               ) {
-                                       try {
-                                               if ( call_user_func( $definition['callback'] ) !== null ) {
-                                                       $tasksRun[$type][$task] = time();
-                                                       ++$count;
-                                               }
-                                       } catch ( JobQueueError $e ) {
-                                               MWExceptionHandler::logException( $e );
-                                       }
-                               }
-                       }
-                       // The tasks may have recycled jobs or release delayed jobs into the queue
-                       if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
-                               JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
-                       }
-               }
-
-               if ( $count === 0 ) {
-                       return $count; // nothing to update
-               }
-
-               $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
-                       if ( is_array( $lastRuns ) ) {
-                               foreach ( $tasksRun as $type => $tasks ) {
-                                       foreach ( $tasks as $task => $timestamp ) {
-                                               if ( !isset( $lastRuns[$type][$task] )
-                                                       || $timestamp > $lastRuns[$type][$task]
-                                               ) {
-                                                       $lastRuns[$type][$task] = $timestamp;
-                                               }
-                                       }
-                               }
-                       } else {
-                               $lastRuns = $tasksRun;
-                       }
-
-                       return $lastRuns;
-               } );
-
-               return $count;
-       }
-
        /**
         * @param string $name
         * @return mixed
@@ -435,4 +400,24 @@ class JobQueueGroup {
                        }
                }
        }
+
+       /**
+        * @param array $jobs
+        * @throws InvalidArgumentException
+        */
+       private function assertValidJobs( array $jobs ) {
+               foreach ( $jobs as $job ) { // sanity checks
+                       if ( !( $job instanceof IJobSpecification ) ) {
+                               throw new InvalidArgumentException( "Expected IJobSpecification objects" );
+                       }
+               }
+       }
+
+       function __destruct() {
+               $n = count( $this->bufferedJobs );
+               if ( $n > 0 ) {
+                       $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
+                       trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
+               }
+       }
 }