Merge "[JobQueue] Try to cut down on waitForBackups() calls in runJobs.php."
[lhc/web/wiklou.git] / includes / job / JobQueueGroup.php
index 6d9d590..351c71a 100644 (file)
@@ -37,22 +37,24 @@ class JobQueueGroup {
        protected $wiki; // string; wiki ID
 
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
-       const TYPE_ANY     = 2; // integer; any job
+       const TYPE_ANY = 2; // integer; any job
 
-       const USE_CACHE = 1; // integer; use process cache
+       const USE_CACHE = 1; // integer; use process or persistent cache
 
        const PROC_CACHE_TTL = 15; // integer; seconds
 
+       const CACHE_VERSION = 1; // integer; cache version
+
        /**
-        * @param $wiki string Wiki ID
+        * @param string $wiki Wiki ID
         */
        protected function __construct( $wiki ) {
                $this->wiki = $wiki;
-               $this->cache = new ProcessCacheLRU( 1 );
+               $this->cache = new ProcessCacheLRU( 10 );
        }
 
        /**
-        * @param $wiki string Wiki ID
+        * @param string $wiki Wiki ID
         * @return JobQueueGroup
         */
        public static function singleton( $wiki = false ) {
@@ -73,8 +75,10 @@ class JobQueueGroup {
        }
 
        /**
+        * Get the job queue object for a given queue type
+        *
         * @param $type string
-        * @return JobQueue Job queue object for a given queue type
+        * @return JobQueue
         */
        public function get( $type ) {
                global $wgJobTypeConf;
@@ -91,7 +95,9 @@ class JobQueueGroup {
 
        /**
         * Insert jobs into the respective queues of with the belong.
-        * This inserts the jobs into the queue specified by $wgJobTypeConf.
+        *
+        * This inserts the jobs into the queue specified by $wgJobTypeConf
+        * and updates the aggregate job queue information cache as needed.
         *
         * @param $jobs Job|array A single Job or a list of Jobs
         * @throws MWException
@@ -111,7 +117,9 @@ class JobQueueGroup {
 
                $ok = true;
                foreach ( $jobsByType as $type => $jobs ) {
-                       if ( !$this->get( $type )->push( $jobs ) ) {
+                       if ( $this->get( $type )->push( $jobs ) ) {
+                               JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
+                       } else {
                                $ok = false;
                        }
                }
@@ -129,35 +137,47 @@ class JobQueueGroup {
        /**
         * Pop a job off one of the job queues
         *
-        * @param $queueType integer JobQueueGroup::TYPE_* constant
+        * This pops a job off a queue as specified by $wgJobTypeConf and
+        * updates the aggregate job queue information cache as needed.
+        *
+        * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type string
         * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
         * @return Job|bool Returns false on failure
         */
-       public function pop( $queueType = self::TYPE_DEFAULT, $flags = 0 ) {
-               if ( $flags & self::USE_CACHE ) {
-                       if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
-                               $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+       public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
+               if ( is_string( $qtype ) ) { // specific job type
+                       $job = $this->get( $qtype )->pop();
+                       if ( !$job ) {
+                               JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
+                       }
+                       return $job;
+               } else { // any job in the "default" jobs types
+                       if ( $flags & self::USE_CACHE ) {
+                               if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
+                                       $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+                               }
+                               $types = $this->cache->get( 'queues-ready', 'list' );
+                       } else {
+                               $types = $this->getQueuesWithJobs();
                        }
-                       $types = $this->cache->get( 'queues-ready', 'list' );
-               } else {
-                       $types = $this->getQueuesWithJobs();
-               }
-
-               if ( $queueType == self::TYPE_DEFAULT ) {
-                       $types = array_intersect( $types, $this->getDefaultQueueTypes() );
-               }
-               shuffle( $types ); // avoid starvation
 
-               foreach ( $types as $type ) { // for each queue...
-                       $job = $this->get( $type )->pop();
-                       if ( $job ) { // found
-                               return $job;
-                       } else { // not found
-                               $this->cache->clear( 'queues-ready' );
+                       if ( $qtype == self::TYPE_DEFAULT ) {
+                               $types = array_intersect( $types, $this->getDefaultQueueTypes() );
+                       }
+                       shuffle( $types ); // avoid starvation
+
+                       foreach ( $types as $type ) { // for each queue...
+                               $job = $this->get( $type )->pop();
+                               if ( $job ) { // found
+                                       return $job;
+                               } else { // not found
+                                       JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
+                                       $this->cache->clear( 'queues-ready' );
+                               }
                        }
-               }
 
-               return false; // no jobs found
+                       return false; // no jobs found
+               }
        }
 
        /**
@@ -181,15 +201,32 @@ class JobQueueGroup {
                return $this->get( $job->getType() )->deduplicateRootJob( $job );
        }
 
+       /**
+        * Wait for any slaves or backup queue servers to catch up.
+        *
+        * This does nothing for certain queue classes.
+        *
+        * @return void
+        * @throws MWException
+        */
+       public function waitForBackups() {
+               global $wgJobTypeConf;
+
+               wfProfileIn( __METHOD__ );
+               // Try to avoid doing this more than once per queue storage medium
+               foreach ( $wgJobTypeConf as $type => $conf ) {
+                       $this->get( $type )->waitForBackups();
+               }
+               wfProfileOut( __METHOD__ );
+       }
+
        /**
         * Get the list of queue types
         *
         * @return array List of strings
         */
        public function getQueueTypes() {
-               global $wgJobClasses;
-
-               return array_keys( $wgJobClasses );
+               return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
        }
 
        /**
@@ -204,6 +241,8 @@ class JobQueueGroup {
        }
 
        /**
+        * Get the list of job types that have non-empty queues
+        *
         * @return Array List of job types that have non-empty queues
         */
        public function getQueuesWithJobs() {
@@ -217,16 +256,20 @@ class JobQueueGroup {
        }
 
        /**
-        * @return Array List of default job types that have non-empty queues
+        * Check if jobs should not be popped of a queue right now.
+        * This is only used for performance, such as to avoid spamming
+        * the queue with many sub-jobs before they actually get run.
+        *
+        * @param $type string
+        * @return bool
         */
-       public function getDefaultQueuesWithJobs() {
-               $types = array();
-               foreach ( $this->getDefaultQueueTypes() as $type ) {
-                       if ( !$this->get( $type )->isEmpty() ) {
-                               $types[] = $type;
-                       }
+       public function isQueueDeprioritized( $type ) {
+               if ( $type === 'refreshLinks2' ) {
+                       // Don't keep converting refreshLinks2 => refreshLinks jobs if the
+                       // later jobs have not been done yet. This helps throttle queue spam.
+                       return !$this->get( 'refreshLinks' )->isEmpty();
                }
-               return $types;
+               return false;
        }
 
        /**
@@ -282,4 +325,27 @@ class JobQueueGroup {
 
                return $count;
        }
+
+       /**
+        * @param $name string
+        * @return mixed
+        */
+       private function getCachedConfigVar( $name ) {
+               global $wgConf, $wgMemc;
+
+               if ( $this->wiki === wfWikiID() ) {
+                       return $GLOBALS[$name]; // common case
+               } else {
+                       list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+                       $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
+                       $value = $wgMemc->get( $key ); // ('v' => ...) or false
+                       if ( is_array( $value ) ) {
+                               return $value['v'];
+                       } else {
+                               $value = $wgConf->getConfig( $this->wiki, $name );
+                               $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
+                               return $value;
+                       }
+               }
+       }
 }