jobqueue: made federated queue use HashRing for root job de-duplication
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 31 May 2013 03:46:33 +0000 (20:46 -0700)
committerTim Starling <tstarling@wikimedia.org>
Fri, 26 Jul 2013 05:06:27 +0000 (05:06 +0000)
* This can spread entries out across job servers rather than always $wgMemc.
* Also made the sectionsByWiki config default to an empty array.

Change-Id: I176ff02eb4f05a1ea7d3bf93e0a10e074bb27d11

includes/job/JobQueueFederated.php

index db5b686..19de8bb 100644 (file)
@@ -55,6 +55,8 @@ class JobQueueFederated extends JobQueue {
        /** @var Array (partition names => integer) */
        protected $partitionsNoPush = array();
 
+       /** @var HashRing */
+       protected $partitionRing;
        /** @var Array (partition name => JobQueue) */
        protected $partitionQueues = array();
        /** @var BagOStuff */
@@ -80,7 +82,9 @@ class JobQueueFederated extends JobQueue {
         */
        protected function __construct( array $params ) {
                parent::__construct( $params );
-               $this->sectionsByWiki = $params['sectionsByWiki'];
+               $this->sectionsByWiki = isset( $params['sectionsByWiki'] )
+                       ? $params['sectionsByWiki']
+                       : array(); // all in "default" section
                $this->partitionsBySection = $params['partitionsBySection'];
                $this->configByPartition = $params['configByPartition'];
                if ( isset( $params['partitionsNoPush'] ) ) {
@@ -100,6 +104,12 @@ class JobQueueFederated extends JobQueue {
                                $baseConfig + $this->configByPartition[$partition]
                        );
                }
+               // Get the ring of partitions to push job de-duplication information into
+               $partitionsTry = array_diff_key(
+                       $this->getPartitionMap(),
+                       $this->partitionsNoPush
+               ); // (partition => weight)
+               $this->partitionRing = new HashRing( $partitionsTry );
                // Aggregate cache some per-queue values if there are multiple partition queues
                $this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff();
        }
@@ -298,6 +308,32 @@ class JobQueueFederated extends JobQueue {
                return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
        }
 
+       protected function doIsRootJobOldDuplicate( Job $job ) {
+               $params = $job->getRootJobParams();
+               $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+               try {
+                       return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
+               } catch ( MWException $e ) {
+                       if ( isset( $partitions[1] ) ) { // check fallback partition
+                               return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
+                       }
+               }
+               return false;
+       }
+
+       protected function doDeduplicateRootJob( Job $job ) {
+               $params = $job->getRootJobParams();
+               $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+               try {
+                       return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
+               } catch ( MWException $e ) {
+                       if ( isset( $partitions[1] ) ) { // check fallback partition
+                               return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
+                       }
+               }
+               return false;
+       }
+
        protected function doDelete() {
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->doDelete();