Moved some JobQueueAggregator logic out of JobQueueGroup
authorAaron Schulz <aschulz@wikimedia.org>
Mon, 16 Feb 2015 23:34:53 +0000 (15:34 -0800)
committerAaron Schulz <aschulz@wikimedia.org>
Wed, 4 Mar 2015 05:56:15 +0000 (21:56 -0800)
Change-Id: I28ba1a25db225d4cf5f503a6c0f4405f13118151

includes/jobqueue/JobQueue.php
includes/jobqueue/JobQueueDB.php
includes/jobqueue/JobQueueGroup.php
includes/jobqueue/aggregator/JobQueueAggregator.php
includes/jobqueue/aggregator/JobQueueAggregatorRedis.php

index 53fcaee..1a730d3 100644 (file)
@@ -49,6 +49,8 @@ abstract class JobQueue {
 
        /** @var BagOStuff */
        protected $dupCache;
+       /** @var JobQueueAggregator */
+       protected $aggr;
 
        const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
 
@@ -76,6 +78,9 @@ abstract class JobQueue {
                        throw new MWException( __CLASS__ . " does not support delayed jobs." );
                }
                $this->dupCache = wfGetCache( CACHE_ANYTHING );
+               $this->aggr = isset( $params['aggregator'] )
+                       ? $params['aggregator']
+                       : new JobQueueAggregatorNull( array() );
        }
 
        /**
@@ -298,7 +303,8 @@ abstract class JobQueue {
         * @throws JobQueueError
         */
        final public function push( $jobs, $flags = 0 ) {
-               $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+               $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+               $this->batchPush( $jobs, $flags );
        }
 
        /**
@@ -327,6 +333,7 @@ abstract class JobQueue {
                }
 
                $this->doBatchPush( $jobs, $flags );
+               $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
        }
 
        /**
@@ -356,6 +363,10 @@ abstract class JobQueue {
 
                $job = $this->doPop();
 
+               if ( !$job ) {
+                       $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+               }
+
                // Flag this job as an old duplicate based on its "root" job...
                try {
                        if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
index 5e8399c..d5f47ff 100644 (file)
@@ -686,7 +686,9 @@ class JobQueueDB extends JobQueue {
                                        $affected = $dbw->affectedRows();
                                        $count += $affected;
                                        JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
+                                       // The tasks recycled jobs or release delayed jobs into the queue
                                        $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+                                       $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
                                }
                        }
 
index dbb85d7..ebd547a 100644 (file)
@@ -94,6 +94,7 @@ class JobQueueGroup {
                } else {
                        $conf = $conf + $wgJobTypeConf['default'];
                }
+               $conf['aggregator'] = JobQueueAggregator::singleton();
 
                return JobQueue::factory( $conf );
        }
@@ -125,7 +126,6 @@ class JobQueueGroup {
 
                foreach ( $jobsByType as $type => $jobs ) {
                        $this->get( $type )->push( $jobs );
-                       JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
                }
 
                if ( $this->cache->has( 'queues-ready', 'list' ) ) {
@@ -153,9 +153,6 @@ class JobQueueGroup {
                if ( is_string( $qtype ) ) { // specific job type
                        if ( !in_array( $qtype, $blacklist ) ) {
                                $job = $this->get( $qtype )->pop();
-                               if ( !$job ) {
-                                       JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
-                               }
                        }
                } else { // any job in the "default" jobs types
                        if ( $flags & self::USE_CACHE ) {
@@ -179,7 +176,6 @@ class JobQueueGroup {
                                if ( $job ) { // found
                                        break;
                                } else { // not found
-                                       JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
                                        $this->cache->clear( 'queues-ready' );
                                }
                        }
@@ -381,10 +377,6 @@ class JobQueueGroup {
                                        }
                                }
                        }
-                       // The tasks may have recycled jobs or release delayed jobs into the queue
-                       if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
-                               JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
-                       }
                }
 
                if ( $count === 0 ) {
index 4c2dfad..febc277 100644 (file)
@@ -34,7 +34,7 @@ abstract class JobQueueAggregator {
        /**
         * @param array $params
         */
-       protected function __construct( array $params ) {
+       public function __construct( array $params ) {
        }
 
        /**
index db9e764..847dd6f 100644 (file)
@@ -44,7 +44,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
         *                    If a hostname is specified but no port, the standard port number
         *                    6379 will be used. Required.
         */
-       protected function __construct( array $params ) {
+       public function __construct( array $params ) {
                parent::__construct( $params );
                $this->servers = isset( $params['redisServers'] )
                        ? $params['redisServers']