X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobQueueDB.php;h=d449e8a259356179432ab16a151c976e2d0d2b36;hb=b7b9cea32981e9bc1425b771b910b7f2ba1d0bde;hp=7c78f400318f3c75a158cec60e69cb3e4e5874ad;hpb=aac6b26c0bafc81287bb042304f1d346da94dc89;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index 7c78f40031..d449e8a259 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -24,6 +24,7 @@ use Wikimedia\Rdbms\Database; use Wikimedia\Rdbms\DBConnectionError; use Wikimedia\Rdbms\DBError; use MediaWiki\MediaWikiServices; +use Wikimedia\Rdbms\IMaintainableDatabase; use Wikimedia\ScopedCallback; /** @@ -38,9 +39,7 @@ class JobQueueDB extends JobQueue { const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random const MAX_OFFSET = 255; // integer; maximum number of rows to skip - /** @var WANObjectCache */ - protected $cache; - /** @var IDatabase|DBError|null */ + /** @var IMaintainableDatabase|DBError|null */ protected $conn; /** @var array|null Server configuration array */ @@ -55,7 +54,6 @@ class JobQueueDB extends JobQueue { * If not specified, the primary DB cluster for the wiki will be used. * This can be overridden with a custom cluster so that DB handles will * be retrieved via LBFactory::getExternalLB() and getConnection(). - * - wanCache : An instance of WANObjectCache to use for caching. * @param array $params */ protected function __construct( array $params ) { @@ -66,8 +64,6 @@ class JobQueueDB extends JobQueue { } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) { $this->cluster = $params['cluster']; } - - $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty(); } protected function supportedOrders() { @@ -104,7 +100,7 @@ class JobQueueDB extends JobQueue { protected function doGetSize() { $key = $this->getCacheKey( 'size' ); - $size = $this->cache->get( $key ); + $size = $this->wanCache->get( $key ); if ( is_int( $size ) ) { return $size; } @@ -120,7 +116,7 @@ class JobQueueDB extends JobQueue { } catch ( DBError $e ) { throw $this->getDBException( $e ); } - $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); + $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT ); return $size; } @@ -136,7 +132,7 @@ class JobQueueDB extends JobQueue { $key = $this->getCacheKey( 'acquiredcount' ); - $count = $this->cache->get( $key ); + $count = $this->wanCache->get( $key ); if ( is_int( $count ) ) { return $count; } @@ -152,7 +148,7 @@ class JobQueueDB extends JobQueue { } catch ( DBError $e ) { throw $this->getDBException( $e ); } - $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT ); return $count; } @@ -169,7 +165,7 @@ class JobQueueDB extends JobQueue { $key = $this->getCacheKey( 'abandonedcount' ); - $count = $this->cache->get( $key ); + $count = $this->wanCache->get( $key ); if ( is_int( $count ) ) { return $count; } @@ -190,7 +186,7 @@ class JobQueueDB extends JobQueue { throw $this->getDBException( $e ); } - $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT ); return $count; } @@ -345,7 +341,7 @@ class JobQueueDB extends JobQueue { /** @noinspection PhpUnusedLocalVariableInspection */ $scope = $this->getScopedNoTrxFlag( $dbw ); // Check cache to see if the queue has <= OFFSET items - $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); + $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) ); $invertedDirection = false; // whether one job_random direction was already scanned // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT @@ -385,7 +381,7 @@ class JobQueueDB extends JobQueue { ); if ( !$row ) { $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows - $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 ); + $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 ); continue; // use job_random } } @@ -510,32 +506,17 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doDeduplicateRootJob( IJobSpecification $job ) { - $params = $job->getParams(); - if ( !isset( $params['rootJobSignature'] ) ) { - throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); - } elseif ( !isset( $params['rootJobTimestamp'] ) ) { - throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); - } - $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); - // Callers should call JobQueueGroup::push() before this method so that if the insert - // fails, the de-duplication registration will be aborted. Since the insert is - // deferred till "transaction idle", do the same here, so that the ordering is + // Callers should call JobQueueGroup::push() before this method so that if the + // insert fails, the de-duplication registration will be aborted. Since the insert + // is deferred till "transaction idle", do the same here, so that the ordering is // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. $dbw = $this->getMasterDB(); /** @noinspection PhpUnusedLocalVariableInspection */ $scope = $this->getScopedNoTrxFlag( $dbw ); - - $cache = $this->dupCache; $dbw->onTransactionCommitOrIdle( - function () use ( $cache, $params, $key ) { - $timestamp = $cache->get( $key ); // current last timestamp of this job - if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { - return true; // a newer version of this root job was enqueued - } - - // Update the timestamp of the last root job started at the location... - return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + function () use ( $job ) { + parent::doDeduplicateRootJob( $job ); }, __METHOD__ ); @@ -581,7 +562,7 @@ class JobQueueDB extends JobQueue { */ protected function doFlushCaches() { foreach ( [ 'size', 'acquiredcount' ] as $type ) { - $this->cache->delete( $this->getCacheKey( $type ) ); + $this->wanCache->delete( $this->getCacheKey( $type ) ); } } @@ -709,9 +690,9 @@ class JobQueueDB extends JobQueue { $dbw->update( 'job', [ 'job_token' => '', - 'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release - [ - 'job_id' => $ids ], + 'job_token_timestamp' => $dbw->timestamp( $now ) // time of release + ], + [ 'job_id' => $ids, "job_token != ''" ], __METHOD__ ); $affected = $dbw->affectedRows(); @@ -789,7 +770,7 @@ class JobQueueDB extends JobQueue { /** * @throws JobQueueConnectionError - * @return IDatabase + * @return IMaintainableDatabase */ protected function getMasterDB() { try { @@ -801,7 +782,7 @@ class JobQueueDB extends JobQueue { /** * @param int $index (DB_REPLICA/DB_MASTER) - * @return IDatabase + * @return IMaintainableDatabase */ protected function getDB( $index ) { if ( $this->server ) { @@ -825,12 +806,16 @@ class JobQueueDB extends JobQueue { ? $lbFactory->getExternalLB( $this->cluster ) : $lbFactory->getMainLB( $this->domain ); - return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) + if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) { // Keep a separate connection to avoid contention and deadlocks; // However, SQLite has the opposite behavior due to DB-level locking. - ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT ) + $flags = $lb::CONN_TRX_AUTOCOMMIT; + } else { // Jobs insertion will be defered until the PRESEND stage to reduce contention. - : $lb->getConnectionRef( $index, [], $this->domain ); + $flags = 0; + } + + return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags ); } } @@ -856,7 +841,7 @@ class JobQueueDB extends JobQueue { private function getCacheKey( $property ) { $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; - return $this->cache->makeGlobalKey( + return $this->wanCache->makeGlobalKey( 'jobqueue', $this->domain, $cluster,