X-Git-Url: http://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobQueueDB.php;h=f7b8ed2f78d41ec388a423ed81f2ae5da9bf6645;hb=8699880aab6f666f902b8d8927325760adfe7c20;hp=47ee5886d4ad71fa94678f1b736228f7c4cdb1e7;hpb=1f163231a6eb5d3e11f55d3d65b5acb45a4d80f8;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index 47ee5886d4..f7b8ed2f78 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -24,6 +24,7 @@ use Wikimedia\Rdbms\Database; use Wikimedia\Rdbms\DBConnectionError; use Wikimedia\Rdbms\DBError; use MediaWiki\MediaWikiServices; +use Wikimedia\Rdbms\IMaintainableDatabase; use Wikimedia\ScopedCallback; /** @@ -38,9 +39,7 @@ class JobQueueDB extends JobQueue { const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random const MAX_OFFSET = 255; // integer; maximum number of rows to skip - /** @var WANObjectCache */ - protected $cache; - /** @var IDatabase|DBError|null */ + /** @var IMaintainableDatabase|DBError|null */ protected $conn; /** @var array|null Server configuration array */ @@ -55,7 +54,6 @@ class JobQueueDB extends JobQueue { * If not specified, the primary DB cluster for the wiki will be used. * This can be overridden with a custom cluster so that DB handles will * be retrieved via LBFactory::getExternalLB() and getConnection(). - * - wanCache : An instance of WANObjectCache to use for caching. * @param array $params */ protected function __construct( array $params ) { @@ -66,8 +64,6 @@ class JobQueueDB extends JobQueue { } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) { $this->cluster = $params['cluster']; } - - $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty(); } protected function supportedOrders() { @@ -91,7 +87,7 @@ class JobQueueDB extends JobQueue { 'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__ ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } return !$found; @@ -104,7 +100,7 @@ class JobQueueDB extends JobQueue { protected function doGetSize() { $key = $this->getCacheKey( 'size' ); - $size = $this->cache->get( $key ); + $size = $this->wanCache->get( $key ); if ( is_int( $size ) ) { return $size; } @@ -118,9 +114,9 @@ class JobQueueDB extends JobQueue { __METHOD__ ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } - $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); + $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT ); return $size; } @@ -136,7 +132,7 @@ class JobQueueDB extends JobQueue { $key = $this->getCacheKey( 'acquiredcount' ); - $count = $this->cache->get( $key ); + $count = $this->wanCache->get( $key ); if ( is_int( $count ) ) { return $count; } @@ -150,9 +146,9 @@ class JobQueueDB extends JobQueue { __METHOD__ ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } - $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT ); return $count; } @@ -169,7 +165,7 @@ class JobQueueDB extends JobQueue { $key = $this->getCacheKey( 'abandonedcount' ); - $count = $this->cache->get( $key ); + $count = $this->wanCache->get( $key ); if ( is_int( $count ) ) { return $count; } @@ -187,10 +183,10 @@ class JobQueueDB extends JobQueue { __METHOD__ ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } - $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT ); return $count; } @@ -281,7 +277,7 @@ class JobQueueDB extends JobQueue { count( $rowSet ) + count( $rowList ) - count( $rows ) ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } if ( $flags & self::QOS_ATOMIC ) { $dbw->endAtomic( $method ); @@ -316,12 +312,7 @@ class JobQueueDB extends JobQueue { $this->incrStats( 'pops', $this->type ); // Get the job object from the row... - $params = self::extractBlob( $row->job_params ); - $params = is_array( $params ) ? $params : []; // sanity - $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ]; - $job = $this->factoryJob( $row->job_cmd, $params ); - $job->setMetadata( 'id', $row->job_id ); - $job->setMetadata( 'timestamp', $row->job_timestamp ); + $job = $this->jobFromRow( $row ); break; // done } while ( true ); @@ -331,7 +322,7 @@ class JobQueueDB extends JobQueue { $this->recycleAndDeleteStaleJobs(); } } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } return $job; @@ -350,9 +341,8 @@ class JobQueueDB extends JobQueue { /** @noinspection PhpUnusedLocalVariableInspection */ $scope = $this->getScopedNoTrxFlag( $dbw ); // Check cache to see if the queue has <= OFFSET items - $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); + $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) ); - $row = false; // the row acquired $invertedDirection = false; // whether one job_random direction was already scanned // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is @@ -391,7 +381,7 @@ class JobQueueDB extends JobQueue { ); if ( !$row ) { $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows - $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 ); + $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 ); continue; // use job_random } } @@ -505,7 +495,7 @@ class JobQueueDB extends JobQueue { $this->incrStats( 'acks', $this->type ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } } @@ -516,32 +506,17 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doDeduplicateRootJob( IJobSpecification $job ) { - $params = $job->getParams(); - if ( !isset( $params['rootJobSignature'] ) ) { - throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); - } elseif ( !isset( $params['rootJobTimestamp'] ) ) { - throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); - } - $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); - // Callers should call JobQueueGroup::push() before this method so that if the insert - // fails, the de-duplication registration will be aborted. Since the insert is - // deferred till "transaction idle", do the same here, so that the ordering is + // Callers should call JobQueueGroup::push() before this method so that if the + // insert fails, the de-duplication registration will be aborted. Since the insert + // is deferred till "transaction idle", do the same here, so that the ordering is // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. $dbw = $this->getMasterDB(); /** @noinspection PhpUnusedLocalVariableInspection */ $scope = $this->getScopedNoTrxFlag( $dbw ); - - $cache = $this->dupCache; $dbw->onTransactionCommitOrIdle( - function () use ( $cache, $params, $key ) { - $timestamp = $cache->get( $key ); // current last timestamp of this job - if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { - return true; // a newer version of this root job was enqueued - } - - // Update the timestamp of the last root job started at the location... - return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + function () use ( $job ) { + parent::doDeduplicateRootJob( $job ); }, __METHOD__ ); @@ -560,7 +535,7 @@ class JobQueueDB extends JobQueue { try { $dbw->delete( 'job', [ 'job_cmd' => $this->type ] ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } return true; @@ -587,7 +562,7 @@ class JobQueueDB extends JobQueue { */ protected function doFlushCaches() { foreach ( [ 'size', 'acquiredcount' ] as $type ) { - $this->cache->delete( $this->getCacheKey( $type ) ); + $this->wanCache->delete( $this->getCacheKey( $type ) ); } } @@ -619,22 +594,11 @@ class JobQueueDB extends JobQueue { return new MappedIterator( $dbr->select( 'job', self::selectFields(), $conds ), function ( $row ) { - $params = strlen( $row->job_params ) ? unserialize( $row->job_params ) : []; - $params = is_array( $params ) ? $params : []; // sanity - $params += [ - 'namespace' => $row->job_namespace, - 'title' => $row->job_title - ]; - - $job = $this->factoryJob( $row->job_cmd, $params ); - $job->setMetadata( 'id', $row->job_id ); - $job->setMetadata( 'timestamp', $row->job_timestamp ); - - return $job; + return $this->jobFromRow( $row ); } ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } } @@ -764,7 +728,7 @@ class JobQueueDB extends JobQueue { $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); } catch ( DBError $e ) { - $this->throwDBException( $e ); + throw $this->getDBException( $e ); } return $count; @@ -806,7 +770,7 @@ class JobQueueDB extends JobQueue { /** * @throws JobQueueConnectionError - * @return IDatabase + * @return IMaintainableDatabase */ protected function getMasterDB() { try { @@ -818,7 +782,7 @@ class JobQueueDB extends JobQueue { /** * @param int $index (DB_REPLICA/DB_MASTER) - * @return IDatabase + * @return IMaintainableDatabase */ protected function getDB( $index ) { if ( $this->server ) { @@ -842,12 +806,16 @@ class JobQueueDB extends JobQueue { ? $lbFactory->getExternalLB( $this->cluster ) : $lbFactory->getMainLB( $this->domain ); - return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) + if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) { // Keep a separate connection to avoid contention and deadlocks; // However, SQLite has the opposite behavior due to DB-level locking. - ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT ) + $flags = $lb::CONN_TRX_AUTOCOMMIT; + } else { // Jobs insertion will be defered until the PRESEND stage to reduce contention. - : $lb->getConnectionRef( $index, [], $this->domain ); + $flags = 0; + } + + return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags ); } } @@ -873,7 +841,7 @@ class JobQueueDB extends JobQueue { private function getCacheKey( $property ) { $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; - return $this->cache->makeGlobalKey( + return $this->wanCache->makeGlobalKey( 'jobqueue', $this->domain, $cluster, @@ -895,23 +863,30 @@ class JobQueueDB extends JobQueue { } /** - * @param string $blob - * @return bool|mixed + * @param stdClass $row + * @return RunnableJob|null */ - protected static function extractBlob( $blob ) { - if ( (string)$blob !== '' ) { - return unserialize( $blob ); - } else { - return false; + protected function jobFromRow( $row ) { + $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : []; + if ( !is_array( $params ) ) { // this shouldn't happen + throw new UnexpectedValueException( + "Could not unserialize job with ID '{$row->job_id}'." ); } + + $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ]; + $job = $this->factoryJob( $row->job_cmd, $params ); + $job->setMetadata( 'id', $row->job_id ); + $job->setMetadata( 'timestamp', $row->job_timestamp ); + + return $job; } /** * @param DBError $e - * @throws JobQueueError + * @return JobQueueError */ - protected function throwDBException( DBError $e ) { - throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); + protected function getDBException( DBError $e ) { + return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); } /**