Merge "Additional tests to catch Parsoid regressions."
[lhc/web/wiklou.git] / includes / job / JobQueueGroup.php
index 48f2746..cf0215b 100644 (file)
@@ -31,16 +31,24 @@ class JobQueueGroup {
        /** @var Array */
        protected static $instances = array();
 
+       /** @var ProcessCacheLRU */
+       protected $cache;
+
        protected $wiki; // string; wiki ID
 
-       const TYPE_DEFAULT = 1; // integer; job not in $wgJobTypesExcludedFromDefaultQueue
+       const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY     = 2; // integer; any job
 
+       const USE_CACHE = 1; // integer; use process cache
+
+       const PROC_CACHE_TTL = 15; // integer; seconds
+
        /**
         * @param $wiki string Wiki ID
         */
        protected function __construct( $wiki ) {
                $this->wiki = $wiki;
+               $this->cache = new ProcessCacheLRU( 1 );
        }
 
        /**
@@ -55,6 +63,15 @@ class JobQueueGroup {
                return self::$instances[$wiki];
        }
 
+       /**
+        * Destroy the singleton instances
+        *
+        * @return void
+        */
+       public static function destroySingletons() {
+               self::$instances = array();
+       }
+
        /**
         * @param $type string
         * @return JobQueue Job queue object for a given queue type
@@ -73,9 +90,11 @@ class JobQueueGroup {
        }
 
        /**
-        * Insert jobs into the respective queues of with the belong
+        * Insert jobs into the respective queues of with the belong.
+        * This inserts the jobs into the queue specified by $wgJobTypeConf.
         *
         * @param $jobs Job|array A single Job or a list of Jobs
+        * @throws MWException
         * @return bool
         */
        public function push( $jobs ) {
@@ -92,30 +111,49 @@ class JobQueueGroup {
 
                $ok = true;
                foreach ( $jobsByType as $type => $jobs ) {
-                       if ( !$this->get( $type )->batchPush( $jobs ) ) {
+                       if ( !$this->get( $type )->push( $jobs ) ) {
                                $ok = false;
                        }
                }
 
+               if ( $this->cache->has( 'queues-ready', 'list' ) ) {
+                       $list = $this->cache->get( 'queues-ready', 'list' );
+                       if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
+                               $this->cache->clear( 'queues-ready' );
+                       }
+               }
+
                return $ok;
        }
 
        /**
         * Pop a job off one of the job queues
         *
-        * @param $type integer JobQueueGroup::TYPE_* constant
+        * @param $queueType integer JobQueueGroup::TYPE_* constant
+        * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
         * @return Job|bool Returns false on failure
         */
-       public function pop( $type = self::TYPE_DEFAULT ) {
-               $types = ( $type == self::TYPE_DEFAULT )
-                       ? $this->getDefaultQueueTypes()
-                       : $this->getQueueTypes();
+       public function pop( $queueType = self::TYPE_DEFAULT, $flags = 0 ) {
+               if ( $flags & self::USE_CACHE ) {
+                       if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
+                               $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+                       }
+                       $types = $this->cache->get( 'queues-ready', 'list' );
+               } else {
+                       $types = $this->getQueuesWithJobs();
+               }
+
+               if ( $queueType == self::TYPE_DEFAULT ) {
+                       $types = array_intersect( $types, $this->getDefaultQueueTypes() );
+               }
                shuffle( $types ); // avoid starvation
 
                foreach ( $types as $type ) { // for each queue...
                        $job = $this->get( $type )->pop();
-                       if ( $job ) {
-                               return $job; // found
+                       if ( $job ) { // found
+                               return $job;
+                       } else { // not found
+                               $this->cache->clear( 'queues-ready' );
                        }
                }
 
@@ -132,6 +170,17 @@ class JobQueueGroup {
                return $this->get( $job->getType() )->ack( $job );
        }
 
+       /**
+        * Register the "root job" of a given job into the queue for de-duplication.
+        * This should only be called right *after* all the new jobs have been inserted.
+        *
+        * @param $job Job
+        * @return bool
+        */
+       public function deduplicateRootJob( Job $job ) {
+               return $this->get( $job->getType() )->deduplicateRootJob( $job );
+       }
+
        /**
         * Get the list of queue types
         *
@@ -166,4 +215,17 @@ class JobQueueGroup {
                }
                return $types;
        }
+
+       /**
+        * @return Array List of default job types that have non-empty queues
+        */
+       public function getDefaultQueuesWithJobs() {
+               $types = array();
+               foreach ( $this->getDefaultQueueTypes() as $type ) {
+                       if ( !$this->get( $type )->isEmpty() ) {
+                               $types[] = $type;
+                       }
+               }
+               return $types;
+       }
 }