X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobRunner.php;h=baff288e853c0ec7ae585c45878846ff37a0b6c0;hb=178bada116bc5aa336191f4867fec5cae2258cd6;hp=990f112a60c47b67521a99d770988780dce91420;hpb=84851a43f3ea8ea146c4d82c55fd01b9fa302347;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 990f112a60..49b7a459a6 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -27,6 +27,9 @@ use Liuggio\StatsdClient\Factory\StatsdDataFactory; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerInterface; use Wikimedia\ScopedCallback; +use Wikimedia\Rdbms\LBFactory; +use Wikimedia\Rdbms\DBError; +use Wikimedia\Rdbms\DBReplicationWaitError; /** * Job queue runner utility methods @@ -35,6 +38,8 @@ use Wikimedia\ScopedCallback; * @since 1.24 */ class JobRunner implements LoggerAwareInterface { + /** @var Config */ + protected $config; /** @var callable|null Debug output handler */ protected $debug; @@ -46,6 +51,7 @@ class JobRunner implements LoggerAwareInterface { const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present const LAG_CHECK_PERIOD = 1.0; // check replica DB lag this many seconds const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors + const READONLY_BACKOFF_TTL = 30; // seconds to back off a queue due to read-only errors /** * @param callable $debug Optional debug output handler @@ -70,6 +76,7 @@ class JobRunner implements LoggerAwareInterface { $logger = LoggerFactory::getInstance( 'runJobs' ); } $this->setLogger( $logger ); + $this->config = MediaWikiServices::getInstance()->getMainConfig(); } /** @@ -97,7 +104,8 @@ class JobRunner implements LoggerAwareInterface { * @return array Summary response that can easily be JSON serialized */ public function run( array $options ) { - global $wgJobClasses, $wgTrxProfilerLimits; + $jobClasses = $this->config->get( 'JobClasses' ); + $profilerLimits = $this->config->get( 'TrxProfilerLimits' ); $response = [ 'jobs' => [], 'reached' => 'none-ready' ]; @@ -107,7 +115,7 @@ class JobRunner implements LoggerAwareInterface { $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; // Bail if job type is invalid - if ( $type !== false && !isset( $wgJobClasses[$type] ) ) { + if ( $type !== false && !isset( $jobClasses[$type] ) ) { $response['reached'] = 'none-possible'; return $response; } @@ -132,7 +140,7 @@ class JobRunner implements LoggerAwareInterface { // Catch huge single updates that lead to replica DB lag $trxProfiler = Profiler::instance()->getTransactionProfiler(); $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); - $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ ); + $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ ); // Some jobs types should not run until a certain timestamp $backoffs = []; // map of (type => UNIX expiry) @@ -190,7 +198,7 @@ class JobRunner implements LoggerAwareInterface { // Back off of certain jobs for a while (for throttling and for errors) if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) { - $ttw = max( $ttw, self::ERROR_BACKOFF_TTL ); // too many errors + $ttw = max( $ttw, $this->getErrorBackoffTTL( $info['error'] ) ); $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) ? $backoffDeltas[$jType] + $ttw : $ttw; @@ -253,6 +261,16 @@ class JobRunner implements LoggerAwareInterface { return $response; } + /** + * @param string $error + * @return int TTL in seconds + */ + private function getErrorBackoffTTL( $error ) { + return strpos( $error, 'DBReadOnlyError' ) !== false + ? self::READONLY_BACKOFF_TTL + : self::ERROR_BACKOFF_TTL; + } + /** * @param Job $job * @param LBFactory $lbFactory @@ -275,6 +293,8 @@ class JobRunner implements LoggerAwareInterface { $status = $job->run(); $error = $job->getLastError(); $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner ); + // Important: this must be the last deferred update added (T100085, T154425) + DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] ); // Run any deferred update tasks; doUpdates() manages transactions itself DeferredUpdates::doUpdates(); } catch ( Exception $e ) { @@ -346,15 +366,13 @@ class JobRunner implements LoggerAwareInterface { * @see $wgJobBackoffThrottling */ private function getBackoffTimeToWait( Job $job ) { - global $wgJobBackoffThrottling; + $throttling = $this->config->get( 'JobBackoffThrottling' ); - if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) || - $job instanceof DuplicateJob // no work was done - ) { + if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) { return 0; // not throttled } - $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()]; + $itemsPerSecond = $throttling[$job->getType()]; if ( $itemsPerSecond <= 0 ) { return 0; // not throttled } @@ -502,17 +520,17 @@ class JobRunner implements LoggerAwareInterface { * @throws DBError */ private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) { - global $wgJobSerialCommitThreshold; + $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' ); $time = false; $lb = $lbFactory->getMainLB( wfWikiID() ); - if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) { + if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) { // Generally, there is one master connection to the local DB $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); // We need natively blocking fast locks if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) { $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY ); - if ( $time < $wgJobSerialCommitThreshold ) { + if ( $time < $syncThreshold ) { $dbwSerial = false; } } else { @@ -524,7 +542,12 @@ class JobRunner implements LoggerAwareInterface { } if ( !$dbwSerial ) { - $lbFactory->commitMasterChanges( $fnameTrxOwner ); + $lbFactory->commitMasterChanges( + $fnameTrxOwner, + // Abort if any transaction was too big + [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ] + ); + return; } @@ -549,7 +572,11 @@ class JobRunner implements LoggerAwareInterface { } // Actually commit the DB master changes - $lbFactory->commitMasterChanges( $fnameTrxOwner ); + $lbFactory->commitMasterChanges( + $fnameTrxOwner, + // Abort if any transaction was too big + [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ] + ); ScopedCallback::consume( $unlocker ); } }