jobqueue: dependency inject more objects into JobQueue
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 30 Mar 2019 04:41:34 +0000 (21:41 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Thu, 4 Apr 2019 08:32:59 +0000 (01:32 -0700)
Also moved some WikiMap/$wgJobClasses checks to JobQueueGroup::pop
which is the method callers are supposed to use.

Change-Id: I2ab82d8adc4ae1f54697d2935afa2053539cf2db

includes/jobqueue/JobQueue.php
includes/jobqueue/JobQueueDB.php
includes/jobqueue/JobQueueGroup.php
includes/jobqueue/JobQueueMemory.php
includes/jobqueue/JobQueueRedis.php
tests/phpunit/includes/jobqueue/JobQueueTest.php

index 8cfed3b..0644002 100644 (file)
@@ -20,7 +20,7 @@
  * @file
  * @defgroup JobQueue JobQueue
  */
-use MediaWiki\MediaWikiServices;
+use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
 
 /**
  * Class to handle enqueueing and running of background jobs
@@ -41,6 +41,8 @@ abstract class JobQueue {
        protected $maxTries;
        /** @var string|bool Read only rationale (or false if r/w) */
        protected $readOnlyReason;
+       /** @var StatsdDataFactoryInterface */
+       protected $stats;
 
        /** @var BagOStuff */
        protected $dupCache;
@@ -66,8 +68,9 @@ abstract class JobQueue {
                if ( !in_array( $this->order, $this->supportedOrders() ) ) {
                        throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
                }
-               $this->dupCache = wfGetCache( CACHE_ANYTHING );
                $this->readOnlyReason = $params['readOnlyReason'] ?? false;
+               $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
+               $this->dupCache = $params['stash'] ?? new EmptyBagOStuff();
        }
 
        /**
@@ -91,6 +94,8 @@ abstract class JobQueue {
         *                  of jobs simply means re-inserting them into the queue. Jobs can be
         *                  attempted up to three times before being discarded.
         *   - readOnlyReason : Set this to a string to make the queue read-only.
+        *   - stash      : A BagOStuff instance that can be used for root job deduplication
+        *   - stats      : A StatsdDataFactoryInterface [optional]
         *
         * Queue classes should throw an exception if they do not support the options given.
         *
@@ -112,7 +117,7 @@ abstract class JobQueue {
        }
 
        /**
-        * @return string Wiki ID
+        * @return string Database domain ID
         */
        final public function getDomain() {
                return $this->domain;
@@ -359,23 +364,14 @@ abstract class JobQueue {
         * @return Job|bool Returns false if there are no jobs
         */
        final public function pop() {
-               global $wgJobClasses;
-
                $this->assertNotReadOnly();
-               if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
-                       throw new JobQueueError(
-                               "Cannot pop '{$this->type}' job off foreign '{$this->domain}' wiki queue." );
-               } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
-                       // Do not pop jobs if there is no class for the queue type
-                       throw new JobQueueError( "Unrecognized job type '{$this->type}'." );
-               }
 
                $job = $this->doPop();
 
                // Flag this job as an old duplicate based on its "root" job...
                try {
                        if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
-                               self::incrStats( 'dupe_pops', $this->type );
+                               $this->incrStats( 'dupe_pops', $this->type );
                                $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
                        }
                } catch ( Exception $e ) {
@@ -480,7 +476,7 @@ abstract class JobQueue {
                }
 
                // Update the timestamp of the last root job started at the location...
-               return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+               return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
        }
 
        /**
@@ -707,12 +703,8 @@ abstract class JobQueue {
         * @param int $delta
         * @since 1.22
         */
-       public static function incrStats( $key, $type, $delta = 1 ) {
-               static $stats;
-               if ( !$stats ) {
-                       $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
-               }
-               $stats->updateCount( "jobqueue.{$key}.all", $delta );
-               $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
+       protected function incrStats( $key, $type, $delta = 1 ) {
+               $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
+               $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
        }
 }
index 7aecfe9..c2772a6 100644 (file)
@@ -55,6 +55,7 @@ class JobQueueDB extends JobQueue {
         *               If not specified, the primary DB cluster for the wiki will be used.
         *               This can be overridden with a custom cluster so that DB handles will
         *               be retrieved via LBFactory::getExternalLB() and getConnection().
+        *   - wanCache : An instance of WANObjectCache to use for caching.
         * @param array $params
         */
        protected function __construct( array $params ) {
@@ -66,7 +67,7 @@ class JobQueueDB extends JobQueue {
                        $this->cluster = $params['cluster'];
                }
 
-               $this->cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
+               $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
        }
 
        protected function supportedOrders() {
@@ -275,8 +276,8 @@ class JobQueueDB extends JobQueue {
                        foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
                                $dbw->insert( 'job', $rowBatch, $method );
                        }
-                       JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
-                       JobQueue::incrStats( 'dupe_inserts', $this->type,
+                       $this->incrStats( 'inserts', $this->type, count( $rows ) );
+                       $this->incrStats( 'dupe_inserts', $this->type,
                                count( $rowSet ) + count( $rowList ) - count( $rows )
                        );
                } catch ( DBError $e ) {
@@ -312,7 +313,7 @@ class JobQueueDB extends JobQueue {
                                if ( !$row ) {
                                        break; // nothing to do
                                }
-                               JobQueue::incrStats( 'pops', $this->type );
+                               $this->incrStats( 'pops', $this->type );
                                // Get the job object from the row...
                                $title = Title::makeTitle( $row->job_namespace, $row->job_title );
                                $job = Job::factory( $row->job_cmd, $title,
@@ -500,7 +501,7 @@ class JobQueueDB extends JobQueue {
                                __METHOD__
                        );
 
-                       JobQueue::incrStats( 'acks', $this->type );
+                       $this->incrStats( 'acks', $this->type );
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -727,7 +728,7 @@ class JobQueueDB extends JobQueue {
                                        );
                                        $affected = $dbw->affectedRows();
                                        $count += $affected;
-                                       JobQueue::incrStats( 'recycles', $this->type, $affected );
+                                       $this->incrStats( 'recycles', $this->type, $affected );
                                }
                        }
 
@@ -753,7 +754,7 @@ class JobQueueDB extends JobQueue {
                                $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
                                $affected = $dbw->affectedRows();
                                $count += $affected;
-                               JobQueue::incrStats( 'abandons', $this->type, $affected );
+                               $this->incrStats( 'abandons', $this->type, $affected );
                        }
 
                        $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
index b9c4157..4bac304 100644 (file)
@@ -118,6 +118,11 @@ class JobQueueGroup {
                        $conf['readOnlyReason'] = $this->readOnlyReason;
                }
 
+               $services = MediaWikiServices::getInstance();
+               $conf['stats'] = $services->getStatsdDataFactory();
+               $conf['wanCache'] = $services->getMainWANObjectCache();
+               $conf['stash'] = $services->getMainObjectStash();
+
                return JobQueue::factory( $conf );
        }
 
@@ -232,8 +237,18 @@ class JobQueueGroup {
         * @return Job|bool Returns false on failure
         */
        public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
+               global $wgJobClasses;
+
                $job = false;
 
+               if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
+                       throw new JobQueueError(
+                               "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
+               } elseif ( is_string( $qtype ) && !isset( $wgJobClasses[$qtype] ) ) {
+                       // Do not pop jobs if there is no class for the queue type
+                       throw new JobQueueError( "Unrecognized job type '$qtype'." );
+               }
+
                if ( is_string( $qtype ) ) { // specific job type
                        if ( !in_array( $qtype, $blacklist ) ) {
                                $job = $this->get( $qtype )->pop();
index 6c45e96..b6c4005 100644 (file)
@@ -32,6 +32,12 @@ class JobQueueMemory extends JobQueue {
        /** @var array[] */
        protected static $data = [];
 
+       public function __construct( array $params ) {
+               parent::__construct( $params );
+
+               $this->dupCache = new HashBagOStuff();
+       }
+
        /**
         * @see JobQueue::doBatchPush
         *
@@ -43,10 +49,7 @@ class JobQueueMemory extends JobQueue {
 
                foreach ( $jobs as $job ) {
                        if ( $job->ignoreDuplicates() ) {
-                               $sha1 = Wikimedia\base_convert(
-                                       sha1( serialize( $job->getDeduplicationInfo() ) ),
-                                       16, 36, 31
-                               );
+                               $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
                                if ( !isset( $unclaimed[$sha1] ) ) {
                                        $unclaimed[$sha1] = $job;
                                }
index 4d07a09..98a5491 100644 (file)
@@ -225,9 +225,9 @@ class JobQueueRedis extends JobQueue {
                                        $failed += count( $itemBatch );
                                }
                        }
-                       JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
-                       JobQueue::incrStats( 'inserts_actual', $this->type, $pushed );
-                       JobQueue::incrStats( 'dupe_inserts', $this->type,
+                       $this->incrStats( 'inserts', $this->type, count( $items ) );
+                       $this->incrStats( 'inserts_actual', $this->type, $pushed );
+                       $this->incrStats( 'dupe_inserts', $this->type,
                                count( $items ) - $failed - $pushed );
                        if ( $failed > 0 ) {
                                $err = "Could not insert {$failed} {$this->type} job(s).";
@@ -321,7 +321,7 @@ LUA;
                                        break; // no jobs; nothing to do
                                }
 
-                               JobQueue::incrStats( 'pops', $this->type );
+                               $this->incrStats( 'pops', $this->type );
                                $item = $this->unserialize( $blob );
                                if ( $item === false ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
@@ -424,7 +424,7 @@ LUA;
                                return false;
                        }
 
-                       JobQueue::incrStats( 'acks', $this->type );
+                       $this->incrStats( 'acks', $this->type );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
index d38c6c7..81a80b6 100644 (file)
@@ -32,6 +32,8 @@ class JobQueueTest extends MediaWikiTestCase {
                }
                $baseConfig['type'] = 'null';
                $baseConfig['domain'] = WikiMap::getCurrentWikiDbDomain()->getId();
+               $baseConfig['stash'] = new HashBagOStuff();
+               $baseConfig['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] );
                $variants = [
                        'queueRand' => [ 'order' => 'random', 'claimTTL' => 0 ],
                        'queueRandTTL' => [ 'order' => 'random', 'claimTTL' => 10 ],