Merge "Wrap changes lists in <div class="mw-changeslist" />"
[lhc/web/wiklou.git] / includes / job / JobQueueFederated.php
index 36f4959..589bed6 100644 (file)
  * @since 1.22
  */
 class JobQueueFederated extends JobQueue {
-       /** @var Array (partition name => weight) reverse sorted by weight */
+       /** @var array (partition name => weight) reverse sorted by weight */
        protected $partitionMap = array();
-       /** @var Array (partition name => JobQueue) reverse sorted by weight */
+
+       /** @var array (partition name => JobQueue) reverse sorted by weight */
        protected $partitionQueues = array();
+
        /** @var HashRing */
        protected $partitionPushRing;
+
        /** @var BagOStuff */
        protected $cache;
 
-       protected $maxPartitionsTry;  // integer; maximum number of partitions to try
+       /** @var int Maximum number of partitions to try */
+       protected $maxPartitionsTry;
 
        const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
        const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
@@ -79,6 +83,7 @@ class JobQueueFederated extends JobQueue {
         *                          during failure, at the cost of added latency and somewhat
         *                          less reliable job de-duplication mechanisms.
         * @param array $params
+        * @throws MWException
         */
        protected function __construct( array $params ) {
                parent::__construct( $params );
@@ -104,8 +109,8 @@ class JobQueueFederated extends JobQueue {
                // Get the config to pass to merge into each partition queue config
                $baseConfig = $params;
                foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
-                       'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
-               {
+                       'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
+               {
                        unset( $baseConfig[$o] ); // partition queue doesn't care about this
                }
                // Get the partition queue objects
@@ -149,6 +154,7 @@ class JobQueueFederated extends JobQueue {
                        try {
                                if ( !$queue->doIsEmpty() ) {
                                        $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
+
                                        return false;
                                }
                        } catch ( JobQueueError $e ) {
@@ -157,6 +163,7 @@ class JobQueueFederated extends JobQueue {
                }
 
                $this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
+
                return true;
        }
 
@@ -179,7 +186,7 @@ class JobQueueFederated extends JobQueue {
        /**
         * @param string $type
         * @param string $method
-        * @return integer
+        * @return int
         */
        protected function getCrossPartitionSum( $type, $method ) {
                $key = $this->getCacheKey( $type );
@@ -199,6 +206,7 @@ class JobQueueFederated extends JobQueue {
                }
 
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+
                return $count;
        }
 
@@ -215,13 +223,15 @@ class JobQueueFederated extends JobQueue {
                        throw new JobQueueError(
                                "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
                }
+
                return true;
        }
 
        /**
         * @param array $jobs
         * @param HashRing $partitionRing
-        * @param integer $flags
+        * @param int $flags
+        * @throws JobQueueError
         * @return array List of Job object that could not be inserted
         */
        protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
@@ -231,6 +241,7 @@ class JobQueueFederated extends JobQueue {
                // to use a consistent hash to avoid allowing duplicate jobs per partition.
                // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
                $uJobsByPartition = array(); // (partition name => job list)
+               /** @var Job $job */
                foreach ( $jobs as $key => $job ) {
                        if ( $job->ignoreDuplicates() ) {
                                $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
@@ -250,6 +261,7 @@ class JobQueueFederated extends JobQueue {
 
                // Insert the de-duplicated jobs into the queues...
                foreach ( $uJobsByPartition as $partition => $jobBatch ) {
+                       /** @var JobQueue $queue */
                        $queue = $this->partitionQueues[$partition];
                        try {
                                $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
@@ -309,6 +321,8 @@ class JobQueueFederated extends JobQueue {
                        if ( $partition === false ) {
                                break; // all partitions at 0 weight
                        }
+
+                       /** @var JobQueue $queue */
                        $queue = $this->partitionQueues[$partition];
                        try {
                                $job = $queue->pop();
@@ -318,6 +332,7 @@ class JobQueueFederated extends JobQueue {
                        }
                        if ( $job ) {
                                $job->metadata['QueuePartition'] = $partition;
+
                                return $job;
                        } else {
                                unset( $partitionsTry[$partition] ); // blacklist partition
@@ -325,6 +340,7 @@ class JobQueueFederated extends JobQueue {
                }
 
                $this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG );
+
                return false;
        }
 
@@ -332,6 +348,7 @@ class JobQueueFederated extends JobQueue {
                if ( !isset( $job->metadata['QueuePartition'] ) ) {
                        throw new MWException( "The given job has no defined partition name." );
                }
+
                return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
        }
 
@@ -345,6 +362,7 @@ class JobQueueFederated extends JobQueue {
                                return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
                        }
                }
+
                return false;
        }
 
@@ -358,10 +376,12 @@ class JobQueueFederated extends JobQueue {
                                return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
                        }
                }
+
                return false;
        }
 
        protected function doDelete() {
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $queue->doDelete();
@@ -372,6 +392,7 @@ class JobQueueFederated extends JobQueue {
        }
 
        protected function doWaitForBackups() {
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $queue->waitForBackups();
@@ -383,11 +404,13 @@ class JobQueueFederated extends JobQueue {
 
        protected function doGetPeriodicTasks() {
                $tasks = array();
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $partition => $queue ) {
                        foreach ( $queue->getPeriodicTasks() as $task => $def ) {
                                $tasks["{$partition}:{$task}"] = $def;
                        }
                }
+
                return $tasks;
        }
 
@@ -399,9 +422,12 @@ class JobQueueFederated extends JobQueue {
                        'delayedcount',
                        'abandonedcount'
                );
+
                foreach ( $types as $type ) {
                        $this->cache->delete( $this->getCacheKey( $type ) );
                }
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->doFlushCaches();
                }
@@ -409,17 +435,23 @@ class JobQueueFederated extends JobQueue {
 
        public function getAllQueuedJobs() {
                $iterator = new AppendIterator();
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $iterator->append( $queue->getAllQueuedJobs() );
                }
+
                return $iterator;
        }
 
        public function getAllDelayedJobs() {
                $iterator = new AppendIterator();
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $iterator->append( $queue->getAllDelayedJobs() );
                }
+
                return $iterator;
        }
 
@@ -430,6 +462,8 @@ class JobQueueFederated extends JobQueue {
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $result = array();
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
@@ -445,11 +479,14 @@ class JobQueueFederated extends JobQueue {
                                MWExceptionHandler::logException( $e );
                        }
                }
+
                return array_values( $result );
        }
 
        protected function doGetSiblingQueueSizes( array $types ) {
                $result = array();
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $sizes = $queue->doGetSiblingQueueSizes( $types );
@@ -464,20 +501,24 @@ class JobQueueFederated extends JobQueue {
                                MWExceptionHandler::logException( $e );
                        }
                }
+
                return $result;
        }
 
        public function setTestingPrefix( $key ) {
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->setTestingPrefix( $key );
                }
        }
 
        /**
+        * @param $property
         * @return string
         */
        private function getCacheKey( $property ) {
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
        }
 }