Merge "jobqueue: migrate root job deduplication to the WAN cache"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Sat, 13 Jul 2019 23:30:56 +0000 (23:30 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Sat, 13 Jul 2019 23:30:56 +0000 (23:30 +0000)
includes/jobqueue/JobQueue.php
includes/jobqueue/JobQueueDB.php
includes/jobqueue/JobQueueGroup.php
includes/jobqueue/JobQueueMemory.php
includes/jobqueue/JobQueueRedis.php

index f5ed7b9..e52f295 100644 (file)
@@ -44,8 +44,8 @@ abstract class JobQueue {
        /** @var StatsdDataFactoryInterface */
        protected $stats;
 
-       /** @var BagOStuff */
-       protected $dupCache;
+       /** @var WANObjectCache */
+       protected $wanCache;
 
        const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
 
@@ -53,6 +53,14 @@ abstract class JobQueue {
 
        /**
         * @param array $params
+        *       - type           : A job type
+        *   - domain         : A DB domain ID
+        *   - wanCache       : An instance of WANObjectCache to use for caching [default: none]
+        *   - stats          : An instance of StatsdDataFactoryInterface [default: none]
+        *   - claimTTL       : Seconds a job can be claimed for exclusive execution [default: forever]
+        *   - maxTries       : Total times a job can be tried, assuming claims expire [default: 3]
+        *   - order          : Queue order, one of ("fifo", "timestamp", "random") [default: variable]
+        *   - readOnlyReason : Mark the queue as read-only with this reason [default: false]
         * @throws JobQueueError
         */
        protected function __construct( array $params ) {
@@ -70,7 +78,7 @@ abstract class JobQueue {
                }
                $this->readOnlyReason = $params['readOnlyReason'] ?? false;
                $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
-               $this->dupCache = $params['stash'] ?? new EmptyBagOStuff();
+               $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
        }
 
        /**
@@ -459,24 +467,23 @@ abstract class JobQueue {
         * @return bool
         */
        protected function doDeduplicateRootJob( IJobSpecification $job ) {
-               if ( !$job->hasRootJobParams() ) {
+               $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
+               if ( !$params ) {
                        throw new JobQueueError( "Cannot register root job; missing parameters." );
                }
-               $params = $job->getRootJobParams();
 
                $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
-               // Callers should call JobQueueGroup::push() before this method so that if the insert
-               // fails, the de-duplication registration will be aborted. Since the insert is
-               // deferred till "transaction idle", do the same here, so that the ordering is
-               // maintained. Having only the de-duplication registration succeed would cause
-               // jobs to become no-ops without any actual jobs that made them redundant.
-               $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
-               if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+               // Callers should call JobQueueGroup::push() before this method so that if the
+               // insert fails, the de-duplication registration will be aborted. Having only the
+               // de-duplication registration succeed would cause jobs to become no-ops without
+               // any actual jobs that made them redundant.
+               $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
+               if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
                        return true; // a newer version of this root job was enqueued
                }
 
                // Update the timestamp of the last root job started at the location...
-               return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
+               return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
        }
 
        /**
@@ -490,9 +497,8 @@ abstract class JobQueue {
                if ( $job->getType() !== $this->type ) {
                        throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
                }
-               $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
 
-               return $isDuplicate;
+               return $this->doIsRootJobOldDuplicate( $job );
        }
 
        /**
@@ -501,14 +507,18 @@ abstract class JobQueue {
         * @return bool
         */
        protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
-               if ( !$job->hasRootJobParams() ) {
+               $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
+               if ( !$params ) {
                        return false; // job has no de-deplication info
                }
-               $params = $job->getRootJobParams();
 
                $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
                // Get the last time this root job was enqueued
-               $timestamp = $this->dupCache->get( $key );
+               $timestamp = $this->wanCache->get( $key );
+               if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
+                       // Update the timestamp of the last known root job started at the location...
+                       $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
+               }
 
                // Check if a new root job was started at the location after this one's...
                return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
@@ -519,7 +529,7 @@ abstract class JobQueue {
         * @return string
         */
        protected function getRootJobCacheKey( $signature ) {
-               return $this->dupCache->makeGlobalKey(
+               return $this->wanCache->makeGlobalKey(
                        'jobqueue',
                        $this->domain,
                        $this->type,
index 7c78f40..f7b8ed2 100644 (file)
@@ -24,6 +24,7 @@ use Wikimedia\Rdbms\Database;
 use Wikimedia\Rdbms\DBConnectionError;
 use Wikimedia\Rdbms\DBError;
 use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\IMaintainableDatabase;
 use Wikimedia\ScopedCallback;
 
 /**
@@ -38,9 +39,7 @@ class JobQueueDB extends JobQueue {
        const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
        const MAX_OFFSET = 255; // integer; maximum number of rows to skip
 
-       /** @var WANObjectCache */
-       protected $cache;
-       /** @var IDatabase|DBError|null */
+       /** @var IMaintainableDatabase|DBError|null */
        protected $conn;
 
        /** @var array|null Server configuration array */
@@ -55,7 +54,6 @@ class JobQueueDB extends JobQueue {
         *               If not specified, the primary DB cluster for the wiki will be used.
         *               This can be overridden with a custom cluster so that DB handles will
         *               be retrieved via LBFactory::getExternalLB() and getConnection().
-        *   - wanCache : An instance of WANObjectCache to use for caching.
         * @param array $params
         */
        protected function __construct( array $params ) {
@@ -66,8 +64,6 @@ class JobQueueDB extends JobQueue {
                } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
                        $this->cluster = $params['cluster'];
                }
-
-               $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
        }
 
        protected function supportedOrders() {
@@ -104,7 +100,7 @@ class JobQueueDB extends JobQueue {
        protected function doGetSize() {
                $key = $this->getCacheKey( 'size' );
 
-               $size = $this->cache->get( $key );
+               $size = $this->wanCache->get( $key );
                if ( is_int( $size ) ) {
                        return $size;
                }
@@ -120,7 +116,7 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        throw $this->getDBException( $e );
                }
-               $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
 
                return $size;
        }
@@ -136,7 +132,7 @@ class JobQueueDB extends JobQueue {
 
                $key = $this->getCacheKey( 'acquiredcount' );
 
-               $count = $this->cache->get( $key );
+               $count = $this->wanCache->get( $key );
                if ( is_int( $count ) ) {
                        return $count;
                }
@@ -152,7 +148,7 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        throw $this->getDBException( $e );
                }
-               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
        }
@@ -169,7 +165,7 @@ class JobQueueDB extends JobQueue {
 
                $key = $this->getCacheKey( 'abandonedcount' );
 
-               $count = $this->cache->get( $key );
+               $count = $this->wanCache->get( $key );
                if ( is_int( $count ) ) {
                        return $count;
                }
@@ -190,7 +186,7 @@ class JobQueueDB extends JobQueue {
                        throw $this->getDBException( $e );
                }
 
-               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
        }
@@ -345,7 +341,7 @@ class JobQueueDB extends JobQueue {
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = $this->getScopedNoTrxFlag( $dbw );
                // Check cache to see if the queue has <= OFFSET items
-               $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
+               $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
 
                $invertedDirection = false; // whether one job_random direction was already scanned
                // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
@@ -385,7 +381,7 @@ class JobQueueDB extends JobQueue {
                                );
                                if ( !$row ) {
                                        $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
-                                       $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
+                                       $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
                                        continue; // use job_random
                                }
                        }
@@ -510,32 +506,17 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doDeduplicateRootJob( IJobSpecification $job ) {
-               $params = $job->getParams();
-               if ( !isset( $params['rootJobSignature'] ) ) {
-                       throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
-               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
-                       throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
-               }
-               $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
-               // Callers should call JobQueueGroup::push() before this method so that if the insert
-               // fails, the de-duplication registration will be aborted. Since the insert is
-               // deferred till "transaction idle", do the same here, so that the ordering is
+               // Callers should call JobQueueGroup::push() before this method so that if the
+               // insert fails, the de-duplication registration will be aborted. Since the insert
+               // is deferred till "transaction idle", do the same here, so that the ordering is
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
                $dbw = $this->getMasterDB();
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = $this->getScopedNoTrxFlag( $dbw );
-
-               $cache = $this->dupCache;
                $dbw->onTransactionCommitOrIdle(
-                       function () use ( $cache, $params, $key ) {
-                               $timestamp = $cache->get( $key ); // current last timestamp of this job
-                               if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
-                                       return true; // a newer version of this root job was enqueued
-                               }
-
-                               // Update the timestamp of the last root job started at the location...
-                               return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+                       function () use ( $job ) {
+                               parent::doDeduplicateRootJob( $job );
                        },
                        __METHOD__
                );
@@ -581,7 +562,7 @@ class JobQueueDB extends JobQueue {
         */
        protected function doFlushCaches() {
                foreach ( [ 'size', 'acquiredcount' ] as $type ) {
-                       $this->cache->delete( $this->getCacheKey( $type ) );
+                       $this->wanCache->delete( $this->getCacheKey( $type ) );
                }
        }
 
@@ -789,7 +770,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @throws JobQueueConnectionError
-        * @return IDatabase
+        * @return IMaintainableDatabase
         */
        protected function getMasterDB() {
                try {
@@ -801,7 +782,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @param int $index (DB_REPLICA/DB_MASTER)
-        * @return IDatabase
+        * @return IMaintainableDatabase
         */
        protected function getDB( $index ) {
                if ( $this->server ) {
@@ -825,12 +806,16 @@ class JobQueueDB extends JobQueue {
                                ? $lbFactory->getExternalLB( $this->cluster )
                                : $lbFactory->getMainLB( $this->domain );
 
-                       return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+                       if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
                                // Keep a separate connection to avoid contention and deadlocks;
                                // However, SQLite has the opposite behavior due to DB-level locking.
-                               ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
+                               $flags = $lb::CONN_TRX_AUTOCOMMIT;
+                       } else {
                                // Jobs insertion will be defered until the PRESEND stage to reduce contention.
-                               : $lb->getConnectionRef( $index, [], $this->domain );
+                               $flags = 0;
+                       }
+
+                       return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
                }
        }
 
@@ -856,7 +841,7 @@ class JobQueueDB extends JobQueue {
        private function getCacheKey( $property ) {
                $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
 
-               return $this->cache->makeGlobalKey(
+               return $this->wanCache->makeGlobalKey(
                        'jobqueue',
                        $this->domain,
                        $cluster,
index 756724e..06cd04c 100644 (file)
@@ -121,7 +121,6 @@ class JobQueueGroup {
                $services = MediaWikiServices::getInstance();
                $conf['stats'] = $services->getStatsdDataFactory();
                $conf['wanCache'] = $services->getMainWANObjectCache();
-               $conf['stash'] = $services->getMainObjectStash();
 
                return JobQueue::factory( $conf );
        }
index cb20a76..b26129e 100644 (file)
@@ -33,9 +33,9 @@ class JobQueueMemory extends JobQueue {
        protected static $data = [];
 
        public function __construct( array $params ) {
-               parent::__construct( $params );
+               $params['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] );
 
-               $this->dupCache = new HashBagOStuff();
+               parent::__construct( $params );
        }
 
        /**
index b8a5ad2..569a5d4 100644 (file)
@@ -451,7 +451,7 @@ LUA;
 
                $conn = $this->getConnection();
                try {
-                       $timestamp = $conn->get( $key ); // current last timestamp of this job
+                       $timestamp = $conn->get( $key ); // last known timestamp of such a root job
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
                        }