X-Git-Url: http://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjob%2FJobQueueFederated.php;h=35b80ca6a73739410cfae308da30d77c136e8c22;hb=a71728c990496101c740a33efa0b238d9fc2417d;hp=b517d559b46d1c839237099c269f67efe2beb76e;hpb=e278cf4370a64b11553cb7e9ae2bc97905cf80d7;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index b517d559b4..35b80ca6a7 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -55,6 +55,8 @@ class JobQueueFederated extends JobQueue { /** @var Array (partition names => integer) */ protected $partitionsNoPush = array(); + /** @var HashRing */ + protected $partitionRing; /** @var Array (partition name => JobQueue) */ protected $partitionQueues = array(); /** @var BagOStuff */ @@ -80,7 +82,9 @@ class JobQueueFederated extends JobQueue { */ protected function __construct( array $params ) { parent::__construct( $params ); - $this->sectionsByWiki = $params['sectionsByWiki']; + $this->sectionsByWiki = isset( $params['sectionsByWiki'] ) + ? $params['sectionsByWiki'] + : array(); // all in "default" section $this->partitionsBySection = $params['partitionsBySection']; $this->configByPartition = $params['configByPartition']; if ( isset( $params['partitionsNoPush'] ) ) { @@ -100,6 +104,12 @@ class JobQueueFederated extends JobQueue { $baseConfig + $this->configByPartition[$partition] ); } + // Get the ring of partitions to push job de-duplication information into + $partitionsTry = array_diff_key( + $this->getPartitionMap(), + $this->partitionsNoPush + ); // (partition => weight) + $this->partitionRing = new HashRing( $partitionsTry ); // Aggregate cache some per-queue values if there are multiple partition queues $this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff(); } @@ -128,9 +138,13 @@ class JobQueueFederated extends JobQueue { } foreach ( $this->partitionQueues as $queue ) { - if ( !$queue->doIsEmpty() ) { - $this->cache->add( $key, 'false', self::CACHE_TTL_LONG ); - return false; + try { + if ( !$queue->doIsEmpty() ) { + $this->cache->add( $key, 'false', self::CACHE_TTL_LONG ); + return false; + } + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); } } @@ -169,7 +183,11 @@ class JobQueueFederated extends JobQueue { $count = 0; foreach ( $this->partitionQueues as $queue ) { - $count += $queue->$method(); + try { + $count += $queue->$method(); + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); + } } $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); @@ -234,7 +252,13 @@ class JobQueueFederated extends JobQueue { // Insert the de-duplicated jobs into the queues... foreach ( $uJobsByPartition as $partition => $jobBatch ) { $queue = $this->partitionQueues[$partition]; - if ( $queue->doBatchPush( $jobBatch, $flags ) ) { + try { + $ok = $queue->doBatchPush( $jobBatch, $flags ); + } catch ( JobQueueError $e ) { + $ok = false; + wfDebugLog( 'exception', $e->getLogMessage() ); + } + if ( $ok ) { $key = $this->getCacheKey( 'empty' ); $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); } else { @@ -249,7 +273,13 @@ class JobQueueFederated extends JobQueue { $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted } else { $queue = $this->partitionQueues[$partition]; - if ( $queue->doBatchPush( $jobBatch, $flags ) ) { + try { + $ok = $queue->doBatchPush( $jobBatch, $flags ); + } catch ( JobQueueError $e ) { + $ok = false; + wfDebugLog( 'exception', $e->getLogMessage() ); + } + if ( $ok ) { $key = $this->getCacheKey( 'empty' ); $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); } else { @@ -278,7 +308,12 @@ class JobQueueFederated extends JobQueue { break; // all partitions at 0 weight } $queue = $this->partitionQueues[$partition]; - $job = $queue->pop(); + try { + $job = $queue->pop(); + } catch ( JobQueueError $e ) { + $job = false; + wfDebugLog( 'exception', $e->getLogMessage() ); + } if ( $job ) { $job->metadata['QueuePartition'] = $partition; return $job; @@ -298,9 +333,49 @@ class JobQueueFederated extends JobQueue { return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job ); } + protected function doIsRootJobOldDuplicate( Job $job ) { + $params = $job->getRootJobParams(); + $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 ); + try { + return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job ); + } catch ( MWException $e ) { + if ( isset( $partitions[1] ) ) { // check fallback partition + return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job ); + } + } + return false; + } + + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getRootJobParams(); + $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 ); + try { + return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job ); + } catch ( MWException $e ) { + if ( isset( $partitions[1] ) ) { // check fallback partition + return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job ); + } + } + return false; + } + + protected function doDelete() { + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->doDelete(); + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); + } + } + } + protected function doWaitForBackups() { foreach ( $this->partitionQueues as $queue ) { - $queue->waitForBackups(); + try { + $queue->waitForBackups(); + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); + } } }