Merge "Change the syntax and non-one count fake values for {{PLURAL: in newmessages*"
[lhc/web/wiklou.git] / includes / job / JobQueueGroup.php
index 23a5494..fa7fee5 100644 (file)
@@ -36,17 +36,21 @@ class JobQueueGroup {
 
        protected $wiki; // string; wiki ID
 
+       /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
+       protected $coalescedQueues;
+
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
-       const TYPE_ANY     = 2; // integer; any job
+       const TYPE_ANY = 2; // integer; any job
 
        const USE_CACHE = 1; // integer; use process or persistent cache
+       const USE_PRIORITY = 2; // integer; respect deprioritization
 
        const PROC_CACHE_TTL = 15; // integer; seconds
 
        const CACHE_VERSION = 1; // integer; cache version
 
        /**
-        * @param $wiki string Wiki ID
+        * @param string $wiki Wiki ID
         */
        protected function __construct( $wiki ) {
                $this->wiki = $wiki;
@@ -54,7 +58,7 @@ class JobQueueGroup {
        }
 
        /**
-        * @param $wiki string Wiki ID
+        * @param string $wiki Wiki ID
         * @return JobQueueGroup
         */
        public static function singleton( $wiki = false ) {
@@ -146,6 +150,9 @@ class JobQueueGroup {
         */
        public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
                if ( is_string( $qtype ) ) { // specific job type
+                       if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) {
+                               return false; // back off
+                       }
                        $job = $this->get( $qtype )->pop();
                        if ( !$job ) {
                                JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
@@ -167,6 +174,9 @@ class JobQueueGroup {
                        shuffle( $types ); // avoid starvation
 
                        foreach ( $types as $type ) { // for each queue...
+                               if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) {
+                                       continue; // back off
+                               }
                                $job = $this->get( $type )->pop();
                                if ( $job ) { // found
                                        return $job;
@@ -201,6 +211,25 @@ class JobQueueGroup {
                return $this->get( $job->getType() )->deduplicateRootJob( $job );
        }
 
+       /**
+        * Wait for any slaves or backup queue servers to catch up.
+        *
+        * This does nothing for certain queue classes.
+        *
+        * @return void
+        * @throws MWException
+        */
+       public function waitForBackups() {
+               global $wgJobTypeConf;
+
+               wfProfileIn( __METHOD__ );
+               // Try to avoid doing this more than once per queue storage medium
+               foreach ( $wgJobTypeConf as $type => $conf ) {
+                       $this->get( $type )->waitForBackups();
+               }
+               wfProfileOut( __METHOD__ );
+       }
+
        /**
         * Get the list of queue types
         *
@@ -228,14 +257,93 @@ class JobQueueGroup {
         */
        public function getQueuesWithJobs() {
                $types = array();
-               foreach ( $this->getQueueTypes() as $type ) {
-                       if ( !$this->get( $type )->isEmpty() ) {
-                               $types[] = $type;
+               foreach ( $this->getCoalescedQueues() as $info ) {
+                       $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+                       if ( is_array( $nonEmpty ) ) { // batching features supported
+                               $types = array_merge( $types, $nonEmpty );
+                       } else { // we have to go through the queues in the bucket one-by-one
+                               foreach ( $info['types'] as $type ) {
+                                       if ( !$this->get( $type )->isEmpty() ) {
+                                               $types[] = $type;
+                                       }
+                               }
                        }
                }
                return $types;
        }
 
+       /**
+        * Get the size of the queus for a list of job types
+        *
+        * @return Array Map of (job type => size)
+        */
+       public function getQueueSizes() {
+               $sizeMap = array();
+               foreach ( $this->getCoalescedQueues() as $info ) {
+                       $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+                       if ( is_array( $sizes ) ) { // batching features supported
+                               $sizeMap = $sizeMap + $sizes;
+                       } else { // we have to go through the queues in the bucket one-by-one
+                               foreach ( $info['types'] as $type ) {
+                                       $sizeMap[$type] = $this->get( $type )->getSize();
+                               }
+                       }
+               }
+               return $sizeMap;
+       }
+
+       /**
+        * @return array
+        */
+       protected function getCoalescedQueues() {
+               global $wgJobTypeConf;
+
+               if ( $this->coalescedQueues === null ) {
+                       $this->coalescedQueues = array();
+                       foreach ( $wgJobTypeConf as $type => $conf ) {
+                               $queue = JobQueue::factory(
+                                       array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
+                               $loc = $queue->getCoalesceLocationInternal();
+                               if ( !isset( $this->coalescedQueues[$loc] ) ) {
+                                       $this->coalescedQueues[$loc]['queue'] = $queue;
+                                       $this->coalescedQueues[$loc]['types'] = array();
+                               }
+                               if ( $type === 'default' ) {
+                                       $this->coalescedQueues[$loc]['types'] = array_merge(
+                                               $this->coalescedQueues[$loc]['types'],
+                                               array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
+                                       );
+                               } else {
+                                       $this->coalescedQueues[$loc]['types'][] = $type;
+                               }
+                       }
+               }
+
+               return $this->coalescedQueues;
+       }
+
+       /**
+        * Check if jobs should not be popped of a queue right now.
+        * This is only used for performance, such as to avoid spamming
+        * the queue with many sub-jobs before they actually get run.
+        *
+        * @param $type string
+        * @return bool
+        */
+       public function isQueueDeprioritized( $type ) {
+               if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) {
+                       return $this->cache->get( 'isDeprioritized', $type );
+               }
+               if ( $type === 'refreshLinks2' ) {
+                       // Don't keep converting refreshLinks2 => refreshLinks jobs if the
+                       // later jobs have not been done yet. This helps throttle queue spam.
+                       $deprioritized = !$this->get( 'refreshLinks' )->isEmpty();
+                       $this->cache->set( 'isDeprioritized', $type, $deprioritized );
+                       return $deprioritized;
+               }
+               return false;
+       }
+
        /**
         * Execute any due periodic queue maintenance tasks for all queues.
         *
@@ -262,9 +370,13 @@ class JobQueueGroup {
                                } elseif ( !isset( $lastRuns[$type][$task] )
                                        || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
                                {
-                                       if ( call_user_func( $definition['callback'] ) !== null ) {
-                                               $tasksRun[$type][$task] = time();
-                                               ++$count;
+                                       try {
+                                               if ( call_user_func( $definition['callback'] ) !== null ) {
+                                                       $tasksRun[$type][$task] = time();
+                                                       ++$count;
+                                               }
+                                       } catch ( JobQueueError $e ) {
+                                               MWExceptionHandler::logException( $e );
                                        }
                                }
                        }