jobqueue: migrate root job deduplication to the WAN cache
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 6 Jul 2019 05:43:45 +0000 (22:43 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Sat, 13 Jul 2019 22:38:17 +0000 (15:38 -0700)
If the root job timestamp keys are lost or otherwise unknown, they
will now be deductively recached with the best known values as jobs
are popped and executed. This means the running any of many child
jobs of a root job can restore the root timestamp if it was lost.
This does not need to use the main stash given this fact.

Bug: T227376
Change-Id: Iae0f3af15803af048ff49f3bf281b2bde18c87f2

includes/jobqueue/JobQueue.php
includes/jobqueue/JobQueueDB.php
includes/jobqueue/JobQueueGroup.php
includes/jobqueue/JobQueueMemory.php
includes/jobqueue/JobQueueRedis.php

index f5ed7b9..e52f295 100644 (file)
@@ -44,8 +44,8 @@ abstract class JobQueue {
        /** @var StatsdDataFactoryInterface */
        protected $stats;
 
-       /** @var BagOStuff */
-       protected $dupCache;
+       /** @var WANObjectCache */
+       protected $wanCache;
 
        const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
 
@@ -53,6 +53,14 @@ abstract class JobQueue {
 
        /**
         * @param array $params
+        *       - type           : A job type
+        *   - domain         : A DB domain ID
+        *   - wanCache       : An instance of WANObjectCache to use for caching [default: none]
+        *   - stats          : An instance of StatsdDataFactoryInterface [default: none]
+        *   - claimTTL       : Seconds a job can be claimed for exclusive execution [default: forever]
+        *   - maxTries       : Total times a job can be tried, assuming claims expire [default: 3]
+        *   - order          : Queue order, one of ("fifo", "timestamp", "random") [default: variable]
+        *   - readOnlyReason : Mark the queue as read-only with this reason [default: false]
         * @throws JobQueueError
         */
        protected function __construct( array $params ) {
@@ -70,7 +78,7 @@ abstract class JobQueue {
                }
                $this->readOnlyReason = $params['readOnlyReason'] ?? false;
                $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
-               $this->dupCache = $params['stash'] ?? new EmptyBagOStuff();
+               $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
        }
 
        /**
@@ -459,24 +467,23 @@ abstract class JobQueue {
         * @return bool
         */
        protected function doDeduplicateRootJob( IJobSpecification $job ) {
-               if ( !$job->hasRootJobParams() ) {
+               $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
+               if ( !$params ) {
                        throw new JobQueueError( "Cannot register root job; missing parameters." );
                }
-               $params = $job->getRootJobParams();
 
                $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
-               // Callers should call JobQueueGroup::push() before this method so that if the insert
-               // fails, the de-duplication registration will be aborted. Since the insert is
-               // deferred till "transaction idle", do the same here, so that the ordering is
-               // maintained. Having only the de-duplication registration succeed would cause
-               // jobs to become no-ops without any actual jobs that made them redundant.
-               $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
-               if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+               // Callers should call JobQueueGroup::push() before this method so that if the
+               // insert fails, the de-duplication registration will be aborted. Having only the
+               // de-duplication registration succeed would cause jobs to become no-ops without
+               // any actual jobs that made them redundant.
+               $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
+               if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
                        return true; // a newer version of this root job was enqueued
                }
 
                // Update the timestamp of the last root job started at the location...
-               return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
+               return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
        }
 
        /**
@@ -490,9 +497,8 @@ abstract class JobQueue {
                if ( $job->getType() !== $this->type ) {
                        throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
                }
-               $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
 
-               return $isDuplicate;
+               return $this->doIsRootJobOldDuplicate( $job );
        }
 
        /**
@@ -501,14 +507,18 @@ abstract class JobQueue {
         * @return bool
         */
        protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
-               if ( !$job->hasRootJobParams() ) {
+               $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
+               if ( !$params ) {
                        return false; // job has no de-deplication info
                }
-               $params = $job->getRootJobParams();
 
                $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
                // Get the last time this root job was enqueued
-               $timestamp = $this->dupCache->get( $key );
+               $timestamp = $this->wanCache->get( $key );
+               if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
+                       // Update the timestamp of the last known root job started at the location...
+                       $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
+               }
 
                // Check if a new root job was started at the location after this one's...
                return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
@@ -519,7 +529,7 @@ abstract class JobQueue {
         * @return string
         */
        protected function getRootJobCacheKey( $signature ) {
-               return $this->dupCache->makeGlobalKey(
+               return $this->wanCache->makeGlobalKey(
                        'jobqueue',
                        $this->domain,
                        $this->type,
index 7c78f40..f7b8ed2 100644 (file)
@@ -24,6 +24,7 @@ use Wikimedia\Rdbms\Database;
 use Wikimedia\Rdbms\DBConnectionError;
 use Wikimedia\Rdbms\DBError;
 use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\IMaintainableDatabase;
 use Wikimedia\ScopedCallback;
 
 /**
@@ -38,9 +39,7 @@ class JobQueueDB extends JobQueue {
        const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
        const MAX_OFFSET = 255; // integer; maximum number of rows to skip
 
-       /** @var WANObjectCache */
-       protected $cache;
-       /** @var IDatabase|DBError|null */
+       /** @var IMaintainableDatabase|DBError|null */
        protected $conn;
 
        /** @var array|null Server configuration array */
@@ -55,7 +54,6 @@ class JobQueueDB extends JobQueue {
         *               If not specified, the primary DB cluster for the wiki will be used.
         *               This can be overridden with a custom cluster so that DB handles will
         *               be retrieved via LBFactory::getExternalLB() and getConnection().
-        *   - wanCache : An instance of WANObjectCache to use for caching.
         * @param array $params
         */
        protected function __construct( array $params ) {
@@ -66,8 +64,6 @@ class JobQueueDB extends JobQueue {
                } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
                        $this->cluster = $params['cluster'];
                }
-
-               $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
        }
 
        protected function supportedOrders() {
@@ -104,7 +100,7 @@ class JobQueueDB extends JobQueue {
        protected function doGetSize() {
                $key = $this->getCacheKey( 'size' );
 
-               $size = $this->cache->get( $key );
+               $size = $this->wanCache->get( $key );
                if ( is_int( $size ) ) {
                        return $size;
                }
@@ -120,7 +116,7 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        throw $this->getDBException( $e );
                }
-               $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
 
                return $size;
        }
@@ -136,7 +132,7 @@ class JobQueueDB extends JobQueue {
 
                $key = $this->getCacheKey( 'acquiredcount' );
 
-               $count = $this->cache->get( $key );
+               $count = $this->wanCache->get( $key );
                if ( is_int( $count ) ) {
                        return $count;
                }
@@ -152,7 +148,7 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        throw $this->getDBException( $e );
                }
-               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
        }
@@ -169,7 +165,7 @@ class JobQueueDB extends JobQueue {
 
                $key = $this->getCacheKey( 'abandonedcount' );
 
-               $count = $this->cache->get( $key );
+               $count = $this->wanCache->get( $key );
                if ( is_int( $count ) ) {
                        return $count;
                }
@@ -190,7 +186,7 @@ class JobQueueDB extends JobQueue {
                        throw $this->getDBException( $e );
                }
 
-               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
        }
@@ -345,7 +341,7 @@ class JobQueueDB extends JobQueue {
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = $this->getScopedNoTrxFlag( $dbw );
                // Check cache to see if the queue has <= OFFSET items
-               $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
+               $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
 
                $invertedDirection = false; // whether one job_random direction was already scanned
                // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
@@ -385,7 +381,7 @@ class JobQueueDB extends JobQueue {
                                );
                                if ( !$row ) {
                                        $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
-                                       $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
+                                       $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
                                        continue; // use job_random
                                }
                        }
@@ -510,32 +506,17 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doDeduplicateRootJob( IJobSpecification $job ) {
-               $params = $job->getParams();
-               if ( !isset( $params['rootJobSignature'] ) ) {
-                       throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
-               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
-                       throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
-               }
-               $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
-               // Callers should call JobQueueGroup::push() before this method so that if the insert
-               // fails, the de-duplication registration will be aborted. Since the insert is
-               // deferred till "transaction idle", do the same here, so that the ordering is
+               // Callers should call JobQueueGroup::push() before this method so that if the
+               // insert fails, the de-duplication registration will be aborted. Since the insert
+               // is deferred till "transaction idle", do the same here, so that the ordering is
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
                $dbw = $this->getMasterDB();
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = $this->getScopedNoTrxFlag( $dbw );
-
-               $cache = $this->dupCache;
                $dbw->onTransactionCommitOrIdle(
-                       function () use ( $cache, $params, $key ) {
-                               $timestamp = $cache->get( $key ); // current last timestamp of this job
-                               if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
-                                       return true; // a newer version of this root job was enqueued
-                               }
-
-                               // Update the timestamp of the last root job started at the location...
-                               return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+                       function () use ( $job ) {
+                               parent::doDeduplicateRootJob( $job );
                        },
                        __METHOD__
                );
@@ -581,7 +562,7 @@ class JobQueueDB extends JobQueue {
         */
        protected function doFlushCaches() {
                foreach ( [ 'size', 'acquiredcount' ] as $type ) {
-                       $this->cache->delete( $this->getCacheKey( $type ) );
+                       $this->wanCache->delete( $this->getCacheKey( $type ) );
                }
        }
 
@@ -789,7 +770,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @throws JobQueueConnectionError
-        * @return IDatabase
+        * @return IMaintainableDatabase
         */
        protected function getMasterDB() {
                try {
@@ -801,7 +782,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @param int $index (DB_REPLICA/DB_MASTER)
-        * @return IDatabase
+        * @return IMaintainableDatabase
         */
        protected function getDB( $index ) {
                if ( $this->server ) {
@@ -825,12 +806,16 @@ class JobQueueDB extends JobQueue {
                                ? $lbFactory->getExternalLB( $this->cluster )
                                : $lbFactory->getMainLB( $this->domain );
 
-                       return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+                       if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
                                // Keep a separate connection to avoid contention and deadlocks;
                                // However, SQLite has the opposite behavior due to DB-level locking.
-                               ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
+                               $flags = $lb::CONN_TRX_AUTOCOMMIT;
+                       } else {
                                // Jobs insertion will be defered until the PRESEND stage to reduce contention.
-                               : $lb->getConnectionRef( $index, [], $this->domain );
+                               $flags = 0;
+                       }
+
+                       return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
                }
        }
 
@@ -856,7 +841,7 @@ class JobQueueDB extends JobQueue {
        private function getCacheKey( $property ) {
                $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
 
-               return $this->cache->makeGlobalKey(
+               return $this->wanCache->makeGlobalKey(
                        'jobqueue',
                        $this->domain,
                        $cluster,
index 756724e..06cd04c 100644 (file)
@@ -121,7 +121,6 @@ class JobQueueGroup {
                $services = MediaWikiServices::getInstance();
                $conf['stats'] = $services->getStatsdDataFactory();
                $conf['wanCache'] = $services->getMainWANObjectCache();
-               $conf['stash'] = $services->getMainObjectStash();
 
                return JobQueue::factory( $conf );
        }
index cb20a76..b26129e 100644 (file)
@@ -33,9 +33,9 @@ class JobQueueMemory extends JobQueue {
        protected static $data = [];
 
        public function __construct( array $params ) {
-               parent::__construct( $params );
+               $params['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] );
 
-               $this->dupCache = new HashBagOStuff();
+               parent::__construct( $params );
        }
 
        /**
index 2140043..5113cbe 100644 (file)
@@ -449,7 +449,7 @@ LUA;
 
                $conn = $this->getConnection();
                try {
-                       $timestamp = $conn->get( $key ); // current last timestamp of this job
+                       $timestamp = $conn->get( $key ); // last known timestamp of such a root job
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
                        }