X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;ds=sidebyside;f=includes%2Fjobqueue%2FJobQueueDB.php;h=c2772a63816e0d1aabd91abbf73c9f2a40dc3ca6;hb=3a027196d9c3f3549b16d2821366a28ac705fd32;hp=65c27d87322a9ed09810809571602f95d304a923;hpb=c2cedf71ee1464f8b4c974906cbca4c8cd80a4e6;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index 65c27d8732..c2772a6381 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -20,7 +20,7 @@ * @file */ use Wikimedia\Rdbms\IDatabase; -use Wikimedia\Rdbms\DBConnRef; +use Wikimedia\Rdbms\Database; use Wikimedia\Rdbms\DBConnectionError; use Wikimedia\Rdbms\DBError; use MediaWiki\MediaWikiServices; @@ -40,23 +40,34 @@ class JobQueueDB extends JobQueue { /** @var WANObjectCache */ protected $cache; + /** @var IDatabase|DBError|null */ + protected $conn; - /** @var bool|string Name of an external DB cluster. False if not set */ - protected $cluster = false; + /** @var array|null Server configuration array */ + protected $server; + /** @var string|null Name of an external DB cluster or null for the local DB cluster */ + protected $cluster; /** * Additional parameters include: + * - server : Server configuration array for Database::factory. Overrides "cluster". * - cluster : The name of an external cluster registered via LBFactory. * If not specified, the primary DB cluster for the wiki will be used. * This can be overridden with a custom cluster so that DB handles will * be retrieved via LBFactory::getExternalLB() and getConnection(). + * - wanCache : An instance of WANObjectCache to use for caching. * @param array $params */ protected function __construct( array $params ) { parent::__construct( $params ); - $this->cluster = $params['cluster'] ?? false; - $this->cache = MediaWikiServices::getInstance()->getMainWANObjectCache(); + if ( isset( $params['server'] ) ) { + $this->server = $params['server']; + } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) { + $this->cluster = $params['cluster']; + } + + $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty(); } protected function supportedOrders() { @@ -73,6 +84,8 @@ class JobQueueDB extends JobQueue { */ protected function doIsEmpty() { $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { $found = $dbr->selectField( // unclaimed job 'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__ @@ -96,8 +109,10 @@ class JobQueueDB extends JobQueue { return $size; } + $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { - $dbr = $this->getReplicaDB(); $size = (int)$dbr->selectField( 'job', 'COUNT(*)', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__ @@ -127,6 +142,8 @@ class JobQueueDB extends JobQueue { } $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { $count = (int)$dbr->selectField( 'job', 'COUNT(*)', [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ], @@ -158,6 +175,8 @@ class JobQueueDB extends JobQueue { } $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { $count = (int)$dbr->selectField( 'job', 'COUNT(*)', [ @@ -185,6 +204,8 @@ class JobQueueDB extends JobQueue { */ protected function doBatchPush( array $jobs, $flags ) { $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); // In general, there will be two cases here: // a) sqlite; DB connection is probably a regular round-aware handle. // If the connection is busy with a transaction, then defer the job writes @@ -255,8 +276,8 @@ class JobQueueDB extends JobQueue { foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { $dbw->insert( 'job', $rowBatch, $method ); } - JobQueue::incrStats( 'inserts', $this->type, count( $rows ) ); - JobQueue::incrStats( 'dupe_inserts', $this->type, + $this->incrStats( 'inserts', $this->type, count( $rows ) ); + $this->incrStats( 'dupe_inserts', $this->type, count( $rowSet ) + count( $rowList ) - count( $rows ) ); } catch ( DBError $e ) { @@ -273,15 +294,12 @@ class JobQueueDB extends JobQueue { */ protected function doPop() { $dbw = $this->getMasterDB(); - try { - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); + $job = false; // job popped off + try { $uuid = wfRandomString( 32 ); // pop attempt - $job = false; // job popped off do { // retry when our row is invalid or deleted as a duplicate // Try to reserve a row in the DB... if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) { @@ -295,13 +313,13 @@ class JobQueueDB extends JobQueue { if ( !$row ) { break; // nothing to do } - JobQueue::incrStats( 'pops', $this->type ); + $this->incrStats( 'pops', $this->type ); // Get the job object from the row... $title = Title::makeTitle( $row->job_namespace, $row->job_title ); $job = Job::factory( $row->job_cmd, $title, self::extractBlob( $row->job_params ) ); - $job->metadata['id'] = $row->job_id; - $job->metadata['timestamp'] = $row->job_timestamp; + $job->setMetadata( 'id', $row->job_id ); + $job->setMetadata( 'timestamp', $row->job_timestamp ); break; // done } while ( true ); @@ -327,6 +345,8 @@ class JobQueueDB extends JobQueue { */ protected function claimRandom( $uuid, $rand, $gte ) { $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); // Check cache to see if the queue has <= OFFSET items $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); @@ -404,6 +424,8 @@ class JobQueueDB extends JobQueue { */ protected function claimOldest( $uuid ) { $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); $row = false; // the row acquired do { @@ -463,23 +485,23 @@ class JobQueueDB extends JobQueue { * @throws MWException */ protected function doAck( Job $job ) { - if ( !isset( $job->metadata['id'] ) ) { + $id = $job->getMetadata( 'id' ); + if ( $id === null ) { throw new MWException( "Job of type '{$job->getType()}' has no ID." ); } $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); try { - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); - // Delete a row with a single DELETE without holding row locks over RTTs... - $dbw->delete( 'job', - [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ ); + $dbw->delete( + 'job', + [ 'job_cmd' => $this->type, 'job_id' => $id ], + __METHOD__ + ); - JobQueue::incrStats( 'acks', $this->type ); + $this->incrStats( 'acks', $this->type ); } catch ( DBError $e ) { $this->throwDBException( $e ); } @@ -505,6 +527,9 @@ class JobQueueDB extends JobQueue { // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); + $cache = $this->dupCache; $dbw->onTransactionCommitOrIdle( function () use ( $cache, $params, $key ) { @@ -528,6 +553,8 @@ class JobQueueDB extends JobQueue { */ protected function doDelete() { $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); try { $dbw->delete( 'job', [ 'job_cmd' => $this->type ] ); } catch ( DBError $e ) { @@ -542,9 +569,15 @@ class JobQueueDB extends JobQueue { * @return void */ protected function doWaitForBackups() { + if ( $this->server ) { + return; // not using LBFactory instance + } + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); - $lbFactory->waitForReplication( - [ 'domain' => $this->domain, 'cluster' => $this->cluster ] ); + $lbFactory->waitForReplication( [ + 'domain' => $this->domain, + 'cluster' => is_string( $this->cluster ) ? $this->cluster : false + ] ); } /** @@ -578,6 +611,8 @@ class JobQueueDB extends JobQueue { */ protected function getJobIterator( array $conds ) { $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { return new MappedIterator( $dbr->select( 'job', self::selectFields(), $conds ), @@ -587,8 +622,8 @@ class JobQueueDB extends JobQueue { Title::makeTitle( $row->job_namespace, $row->job_title ), strlen( $row->job_params ) ? unserialize( $row->job_params ) : [] ); - $job->metadata['id'] = $row->job_id; - $job->metadata['timestamp'] = $row->job_timestamp; + $job->setMetadata( 'id', $row->job_id ); + $job->setMetadata( 'timestamp', $row->job_timestamp ); return $job; } @@ -599,13 +634,19 @@ class JobQueueDB extends JobQueue { } public function getCoalesceLocationInternal() { - return $this->cluster + if ( $this->server ) { + return null; // not using the LBFactory instance + } + + return is_string( $this->cluster ) ? "DBCluster:{$this->cluster}:{$this->domain}" : "LBFactory:{$this->domain}"; } protected function doGetSiblingQueuesWithJobs( array $types ) { $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); // @note: this does not check whether the jobs are claimed or not. // This is useful so JobQueueGroup::pop() also sees queues that only // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue @@ -623,6 +664,9 @@ class JobQueueDB extends JobQueue { protected function doGetSiblingQueueSizes( array $types ) { $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); + $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ], [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] ); @@ -643,6 +687,8 @@ class JobQueueDB extends JobQueue { $now = time(); $count = 0; // affected rows $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); try { if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { @@ -682,8 +728,7 @@ class JobQueueDB extends JobQueue { ); $affected = $dbw->affectedRows(); $count += $affected; - JobQueue::incrStats( 'recycles', $this->type, $affected ); - $this->aggr->notifyQueueNonEmpty( $this->domain, $this->type ); + $this->incrStats( 'recycles', $this->type, $affected ); } } @@ -709,7 +754,7 @@ class JobQueueDB extends JobQueue { $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ ); $affected = $dbw->affectedRows(); $count += $affected; - JobQueue::incrStats( 'abandons', $this->type, $affected ); + $this->incrStats( 'abandons', $this->type, $affected ); } $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); @@ -744,7 +789,7 @@ class JobQueueDB extends JobQueue { /** * @throws JobQueueConnectionError - * @return DBConnRef + * @return IDatabase */ protected function getReplicaDB() { try { @@ -756,7 +801,7 @@ class JobQueueDB extends JobQueue { /** * @throws JobQueueConnectionError - * @return DBConnRef + * @return IDatabase */ protected function getMasterDB() { try { @@ -768,20 +813,52 @@ class JobQueueDB extends JobQueue { /** * @param int $index (DB_REPLICA/DB_MASTER) - * @return DBConnRef + * @return IDatabase */ protected function getDB( $index ) { - $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); - $lb = ( $this->cluster !== false ) - ? $lbFactory->getExternalLB( $this->cluster ) - : $lbFactory->getMainLB( $this->domain ); - - return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) - // Keep a separate connection to avoid contention and deadlocks; - // However, SQLite has the opposite behavior due to DB-level locking. - ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT ) - // Jobs insertion will be defered until the PRESEND stage to reduce contention. - : $lb->getConnectionRef( $index, [], $this->domain ); + if ( $this->server ) { + if ( $this->conn instanceof IDatabase ) { + return $this->conn; + } elseif ( $this->conn instanceof DBError ) { + throw $this->conn; + } + + try { + $this->conn = Database::factory( $this->server['type'], $this->server ); + } catch ( DBError $e ) { + $this->conn = $e; + throw $e; + } + + return $this->conn; + } else { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lb = is_string( $this->cluster ) + ? $lbFactory->getExternalLB( $this->cluster ) + : $lbFactory->getMainLB( $this->domain ); + + return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) + // Keep a separate connection to avoid contention and deadlocks; + // However, SQLite has the opposite behavior due to DB-level locking. + ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT ) + // Jobs insertion will be defered until the PRESEND stage to reduce contention. + : $lb->getConnectionRef( $index, [], $this->domain ); + } + } + + /** + * @param IDatabase $db + * @return ScopedCallback + */ + private function getScopedNoTrxFlag( IDatabase $db ) { + $autoTrx = $db->getFlag( DBO_TRX ); // get current setting + $db->clearFlag( DBO_TRX ); // make each query its own transaction + + return new ScopedCallback( function () use ( $db, $autoTrx ) { + if ( $autoTrx ) { + $db->setFlag( DBO_TRX ); // restore old setting + } + } ); } /**