Update ternary notation
[lhc/web/wiklou.git] / includes / job / JobQueueGroup.php
index e483e05..a3ec8a7 100644 (file)
  * @since 1.21
  */
 class JobQueueGroup {
-       /** @var Array */
+       /** @var array */
        protected static $instances = array();
 
        /** @var ProcessCacheLRU */
        protected $cache;
 
-       protected $wiki; // string; wiki ID
+       /** @var string Wiki ID */
+       protected $wiki;
+
+       /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
+       protected $coalescedQueues;
 
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY = 2; // integer; any job
@@ -55,7 +59,7 @@ class JobQueueGroup {
        }
 
        /**
-        * @param string $wiki Wiki ID
+        * @param bool|string $wiki Wiki ID
         * @return JobQueueGroup
         */
        public static function singleton( $wiki = false ) {
@@ -63,6 +67,7 @@ class JobQueueGroup {
                if ( !isset( self::$instances[$wiki] ) ) {
                        self::$instances[$wiki] = new self( $wiki );
                }
+
                return self::$instances[$wiki];
        }
 
@@ -78,7 +83,7 @@ class JobQueueGroup {
        /**
         * Get the job queue object for a given queue type
         *
-        * @param $type string
+        * @param string $type
         * @return JobQueue
         */
        public function get( $type ) {
@@ -100,12 +105,15 @@ class JobQueueGroup {
         * This inserts the jobs into the queue specified by $wgJobTypeConf
         * and updates the aggregate job queue information cache as needed.
         *
-        * @param $jobs Job|array A single Job or a list of Jobs
+        * @param Job|array $jobs A single Job or a list of Jobs
         * @throws MWException
         * @return bool
         */
        public function push( $jobs ) {
                $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+               if ( !count( $jobs ) ) {
+                       return true;
+               }
 
                $jobsByType = array(); // (job type => list of jobs)
                foreach ( $jobs as $job ) {
@@ -141,8 +149,8 @@ class JobQueueGroup {
         * This pops a job off a queue as specified by $wgJobTypeConf and
         * updates the aggregate job queue information cache as needed.
         *
-        * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type string
-        * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
+        * @param int|string $qtype JobQueueGroup::TYPE_DEFAULT or type string
+        * @param int $flags Bitfield of JobQueueGroup::USE_* constants
         * @return Job|bool Returns false on failure
         */
        public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
@@ -154,6 +162,7 @@ class JobQueueGroup {
                        if ( !$job ) {
                                JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
                        }
+
                        return $job;
                } else { // any job in the "default" jobs types
                        if ( $flags & self::USE_CACHE ) {
@@ -190,7 +199,7 @@ class JobQueueGroup {
        /**
         * Acknowledge that a job was completed
         *
-        * @param $job Job
+        * @param Job $job
         * @return bool
         */
        public function ack( Job $job ) {
@@ -201,7 +210,7 @@ class JobQueueGroup {
         * Register the "root job" of a given job into the queue for de-duplication.
         * This should only be called right *after* all the new jobs have been inserted.
         *
-        * @param $job Job
+        * @param Job $job
         * @return bool
         */
        public function deduplicateRootJob( Job $job ) {
@@ -250,24 +259,83 @@ class JobQueueGroup {
        /**
         * Get the list of job types that have non-empty queues
         *
-        * @return Array List of job types that have non-empty queues
+        * @return array List of job types that have non-empty queues
         */
        public function getQueuesWithJobs() {
                $types = array();
-               foreach ( $this->getQueueTypes() as $type ) {
-                       if ( !$this->get( $type )->isEmpty() ) {
-                               $types[] = $type;
+               foreach ( $this->getCoalescedQueues() as $info ) {
+                       $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+                       if ( is_array( $nonEmpty ) ) { // batching features supported
+                               $types = array_merge( $types, $nonEmpty );
+                       } else { // we have to go through the queues in the bucket one-by-one
+                               foreach ( $info['types'] as $type ) {
+                                       if ( !$this->get( $type )->isEmpty() ) {
+                                               $types[] = $type;
+                                       }
+                               }
                        }
                }
+
                return $types;
        }
 
+       /**
+        * Get the size of the queus for a list of job types
+        *
+        * @return array Map of (job type => size)
+        */
+       public function getQueueSizes() {
+               $sizeMap = array();
+               foreach ( $this->getCoalescedQueues() as $info ) {
+                       $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+                       if ( is_array( $sizes ) ) { // batching features supported
+                               $sizeMap = $sizeMap + $sizes;
+                       } else { // we have to go through the queues in the bucket one-by-one
+                               foreach ( $info['types'] as $type ) {
+                                       $sizeMap[$type] = $this->get( $type )->getSize();
+                               }
+                       }
+               }
+
+               return $sizeMap;
+       }
+
+       /**
+        * @return array
+        */
+       protected function getCoalescedQueues() {
+               global $wgJobTypeConf;
+
+               if ( $this->coalescedQueues === null ) {
+                       $this->coalescedQueues = array();
+                       foreach ( $wgJobTypeConf as $type => $conf ) {
+                               $queue = JobQueue::factory(
+                                       array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
+                               $loc = $queue->getCoalesceLocationInternal();
+                               if ( !isset( $this->coalescedQueues[$loc] ) ) {
+                                       $this->coalescedQueues[$loc]['queue'] = $queue;
+                                       $this->coalescedQueues[$loc]['types'] = array();
+                               }
+                               if ( $type === 'default' ) {
+                                       $this->coalescedQueues[$loc]['types'] = array_merge(
+                                               $this->coalescedQueues[$loc]['types'],
+                                               array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
+                                       );
+                               } else {
+                                       $this->coalescedQueues[$loc]['types'][] = $type;
+                               }
+                       }
+               }
+
+               return $this->coalescedQueues;
+       }
+
        /**
         * Check if jobs should not be popped of a queue right now.
         * This is only used for performance, such as to avoid spamming
         * the queue with many sub-jobs before they actually get run.
         *
-        * @param $type string
+        * @param string $type
         * @return bool
         */
        public function isQueueDeprioritized( $type ) {
@@ -275,12 +343,16 @@ class JobQueueGroup {
                        return $this->cache->get( 'isDeprioritized', $type );
                }
                if ( $type === 'refreshLinks2' ) {
-                       // Don't keep converting refreshLinks2 => refreshLinks jobs if the
+                       // Don't keep converting refreshLinksPartition => refreshLinks jobs if the
                        // later jobs have not been done yet. This helps throttle queue spam.
-                       $deprioritized = !$this->get( 'refreshLinks' )->isEmpty();
+                       // @TODO: this is mostly a WMF-specific hack and should be removed when
+                       // refreshLinks2 jobs are drained.
+                       $deprioritized = !$this->get( 'refreshLinks' )->getSize() > 10000;
                        $this->cache->set( 'isDeprioritized', $type, $deprioritized );
+
                        return $deprioritized;
                }
+
                return false;
        }
 
@@ -291,7 +363,7 @@ class JobQueueGroup {
         * the defined run period. Concurrent calls to this function will cause tasks
         * to be attempted twice, so they may need their own methods of mutual exclusion.
         *
-        * @return integer Number of tasks run
+        * @return int Number of tasks run
         */
        public function executeReadyPeriodicTasks() {
                global $wgMemc;
@@ -308,27 +380,27 @@ class JobQueueGroup {
                                if ( $definition['period'] <= 0 ) {
                                        continue; // disabled
                                } elseif ( !isset( $lastRuns[$type][$task] )
-                                       || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
-                               {
+                                       || $lastRuns[$type][$task] < ( time() - $definition['period'] )
+                               {
                                        try {
                                                if ( call_user_func( $definition['callback'] ) !== null ) {
                                                        $tasksRun[$type][$task] = time();
                                                        ++$count;
                                                }
                                        } catch ( JobQueueError $e ) {
-                                               wfDebugLog( 'exception', $e->getLogMessage() );
+                                               MWExceptionHandler::logException( $e );
                                        }
                                }
                        }
                }
 
-               $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
+               $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
                        if ( is_array( $lastRuns ) ) {
                                foreach ( $tasksRun as $type => $tasks ) {
                                        foreach ( $tasks as $task => $timestamp ) {
                                                if ( !isset( $lastRuns[$type][$task] )
-                                                       || $timestamp > $lastRuns[$type][$task] )
-                                               {
+                                                       || $timestamp > $lastRuns[$type][$task]
+                                               {
                                                        $lastRuns[$type][$task] = $timestamp;
                                                }
                                        }
@@ -336,6 +408,7 @@ class JobQueueGroup {
                        } else {
                                $lastRuns = $tasksRun;
                        }
+
                        return $lastRuns;
                } );
 
@@ -360,6 +433,7 @@ class JobQueueGroup {
                        } else {
                                $value = $wgConf->getConfig( $this->wiki, $name );
                                $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
+
                                return $value;
                        }
                }