Merge "rdbms: make LBFactory close/rollback dangling handles like LoadBalancer"
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueDB.php
index 7c78f40..d449e8a 100644 (file)
@@ -24,6 +24,7 @@ use Wikimedia\Rdbms\Database;
 use Wikimedia\Rdbms\DBConnectionError;
 use Wikimedia\Rdbms\DBError;
 use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\IMaintainableDatabase;
 use Wikimedia\ScopedCallback;
 
 /**
@@ -38,9 +39,7 @@ class JobQueueDB extends JobQueue {
        const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
        const MAX_OFFSET = 255; // integer; maximum number of rows to skip
 
-       /** @var WANObjectCache */
-       protected $cache;
-       /** @var IDatabase|DBError|null */
+       /** @var IMaintainableDatabase|DBError|null */
        protected $conn;
 
        /** @var array|null Server configuration array */
@@ -55,7 +54,6 @@ class JobQueueDB extends JobQueue {
         *               If not specified, the primary DB cluster for the wiki will be used.
         *               This can be overridden with a custom cluster so that DB handles will
         *               be retrieved via LBFactory::getExternalLB() and getConnection().
-        *   - wanCache : An instance of WANObjectCache to use for caching.
         * @param array $params
         */
        protected function __construct( array $params ) {
@@ -66,8 +64,6 @@ class JobQueueDB extends JobQueue {
                } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
                        $this->cluster = $params['cluster'];
                }
-
-               $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
        }
 
        protected function supportedOrders() {
@@ -104,7 +100,7 @@ class JobQueueDB extends JobQueue {
        protected function doGetSize() {
                $key = $this->getCacheKey( 'size' );
 
-               $size = $this->cache->get( $key );
+               $size = $this->wanCache->get( $key );
                if ( is_int( $size ) ) {
                        return $size;
                }
@@ -120,7 +116,7 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        throw $this->getDBException( $e );
                }
-               $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
 
                return $size;
        }
@@ -136,7 +132,7 @@ class JobQueueDB extends JobQueue {
 
                $key = $this->getCacheKey( 'acquiredcount' );
 
-               $count = $this->cache->get( $key );
+               $count = $this->wanCache->get( $key );
                if ( is_int( $count ) ) {
                        return $count;
                }
@@ -152,7 +148,7 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        throw $this->getDBException( $e );
                }
-               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
        }
@@ -169,7 +165,7 @@ class JobQueueDB extends JobQueue {
 
                $key = $this->getCacheKey( 'abandonedcount' );
 
-               $count = $this->cache->get( $key );
+               $count = $this->wanCache->get( $key );
                if ( is_int( $count ) ) {
                        return $count;
                }
@@ -190,7 +186,7 @@ class JobQueueDB extends JobQueue {
                        throw $this->getDBException( $e );
                }
 
-               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
        }
@@ -345,7 +341,7 @@ class JobQueueDB extends JobQueue {
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = $this->getScopedNoTrxFlag( $dbw );
                // Check cache to see if the queue has <= OFFSET items
-               $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
+               $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
 
                $invertedDirection = false; // whether one job_random direction was already scanned
                // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
@@ -385,7 +381,7 @@ class JobQueueDB extends JobQueue {
                                );
                                if ( !$row ) {
                                        $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
-                                       $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
+                                       $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
                                        continue; // use job_random
                                }
                        }
@@ -510,32 +506,17 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doDeduplicateRootJob( IJobSpecification $job ) {
-               $params = $job->getParams();
-               if ( !isset( $params['rootJobSignature'] ) ) {
-                       throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
-               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
-                       throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
-               }
-               $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
-               // Callers should call JobQueueGroup::push() before this method so that if the insert
-               // fails, the de-duplication registration will be aborted. Since the insert is
-               // deferred till "transaction idle", do the same here, so that the ordering is
+               // Callers should call JobQueueGroup::push() before this method so that if the
+               // insert fails, the de-duplication registration will be aborted. Since the insert
+               // is deferred till "transaction idle", do the same here, so that the ordering is
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
                $dbw = $this->getMasterDB();
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = $this->getScopedNoTrxFlag( $dbw );
-
-               $cache = $this->dupCache;
                $dbw->onTransactionCommitOrIdle(
-                       function () use ( $cache, $params, $key ) {
-                               $timestamp = $cache->get( $key ); // current last timestamp of this job
-                               if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
-                                       return true; // a newer version of this root job was enqueued
-                               }
-
-                               // Update the timestamp of the last root job started at the location...
-                               return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+                       function () use ( $job ) {
+                               parent::doDeduplicateRootJob( $job );
                        },
                        __METHOD__
                );
@@ -581,7 +562,7 @@ class JobQueueDB extends JobQueue {
         */
        protected function doFlushCaches() {
                foreach ( [ 'size', 'acquiredcount' ] as $type ) {
-                       $this->cache->delete( $this->getCacheKey( $type ) );
+                       $this->wanCache->delete( $this->getCacheKey( $type ) );
                }
        }
 
@@ -709,9 +690,9 @@ class JobQueueDB extends JobQueue {
                                        $dbw->update( 'job',
                                                [
                                                        'job_token' => '',
-                                                       'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
-                                               [
-                                                       'job_id' => $ids ],
+                                                       'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
+                                               ],
+                                               [ 'job_id' => $ids, "job_token != ''" ],
                                                __METHOD__
                                        );
                                        $affected = $dbw->affectedRows();
@@ -789,7 +770,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @throws JobQueueConnectionError
-        * @return IDatabase
+        * @return IMaintainableDatabase
         */
        protected function getMasterDB() {
                try {
@@ -801,7 +782,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @param int $index (DB_REPLICA/DB_MASTER)
-        * @return IDatabase
+        * @return IMaintainableDatabase
         */
        protected function getDB( $index ) {
                if ( $this->server ) {
@@ -825,12 +806,16 @@ class JobQueueDB extends JobQueue {
                                ? $lbFactory->getExternalLB( $this->cluster )
                                : $lbFactory->getMainLB( $this->domain );
 
-                       return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+                       if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
                                // Keep a separate connection to avoid contention and deadlocks;
                                // However, SQLite has the opposite behavior due to DB-level locking.
-                               ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
+                               $flags = $lb::CONN_TRX_AUTOCOMMIT;
+                       } else {
                                // Jobs insertion will be defered until the PRESEND stage to reduce contention.
-                               : $lb->getConnectionRef( $index, [], $this->domain );
+                               $flags = 0;
+                       }
+
+                       return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
                }
        }
 
@@ -856,7 +841,7 @@ class JobQueueDB extends JobQueue {
        private function getCacheKey( $property ) {
                $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
 
-               return $this->cache->makeGlobalKey(
+               return $this->wanCache->makeGlobalKey(
                        'jobqueue',
                        $this->domain,
                        $cluster,