X-Git-Url: https://git.heureux-cyclage.org/?p=lhc%2Fweb%2Fwiklou.git;a=blobdiff_plain;f=includes%2Fjobqueue%2FJobRunner.php;h=fa7d605731e3e474b6816f9fa8756ad8cc30eb65;hp=a1aeaba4357548410d33ac7f3f48a92a9ba6a209;hb=a8379682a46a428320c88702c800a6107c015137;hpb=afdb4753bdcbc55608e350195d3e8de49e6fdfff diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index a1aeaba435..fa7d605731 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -38,6 +38,8 @@ use Wikimedia\Rdbms\DBReplicationWaitError; * @since 1.24 */ class JobRunner implements LoggerAwareInterface { + /** @var Config */ + protected $config; /** @var callable|null Debug output handler */ protected $debug; @@ -74,6 +76,7 @@ class JobRunner implements LoggerAwareInterface { $logger = LoggerFactory::getInstance( 'runJobs' ); } $this->setLogger( $logger ); + $this->config = MediaWikiServices::getInstance()->getMainConfig(); } /** @@ -101,7 +104,8 @@ class JobRunner implements LoggerAwareInterface { * @return array Summary response that can easily be JSON serialized */ public function run( array $options ) { - global $wgJobClasses, $wgTrxProfilerLimits; + $jobClasses = $this->config->get( 'JobClasses' ); + $profilerLimits = $this->config->get( 'TrxProfilerLimits' ); $response = [ 'jobs' => [], 'reached' => 'none-ready' ]; @@ -111,10 +115,11 @@ class JobRunner implements LoggerAwareInterface { $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; // Bail if job type is invalid - if ( $type !== false && !isset( $wgJobClasses[$type] ) ) { + if ( $type !== false && !isset( $jobClasses[$type] ) ) { $response['reached'] = 'none-possible'; return $response; } + // Bail out if DB is in read-only mode if ( wfReadOnly() ) { $response['reached'] = 'read-only'; @@ -122,6 +127,9 @@ class JobRunner implements LoggerAwareInterface { } $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + if ( $lbFactory->hasTransactionRound() ) { + throw new LogicException( __METHOD__ . ' called with an active transaction round.' ); + } // Bail out if there is too much DB lag. // This check should not block as we want to try other wiki queues. list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag(); @@ -130,13 +138,10 @@ class JobRunner implements LoggerAwareInterface { return $response; } - // Flush any pending DB writes for sanity - $lbFactory->commitAll( __METHOD__ ); - // Catch huge single updates that lead to replica DB lag $trxProfiler = Profiler::instance()->getTransactionProfiler(); $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); - $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ ); + $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ ); // Some jobs types should not run until a certain timestamp $backoffs = []; // map of (type => UNIX expiry) @@ -166,7 +171,6 @@ class JobRunner implements LoggerAwareInterface { } else { $job = $group->pop( $type ); // job from a single queue } - $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes if ( $job ) { // found a job ++$jobsPopped; @@ -189,7 +193,6 @@ class JobRunner implements LoggerAwareInterface { $info = $this->executeJob( $job, $lbFactory, $stats, $popTime ); if ( $info['status'] !== false || !$job->allowRetries() ) { $group->ack( $job ); // succeeded or job cannot be retried - $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes } // Back off of certain jobs for a while (for throttling and for errors) @@ -277,7 +280,9 @@ class JobRunner implements LoggerAwareInterface { private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) { $jType = $job->getType(); $msg = $job->toString() . " STARTING"; - $this->logger->debug( $msg ); + $this->logger->debug( $msg, [ + 'job_type' => $job->getType(), + ] ); $this->debugCallback( $msg ); // Run the job... @@ -289,10 +294,10 @@ class JobRunner implements LoggerAwareInterface { $status = $job->run(); $error = $job->getLastError(); $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner ); + // Important: this must be the last deferred update added (T100085, T154425) + DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] ); // Run any deferred update tasks; doUpdates() manages transactions itself DeferredUpdates::doUpdates(); - // Push lazy jobs added by the job or its deferred udpates - JobQueueGroup::pushLazyJobs(); } catch ( Exception $e ) { MWExceptionHandler::rollbackMasterChangesAndLog( $e ); $status = false; @@ -335,12 +340,23 @@ class JobRunner implements LoggerAwareInterface { } if ( $status === false ) { + $msg = $job->toString() . " t={job_duration} error={job_error}"; + $this->logger->error( $msg, [ + 'job_type' => $job->getType(), + 'job_duration' => $timeMs, + 'job_error' => $error, + ] ); + $msg = $job->toString() . " t=$timeMs error={$error}"; - $this->logger->error( $msg ); $this->debugCallback( $msg ); } else { + $msg = $job->toString() . " t={job_duration} good"; + $this->logger->info( $msg, [ + 'job_type' => $job->getType(), + 'job_duration' => $timeMs, + ] ); + $msg = $job->toString() . " t=$timeMs good"; - $this->logger->info( $msg ); $this->debugCallback( $msg ); } @@ -362,15 +378,13 @@ class JobRunner implements LoggerAwareInterface { * @see $wgJobBackoffThrottling */ private function getBackoffTimeToWait( Job $job ) { - global $wgJobBackoffThrottling; + $throttling = $this->config->get( 'JobBackoffThrottling' ); - if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) || - $job instanceof DuplicateJob // no work was done - ) { + if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) { return 0; // not throttled } - $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()]; + $itemsPerSecond = $throttling[$job->getType()]; if ( $itemsPerSecond <= 0 ) { return 0; // not throttled } @@ -486,9 +500,14 @@ class JobRunner implements LoggerAwareInterface { } $usedBytes = memory_get_usage(); if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) { + $msg = "Detected excessive memory usage ({used_bytes}/{max_bytes})."; + $this->logger->error( $msg, [ + 'used_bytes' => $usedBytes, + 'max_bytes' => $maxBytes, + ] ); + $msg = "Detected excessive memory usage ($usedBytes/$maxBytes)."; $this->debugCallback( $msg ); - $this->logger->error( $msg ); return false; } @@ -518,17 +537,17 @@ class JobRunner implements LoggerAwareInterface { * @throws DBError */ private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) { - global $wgJobSerialCommitThreshold; + $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' ); $time = false; $lb = $lbFactory->getMainLB( wfWikiID() ); - if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) { + if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) { // Generally, there is one master connection to the local DB $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); // We need natively blocking fast locks if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) { $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY ); - if ( $time < $wgJobSerialCommitThreshold ) { + if ( $time < $syncThreshold ) { $dbwSerial = false; } } else { @@ -540,13 +559,24 @@ class JobRunner implements LoggerAwareInterface { } if ( !$dbwSerial ) { - $lbFactory->commitMasterChanges( $fnameTrxOwner ); + $lbFactory->commitMasterChanges( + $fnameTrxOwner, + // Abort if any transaction was too big + [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ] + ); + return; } $ms = intval( 1000 * $time ); + + $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]"; + $this->logger->info( $msg, [ + 'job_type' => $job->getType(), + 'job_commit_write_ms' => $ms, + ] ); + $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]"; - $this->logger->info( $msg ); $this->debugCallback( $msg ); // Wait for an exclusive lock to commit @@ -565,7 +595,11 @@ class JobRunner implements LoggerAwareInterface { } // Actually commit the DB master changes - $lbFactory->commitMasterChanges( $fnameTrxOwner ); + $lbFactory->commitMasterChanges( + $fnameTrxOwner, + // Abort if any transaction was too big + [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ] + ); ScopedCallback::consume( $unlocker ); } }