Merge "Remove "Misc" tab from Special:Preferences"
[lhc/web/wiklou.git] / includes / job / JobQueueFederated.php
index db5b686..35b80ca 100644 (file)
@@ -55,6 +55,8 @@ class JobQueueFederated extends JobQueue {
        /** @var Array (partition names => integer) */
        protected $partitionsNoPush = array();
 
+       /** @var HashRing */
+       protected $partitionRing;
        /** @var Array (partition name => JobQueue) */
        protected $partitionQueues = array();
        /** @var BagOStuff */
@@ -80,7 +82,9 @@ class JobQueueFederated extends JobQueue {
         */
        protected function __construct( array $params ) {
                parent::__construct( $params );
-               $this->sectionsByWiki = $params['sectionsByWiki'];
+               $this->sectionsByWiki = isset( $params['sectionsByWiki'] )
+                       ? $params['sectionsByWiki']
+                       : array(); // all in "default" section
                $this->partitionsBySection = $params['partitionsBySection'];
                $this->configByPartition = $params['configByPartition'];
                if ( isset( $params['partitionsNoPush'] ) ) {
@@ -100,6 +104,12 @@ class JobQueueFederated extends JobQueue {
                                $baseConfig + $this->configByPartition[$partition]
                        );
                }
+               // Get the ring of partitions to push job de-duplication information into
+               $partitionsTry = array_diff_key(
+                       $this->getPartitionMap(),
+                       $this->partitionsNoPush
+               ); // (partition => weight)
+               $this->partitionRing = new HashRing( $partitionsTry );
                // Aggregate cache some per-queue values if there are multiple partition queues
                $this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff();
        }
@@ -128,9 +138,13 @@ class JobQueueFederated extends JobQueue {
                }
 
                foreach ( $this->partitionQueues as $queue ) {
-                       if ( !$queue->doIsEmpty() ) {
-                               $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
-                               return false;
+                       try {
+                               if ( !$queue->doIsEmpty() ) {
+                                       $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
+                                       return false;
+                               }
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
                        }
                }
 
@@ -169,7 +183,11 @@ class JobQueueFederated extends JobQueue {
 
                $count = 0;
                foreach ( $this->partitionQueues as $queue ) {
-                       $count += $queue->$method();
+                       try {
+                               $count += $queue->$method();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
 
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
@@ -234,7 +252,13 @@ class JobQueueFederated extends JobQueue {
                // Insert the de-duplicated jobs into the queues...
                foreach ( $uJobsByPartition as $partition => $jobBatch ) {
                        $queue = $this->partitionQueues[$partition];
-                       if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+                       try {
+                               $ok = $queue->doBatchPush( $jobBatch, $flags );
+                       } catch ( JobQueueError $e ) {
+                               $ok = false;
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
+                       if ( $ok ) {
                                $key = $this->getCacheKey( 'empty' );
                                $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } else {
@@ -249,7 +273,13 @@ class JobQueueFederated extends JobQueue {
                                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
                        } else {
                                $queue = $this->partitionQueues[$partition];
-                               if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+                               try {
+                                       $ok = $queue->doBatchPush( $jobBatch, $flags );
+                               } catch ( JobQueueError $e ) {
+                                       $ok = false;
+                                       wfDebugLog( 'exception', $e->getLogMessage() );
+                               }
+                               if ( $ok ) {
                                        $key = $this->getCacheKey( 'empty' );
                                        $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                                } else {
@@ -278,7 +308,12 @@ class JobQueueFederated extends JobQueue {
                                break; // all partitions at 0 weight
                        }
                        $queue = $this->partitionQueues[$partition];
-                       $job = $queue->pop();
+                       try {
+                               $job = $queue->pop();
+                       } catch ( JobQueueError $e ) {
+                               $job = false;
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                        if ( $job ) {
                                $job->metadata['QueuePartition'] = $partition;
                                return $job;
@@ -298,15 +333,49 @@ class JobQueueFederated extends JobQueue {
                return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
        }
 
+       protected function doIsRootJobOldDuplicate( Job $job ) {
+               $params = $job->getRootJobParams();
+               $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+               try {
+                       return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
+               } catch ( MWException $e ) {
+                       if ( isset( $partitions[1] ) ) { // check fallback partition
+                               return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
+                       }
+               }
+               return false;
+       }
+
+       protected function doDeduplicateRootJob( Job $job ) {
+               $params = $job->getRootJobParams();
+               $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+               try {
+                       return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
+               } catch ( MWException $e ) {
+                       if ( isset( $partitions[1] ) ) { // check fallback partition
+                               return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
+                       }
+               }
+               return false;
+       }
+
        protected function doDelete() {
                foreach ( $this->partitionQueues as $queue ) {
-                       $queue->doDelete();
+                       try {
+                               $queue->doDelete();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
        }
 
        protected function doWaitForBackups() {
                foreach ( $this->partitionQueues as $queue ) {
-                       $queue->waitForBackups();
+                       try {
+                               $queue->waitForBackups();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
        }