Merge "objectcache: fix race conditions in RedisBagOStuff::incr()"
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueDB.php
index c2772a6..f7b8ed2 100644 (file)
@@ -24,6 +24,7 @@ use Wikimedia\Rdbms\Database;
 use Wikimedia\Rdbms\DBConnectionError;
 use Wikimedia\Rdbms\DBError;
 use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\IMaintainableDatabase;
 use Wikimedia\ScopedCallback;
 
 /**
@@ -38,9 +39,7 @@ class JobQueueDB extends JobQueue {
        const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
        const MAX_OFFSET = 255; // integer; maximum number of rows to skip
 
-       /** @var WANObjectCache */
-       protected $cache;
-       /** @var IDatabase|DBError|null */
+       /** @var IMaintainableDatabase|DBError|null */
        protected $conn;
 
        /** @var array|null Server configuration array */
@@ -55,7 +54,6 @@ class JobQueueDB extends JobQueue {
         *               If not specified, the primary DB cluster for the wiki will be used.
         *               This can be overridden with a custom cluster so that DB handles will
         *               be retrieved via LBFactory::getExternalLB() and getConnection().
-        *   - wanCache : An instance of WANObjectCache to use for caching.
         * @param array $params
         */
        protected function __construct( array $params ) {
@@ -66,8 +64,6 @@ class JobQueueDB extends JobQueue {
                } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
                        $this->cluster = $params['cluster'];
                }
-
-               $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
        }
 
        protected function supportedOrders() {
@@ -91,7 +87,7 @@ class JobQueueDB extends JobQueue {
                                'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                return !$found;
@@ -104,7 +100,7 @@ class JobQueueDB extends JobQueue {
        protected function doGetSize() {
                $key = $this->getCacheKey( 'size' );
 
-               $size = $this->cache->get( $key );
+               $size = $this->wanCache->get( $key );
                if ( is_int( $size ) ) {
                        return $size;
                }
@@ -118,9 +114,9 @@ class JobQueueDB extends JobQueue {
                                __METHOD__
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
-               $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
 
                return $size;
        }
@@ -136,7 +132,7 @@ class JobQueueDB extends JobQueue {
 
                $key = $this->getCacheKey( 'acquiredcount' );
 
-               $count = $this->cache->get( $key );
+               $count = $this->wanCache->get( $key );
                if ( is_int( $count ) ) {
                        return $count;
                }
@@ -150,9 +146,9 @@ class JobQueueDB extends JobQueue {
                                __METHOD__
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
-               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
        }
@@ -169,7 +165,7 @@ class JobQueueDB extends JobQueue {
 
                $key = $this->getCacheKey( 'abandonedcount' );
 
-               $count = $this->cache->get( $key );
+               $count = $this->wanCache->get( $key );
                if ( is_int( $count ) ) {
                        return $count;
                }
@@ -187,10 +183,10 @@ class JobQueueDB extends JobQueue {
                                __METHOD__
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
-               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+               $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
        }
@@ -281,7 +277,7 @@ class JobQueueDB extends JobQueue {
                                count( $rowSet ) + count( $rowList ) - count( $rows )
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
                if ( $flags & self::QOS_ATOMIC ) {
                        $dbw->endAtomic( $method );
@@ -290,7 +286,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doPop()
-        * @return Job|bool
+        * @return RunnableJob|bool
         */
        protected function doPop() {
                $dbw = $this->getMasterDB();
@@ -314,12 +310,9 @@ class JobQueueDB extends JobQueue {
                                        break; // nothing to do
                                }
                                $this->incrStats( 'pops', $this->type );
+
                                // Get the job object from the row...
-                               $title = Title::makeTitle( $row->job_namespace, $row->job_title );
-                               $job = Job::factory( $row->job_cmd, $title,
-                                       self::extractBlob( $row->job_params ) );
-                               $job->setMetadata( 'id', $row->job_id );
-                               $job->setMetadata( 'timestamp', $row->job_timestamp );
+                               $job = $this->jobFromRow( $row );
                                break; // done
                        } while ( true );
 
@@ -329,7 +322,7 @@ class JobQueueDB extends JobQueue {
                                $this->recycleAndDeleteStaleJobs();
                        }
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                return $job;
@@ -348,9 +341,8 @@ class JobQueueDB extends JobQueue {
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = $this->getScopedNoTrxFlag( $dbw );
                // Check cache to see if the queue has <= OFFSET items
-               $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
+               $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
 
-               $row = false; // the row acquired
                $invertedDirection = false; // whether one job_random direction was already scanned
                // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
                // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
@@ -389,7 +381,7 @@ class JobQueueDB extends JobQueue {
                                );
                                if ( !$row ) {
                                        $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
-                                       $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
+                                       $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
                                        continue; // use job_random
                                }
                        }
@@ -481,10 +473,10 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doAck()
-        * @param Job $job
+        * @param RunnableJob $job
         * @throws MWException
         */
-       protected function doAck( Job $job ) {
+       protected function doAck( RunnableJob $job ) {
                $id = $job->getMetadata( 'id' );
                if ( $id === null ) {
                        throw new MWException( "Job of type '{$job->getType()}' has no ID." );
@@ -503,7 +495,7 @@ class JobQueueDB extends JobQueue {
 
                        $this->incrStats( 'acks', $this->type );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
        }
 
@@ -514,32 +506,17 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doDeduplicateRootJob( IJobSpecification $job ) {
-               $params = $job->getParams();
-               if ( !isset( $params['rootJobSignature'] ) ) {
-                       throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
-               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
-                       throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
-               }
-               $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
-               // Callers should call JobQueueGroup::push() before this method so that if the insert
-               // fails, the de-duplication registration will be aborted. Since the insert is
-               // deferred till "transaction idle", do the same here, so that the ordering is
+               // Callers should call JobQueueGroup::push() before this method so that if the
+               // insert fails, the de-duplication registration will be aborted. Since the insert
+               // is deferred till "transaction idle", do the same here, so that the ordering is
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
                $dbw = $this->getMasterDB();
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = $this->getScopedNoTrxFlag( $dbw );
-
-               $cache = $this->dupCache;
                $dbw->onTransactionCommitOrIdle(
-                       function () use ( $cache, $params, $key ) {
-                               $timestamp = $cache->get( $key ); // current last timestamp of this job
-                               if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
-                                       return true; // a newer version of this root job was enqueued
-                               }
-
-                               // Update the timestamp of the last root job started at the location...
-                               return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+                       function () use ( $job ) {
+                               parent::doDeduplicateRootJob( $job );
                        },
                        __METHOD__
                );
@@ -558,7 +535,7 @@ class JobQueueDB extends JobQueue {
                try {
                        $dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                return true;
@@ -585,7 +562,7 @@ class JobQueueDB extends JobQueue {
         */
        protected function doFlushCaches() {
                foreach ( [ 'size', 'acquiredcount' ] as $type ) {
-                       $this->cache->delete( $this->getCacheKey( $type ) );
+                       $this->wanCache->delete( $this->getCacheKey( $type ) );
                }
        }
 
@@ -617,19 +594,11 @@ class JobQueueDB extends JobQueue {
                        return new MappedIterator(
                                $dbr->select( 'job', self::selectFields(), $conds ),
                                function ( $row ) {
-                                       $job = Job::factory(
-                                               $row->job_cmd,
-                                               Title::makeTitle( $row->job_namespace, $row->job_title ),
-                                               strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
-                                       );
-                                       $job->setMetadata( 'id', $row->job_id );
-                                       $job->setMetadata( 'timestamp', $row->job_timestamp );
-
-                                       return $job;
+                                       return $this->jobFromRow( $row );
                                }
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
        }
 
@@ -759,7 +728,7 @@ class JobQueueDB extends JobQueue {
 
                        $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                return $count;
@@ -774,8 +743,8 @@ class JobQueueDB extends JobQueue {
                return [
                        // Fields that describe the nature of the job
                        'job_cmd' => $job->getType(),
-                       'job_namespace' => $job->getTitle()->getNamespace(),
-                       'job_title' => $job->getTitle()->getDBkey(),
+                       'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
+                       'job_title' => $job->getParams()['title'] ?? '',
                        'job_params' => self::makeBlob( $job->getParams() ),
                        // Additional job metadata
                        'job_timestamp' => $db->timestamp(),
@@ -801,7 +770,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @throws JobQueueConnectionError
-        * @return IDatabase
+        * @return IMaintainableDatabase
         */
        protected function getMasterDB() {
                try {
@@ -813,7 +782,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @param int $index (DB_REPLICA/DB_MASTER)
-        * @return IDatabase
+        * @return IMaintainableDatabase
         */
        protected function getDB( $index ) {
                if ( $this->server ) {
@@ -837,12 +806,16 @@ class JobQueueDB extends JobQueue {
                                ? $lbFactory->getExternalLB( $this->cluster )
                                : $lbFactory->getMainLB( $this->domain );
 
-                       return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+                       if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
                                // Keep a separate connection to avoid contention and deadlocks;
                                // However, SQLite has the opposite behavior due to DB-level locking.
-                               ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
+                               $flags = $lb::CONN_TRX_AUTOCOMMIT;
+                       } else {
                                // Jobs insertion will be defered until the PRESEND stage to reduce contention.
-                               : $lb->getConnectionRef( $index, [], $this->domain );
+                               $flags = 0;
+                       }
+
+                       return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
                }
        }
 
@@ -868,7 +841,7 @@ class JobQueueDB extends JobQueue {
        private function getCacheKey( $property ) {
                $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
 
-               return $this->cache->makeGlobalKey(
+               return $this->wanCache->makeGlobalKey(
                        'jobqueue',
                        $this->domain,
                        $cluster,
@@ -890,23 +863,30 @@ class JobQueueDB extends JobQueue {
        }
 
        /**
-        * @param string $blob
-        * @return bool|mixed
+        * @param stdClass $row
+        * @return RunnableJob|null
         */
-       protected static function extractBlob( $blob ) {
-               if ( (string)$blob !== '' ) {
-                       return unserialize( $blob );
-               } else {
-                       return false;
+       protected function jobFromRow( $row ) {
+               $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
+               if ( !is_array( $params ) ) { // this shouldn't happen
+                       throw new UnexpectedValueException(
+                               "Could not unserialize job with ID '{$row->job_id}'." );
                }
+
+               $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
+               $job = $this->factoryJob( $row->job_cmd, $params );
+               $job->setMetadata( 'id', $row->job_id );
+               $job->setMetadata( 'timestamp', $row->job_timestamp );
+
+               return $job;
        }
 
        /**
         * @param DBError $e
-        * @throws JobQueueError
+        * @return JobQueueError
         */
-       protected function throwDBException( DBError $e ) {
-               throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
+       protected function getDBException( DBError $e ) {
+               return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
        }
 
        /**