Merge "Wrap changes lists in <div class="mw-changeslist" />"
[lhc/web/wiklou.git] / includes / job / JobQueueFederated.php
index d3ce164..589bed6 100644 (file)
  * @since 1.22
  */
 class JobQueueFederated extends JobQueue {
-       /** @var Array (partition name => weight) reverse sorted by weight */
+       /** @var array (partition name => weight) reverse sorted by weight */
        protected $partitionMap = array();
-       /** @var Array (partition name => JobQueue) reverse sorted by weight */
+
+       /** @var array (partition name => JobQueue) reverse sorted by weight */
        protected $partitionQueues = array();
+
        /** @var HashRing */
        protected $partitionPushRing;
+
        /** @var BagOStuff */
        protected $cache;
 
+       /** @var int Maximum number of partitions to try */
+       protected $maxPartitionsTry;
+
        const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
        const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
 
@@ -72,7 +78,12 @@ class JobQueueFederated extends JobQueue {
         *                          the federated queue itself (e.g. 'order' and 'claimTTL').
         *  - partitionsNoPush    : List of partition names that can handle pop() but not push().
         *                          This can be used to migrate away from a certain partition.
+        *  - maxPartitionsTry    : Maximum number of times to attempt job insertion using
+        *                          different partition queues. This improves availability
+        *                          during failure, at the cost of added latency and somewhat
+        *                          less reliable job de-duplication mechanisms.
         * @param array $params
+        * @throws MWException
         */
        protected function __construct( array $params ) {
                parent::__construct( $params );
@@ -82,6 +93,9 @@ class JobQueueFederated extends JobQueue {
                if ( !isset( $params['partitionsBySection'][$section] ) ) {
                        throw new MWException( "No configuration for section '$section'." );
                }
+               $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
+                       ? $params['maxPartitionsTry']
+                       : 2;
                // Get the full partition map
                $this->partitionMap = $params['partitionsBySection'][$section];
                arsort( $this->partitionMap, SORT_NUMERIC );
@@ -94,10 +108,10 @@ class JobQueueFederated extends JobQueue {
                }
                // Get the config to pass to merge into each partition queue config
                $baseConfig = $params;
-               foreach ( array( 'class', 'sectionsByWiki',
-                       'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
-               {
-                       unset( $baseConfig[$o] );
+               foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
+                       'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
+               {
+                       unset( $baseConfig[$o] ); // partition queue doesn't care about this
                }
                // Get the partition queue objects
                foreach ( $this->partitionMap as $partition => $w ) {
@@ -140,6 +154,7 @@ class JobQueueFederated extends JobQueue {
                        try {
                                if ( !$queue->doIsEmpty() ) {
                                        $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
+
                                        return false;
                                }
                        } catch ( JobQueueError $e ) {
@@ -148,6 +163,7 @@ class JobQueueFederated extends JobQueue {
                }
 
                $this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
+
                return true;
        }
 
@@ -170,7 +186,7 @@ class JobQueueFederated extends JobQueue {
        /**
         * @param string $type
         * @param string $method
-        * @return integer
+        * @return int
         */
        protected function getCrossPartitionSum( $type, $method ) {
                $key = $this->getCacheKey( $type );
@@ -190,28 +206,32 @@ class JobQueueFederated extends JobQueue {
                }
 
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+
                return $count;
        }
 
        protected function doBatchPush( array $jobs, $flags ) {
-               if ( !count( $jobs ) ) {
-                       return true; // nothing to do
-               }
                // Local ring variable that may be changed to point to a new ring on failure
                $partitionRing = $this->partitionPushRing;
-               // Try to insert the jobs and update $partitionsTry on any failures
-               $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags );
-               if ( count( $jobsLeft ) ) { // some jobs failed to insert?
-                       // Try to insert the remaning jobs once more, ignoring the bad partitions
-                       return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) );
+               // Try to insert the jobs and update $partitionsTry on any failures.
+               // Retry to insert any remaning jobs again, ignoring the bad partitions.
+               $jobsLeft = $jobs;
+               for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
+                       $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
+               }
+               if ( count( $jobsLeft ) ) {
+                       throw new JobQueueError(
+                               "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
                }
+
                return true;
        }
 
        /**
         * @param array $jobs
         * @param HashRing $partitionRing
-        * @param integer $flags
+        * @param int $flags
+        * @throws JobQueueError
         * @return array List of Job object that could not be inserted
         */
        protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
@@ -221,6 +241,7 @@ class JobQueueFederated extends JobQueue {
                // to use a consistent hash to avoid allowing duplicate jobs per partition.
                // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
                $uJobsByPartition = array(); // (partition name => job list)
+               /** @var Job $job */
                foreach ( $jobs as $key => $job ) {
                        if ( $job->ignoreDuplicates() ) {
                                $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
@@ -240,6 +261,7 @@ class JobQueueFederated extends JobQueue {
 
                // Insert the de-duplicated jobs into the queues...
                foreach ( $uJobsByPartition as $partition => $jobBatch ) {
+                       /** @var JobQueue $queue */
                        $queue = $this->partitionQueues[$partition];
                        try {
                                $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
@@ -299,6 +321,8 @@ class JobQueueFederated extends JobQueue {
                        if ( $partition === false ) {
                                break; // all partitions at 0 weight
                        }
+
+                       /** @var JobQueue $queue */
                        $queue = $this->partitionQueues[$partition];
                        try {
                                $job = $queue->pop();
@@ -308,6 +332,7 @@ class JobQueueFederated extends JobQueue {
                        }
                        if ( $job ) {
                                $job->metadata['QueuePartition'] = $partition;
+
                                return $job;
                        } else {
                                unset( $partitionsTry[$partition] ); // blacklist partition
@@ -315,6 +340,7 @@ class JobQueueFederated extends JobQueue {
                }
 
                $this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG );
+
                return false;
        }
 
@@ -322,6 +348,7 @@ class JobQueueFederated extends JobQueue {
                if ( !isset( $job->metadata['QueuePartition'] ) ) {
                        throw new MWException( "The given job has no defined partition name." );
                }
+
                return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
        }
 
@@ -335,6 +362,7 @@ class JobQueueFederated extends JobQueue {
                                return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
                        }
                }
+
                return false;
        }
 
@@ -348,10 +376,12 @@ class JobQueueFederated extends JobQueue {
                                return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
                        }
                }
+
                return false;
        }
 
        protected function doDelete() {
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $queue->doDelete();
@@ -362,6 +392,7 @@ class JobQueueFederated extends JobQueue {
        }
 
        protected function doWaitForBackups() {
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $queue->waitForBackups();
@@ -373,11 +404,13 @@ class JobQueueFederated extends JobQueue {
 
        protected function doGetPeriodicTasks() {
                $tasks = array();
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $partition => $queue ) {
                        foreach ( $queue->getPeriodicTasks() as $task => $def ) {
                                $tasks["{$partition}:{$task}"] = $def;
                        }
                }
+
                return $tasks;
        }
 
@@ -389,9 +422,12 @@ class JobQueueFederated extends JobQueue {
                        'delayedcount',
                        'abandonedcount'
                );
+
                foreach ( $types as $type ) {
                        $this->cache->delete( $this->getCacheKey( $type ) );
                }
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->doFlushCaches();
                }
@@ -399,17 +435,23 @@ class JobQueueFederated extends JobQueue {
 
        public function getAllQueuedJobs() {
                $iterator = new AppendIterator();
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $iterator->append( $queue->getAllQueuedJobs() );
                }
+
                return $iterator;
        }
 
        public function getAllDelayedJobs() {
                $iterator = new AppendIterator();
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $iterator->append( $queue->getAllDelayedJobs() );
                }
+
                return $iterator;
        }
 
@@ -420,6 +462,8 @@ class JobQueueFederated extends JobQueue {
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $result = array();
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
@@ -435,11 +479,14 @@ class JobQueueFederated extends JobQueue {
                                MWExceptionHandler::logException( $e );
                        }
                }
+
                return array_values( $result );
        }
 
        protected function doGetSiblingQueueSizes( array $types ) {
                $result = array();
+
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $sizes = $queue->doGetSiblingQueueSizes( $types );
@@ -454,20 +501,24 @@ class JobQueueFederated extends JobQueue {
                                MWExceptionHandler::logException( $e );
                        }
                }
+
                return $result;
        }
 
        public function setTestingPrefix( $key ) {
+               /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->setTestingPrefix( $key );
                }
        }
 
        /**
+        * @param $property
         * @return string
         */
        private function getCacheKey( $property ) {
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
        }
 }