Small JobQueueFederated performance tweaks
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 18 Oct 2013 03:00:24 +0000 (20:00 -0700)
committerBryanDavis <bdavis@wikimedia.org>
Fri, 18 Oct 2013 22:38:09 +0000 (22:38 +0000)
* Made "find non-empty queues" and isEmpty() checks start with the most
  weighted partitions first
* Cleaned up a few exception catches to use JobQueueError
* Added short-circuit for doGetSiblingQueuesWithJobs()

Change-Id: I4d96a8c61c6b1c41dbb44000ccda0fa05a418a09

includes/job/JobQueueFederated.php

index fb2ef0d..d3ce164 100644 (file)
@@ -47,9 +47,9 @@
  * @since 1.22
  */
 class JobQueueFederated extends JobQueue {
-       /** @var Array (partition name => weight) */
+       /** @var Array (partition name => weight) reverse sorted by weight */
        protected $partitionMap = array();
-       /** @var Array (partition name => JobQueue) */
+       /** @var Array (partition name => JobQueue) reverse sorted by weight */
        protected $partitionQueues = array();
        /** @var HashRing */
        protected $partitionPushRing;
@@ -82,7 +82,10 @@ class JobQueueFederated extends JobQueue {
                if ( !isset( $params['partitionsBySection'][$section] ) ) {
                        throw new MWException( "No configuration for section '$section'." );
                }
+               // Get the full partition map
                $this->partitionMap = $params['partitionsBySection'][$section];
+               arsort( $this->partitionMap, SORT_NUMERIC );
+               // Get the partitions jobs can actually be pushed to
                $partitionPushMap = $this->partitionMap;
                if ( isset( $params['partitionsNoPush'] ) ) {
                        foreach ( $params['partitionsNoPush'] as $partition ) {
@@ -104,7 +107,7 @@ class JobQueueFederated extends JobQueue {
                        $this->partitionQueues[$partition] = JobQueue::factory(
                                $baseConfig + $params['configByPartition'][$partition] );
                }
-               // Get the ring of partitions to push job de-duplication information into
+               // Get the ring of partitions to push jobs into
                $this->partitionPushRing = new HashRing( $partitionPushMap );
                // Aggregate cache some per-queue values if there are multiple partition queues
                $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
@@ -327,7 +330,7 @@ class JobQueueFederated extends JobQueue {
                $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
                try {
                        return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
-               } catch ( MWException $e ) {
+               } catch ( JobQueueError $e ) {
                        if ( isset( $partitions[1] ) ) { // check fallback partition
                                return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
                        }
@@ -340,7 +343,7 @@ class JobQueueFederated extends JobQueue {
                $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
                try {
                        return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
-               } catch ( MWException $e ) {
+               } catch ( JobQueueError $e ) {
                        if ( isset( $partitions[1] ) ) { // check fallback partition
                                return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
                        }
@@ -421,15 +424,18 @@ class JobQueueFederated extends JobQueue {
                        try {
                                $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
                                if ( is_array( $nonEmpty ) ) {
-                                       $result = array_merge( $result, $nonEmpty );
+                                       $result = array_unique( array_merge( $result, $nonEmpty ) );
                                } else {
                                        return null; // not supported on all partitions; bail
                                }
+                               if ( count( $result ) == count( $types ) ) {
+                                       break; // short-circuit
+                               }
                        } catch ( JobQueueError $e ) {
                                MWExceptionHandler::logException( $e );
                        }
                }
-               return array_values( array_unique( $result ) );
+               return array_values( $result );
        }
 
        protected function doGetSiblingQueueSizes( array $types ) {