X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobRunner.php;h=4ab9f5ad2e21fa12f4c68b099299240a5c3e8073;hb=793dbbb870657541c1d9b7c2ccd7628214d4a3fb;hp=6d2ce0eff960d4d11402681394c611ab1a9dd571;hpb=04fdc78370dbc042116488d6826e19bf3910273b;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 6d2ce0eff9..4ab9f5ad2e 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -40,6 +40,10 @@ class JobRunner implements LoggerAwareInterface { */ protected $logger; + const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present + const LAG_CHECK_PERIOD = 1.0; // check slave lag this many seconds + const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors + /** * @param callable $debug Optional debug output handler */ @@ -98,47 +102,42 @@ class JobRunner implements LoggerAwareInterface { $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false; $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; + // Bail if job type is invalid if ( $type !== false && !isset( $wgJobClasses[$type] ) ) { $response['reached'] = 'none-possible'; return $response; } - - // Bail out if in read-only mode + // Bail out if DB is in read-only mode if ( wfReadOnly() ) { $response['reached'] = 'read-only'; return $response; } - - // Catch huge single updates that lead to slave lag - $trxProfiler = Profiler::instance()->getTransactionProfiler(); - $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); - $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ ); - // Bail out if there is too much DB lag. // This check should not block as we want to try other wiki queues. - $maxAllowedLag = 3; list( , $maxLag ) = wfGetLB( wfWikiID() )->getMaxLag(); - if ( $maxLag >= $maxAllowedLag ) { + if ( $maxLag >= self::MAX_ALLOWED_LAG ) { $response['reached'] = 'slave-lag-limit'; return $response; } - $group = JobQueueGroup::singleton(); - // Flush any pending DB writes for sanity wfGetLBFactory()->commitAll( __METHOD__ ); + // Catch huge single updates that lead to slave lag + $trxProfiler = Profiler::instance()->getTransactionProfiler(); + $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); + $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ ); + // Some jobs types should not run until a certain timestamp $backoffs = array(); // map of (type => UNIX expiry) $backoffDeltas = array(); // map of (type => seconds) $wait = 'wait'; // block to read backoffs the first time + $group = JobQueueGroup::singleton(); $stats = RequestContext::getMain()->getStats(); $jobsPopped = 0; $timeMsTotal = 0; - $flags = JobQueueGroup::USE_CACHE; $startTime = microtime( true ); // time since jobs started running - $checkLagPeriod = 1.0; // check slave lag this many seconds $lastCheckTime = 1; // timestamp of last slave check do { // Sync the persistent backoffs with concurrent runners @@ -147,7 +146,11 @@ class JobRunner implements LoggerAwareInterface { $wait = 'nowait'; // less important now if ( $type === false ) { - $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist ); + $job = $group->pop( + JobQueueGroup::TYPE_DEFAULT, + JobQueueGroup::USE_CACHE, + $blacklist + ); } elseif ( in_array( $type, $blacklist ) ) { $job = false; // requested queue in backoff state } else { @@ -155,6 +158,7 @@ class JobRunner implements LoggerAwareInterface { } if ( $job ) { // found a job + ++$jobsPopped; $popTime = time(); $jType = $job->getType(); @@ -169,80 +173,26 @@ class JobRunner implements LoggerAwareInterface { $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); } - $msg = $job->toString() . " STARTING"; - $this->logger->debug( $msg ); - $this->debugCallback( $msg ); - - // Run the job... - $jobStartTime = microtime( true ); - try { - ++$jobsPopped; - $status = $job->run(); - $error = $job->getLastError(); - $this->commitMasterChanges( $job ); - - DeferredUpdates::doUpdates(); - $this->commitMasterChanges( $job ); - } catch ( Exception $e ) { - MWExceptionHandler::rollbackMasterChangesAndLog( $e ); - $status = false; - $error = get_class( $e ) . ': ' . $e->getMessage(); - MWExceptionHandler::logException( $e ); - } - // Commit all outstanding connections that are in a transaction - // to get a fresh repeatable read snapshot on every connection. - // Note that jobs are still responsible for handling slave lag. - wfGetLBFactory()->commitAll( __METHOD__ ); - // Clear out title cache data from prior snapshots - LinkCache::singleton()->clear(); - $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); - $timeMsTotal += $timeMs; - - // Record how long jobs wait before getting popped - $readyTs = $job->getReadyTimestamp(); - if ( $readyTs ) { - $pickupDelay = max( 0, $popTime - $readyTs ); - $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay ); - $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay ); - } - // Record root job age for jobs being run - $root = $job->getRootJobParams(); - if ( $root['rootJobTimestamp'] ) { - $age = max( 0, $popTime - wfTimestamp( TS_UNIX, $root['rootJobTimestamp'] ) ); - $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age ); - } - // Track the execution time for jobs - $stats->timing( "jobqueue.run.$jType", $timeMs ); - - // Mark the job as done on success or when the job cannot be retried - if ( $status !== false || !$job->allowRetries() ) { - $group->ack( $job ); // done + $info = $this->executeJob( $job, $stats, $popTime ); + if ( $info['status'] !== false || !$job->allowRetries() ) { + $group->ack( $job ); // succeeded or job cannot be retried } // Back off of certain jobs for a while (for throttling and for errors) - if ( $status === false && mt_rand( 0, 49 ) == 0 ) { - $ttw = max( $ttw, 30 ); // too many errors + if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) { + $ttw = max( $ttw, self::ERROR_BACKOFF_TTL ); // too many errors $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) ? $backoffDeltas[$jType] + $ttw : $ttw; } - if ( $status === false ) { - $msg = $job->toString() . " t=$timeMs error={$error}"; - $this->logger->error( $msg ); - $this->debugCallback( $msg ); - } else { - $msg = $job->toString() . " t=$timeMs good"; - $this->logger->info( $msg ); - $this->debugCallback( $msg ); - } - $response['jobs'][] = array( 'type' => $jType, - 'status' => ( $status === false ) ? 'failed' : 'ok', - 'error' => $error, - 'time' => $timeMs + 'status' => ( $info['status'] === false ) ? 'failed' : 'ok', + 'error' => $info['error'], + 'time' => $info['timeMs'] ); + $timeMsTotal += $info['timeMs']; // Break out if we hit the job count or wall time limits... if ( $maxJobs && $jobsPopped >= $maxJobs ) { @@ -257,8 +207,8 @@ class JobRunner implements LoggerAwareInterface { // This only waits for so long before exiting and letting // other wikis in the farm (on different masters) get a chance. $timePassed = microtime( true ) - $lastCheckTime; - if ( $timePassed >= $checkLagPeriod || $timePassed < 0 ) { - if ( !wfWaitForSlaves( $lastCheckTime, false, '*', $maxAllowedLag ) ) { + if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) { + if ( !wfWaitForSlaves( $lastCheckTime, false, '*', self::MAX_ALLOWED_LAG ) ) { $response['reached'] = 'slave-lag-limit'; break; } @@ -288,6 +238,85 @@ class JobRunner implements LoggerAwareInterface { return $response; } + /** + * @param Job $job + * @param BufferingStatsdDataFactory $stats + * @param float $popTime + * @return array Map of status/error/timeMs + */ + private function executeJob( Job $job, $stats, $popTime ) { + $jType = $job->getType(); + $msg = $job->toString() . " STARTING"; + $this->logger->debug( $msg ); + $this->debugCallback( $msg ); + + // Run the job... + $rssStart = $this->getMaxRssKb(); + $jobStartTime = microtime( true ); + try { + $status = $job->run(); + $error = $job->getLastError(); + $this->commitMasterChanges( $job ); + + DeferredUpdates::doUpdates(); + $this->commitMasterChanges( $job ); + } catch ( Exception $e ) { + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + $status = false; + $error = get_class( $e ) . ': ' . $e->getMessage(); + MWExceptionHandler::logException( $e ); + } + // Commit all outstanding connections that are in a transaction + // to get a fresh repeatable read snapshot on every connection. + // Note that jobs are still responsible for handling slave lag. + wfGetLBFactory()->commitAll( __METHOD__ ); + // Clear out title cache data from prior snapshots + LinkCache::singleton()->clear(); + $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); + $rssEnd = $this->getMaxRssKb(); + + // Record how long jobs wait before getting popped + $readyTs = $job->getReadyTimestamp(); + if ( $readyTs ) { + $pickupDelay = max( 0, $popTime - $readyTs ); + $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay ); + $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay ); + } + // Record root job age for jobs being run + $root = $job->getRootJobParams(); + if ( $root['rootJobTimestamp'] ) { + $age = max( 0, $popTime - wfTimestamp( TS_UNIX, $root['rootJobTimestamp'] ) ); + $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age ); + } + // Track the execution time for jobs + $stats->timing( "jobqueue.run.$jType", $timeMs ); + // Track RSS increases for jobs (in case of memory leaks) + if ( $rssStart && $rssEnd ) { + $stats->increment( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart ); + } + + if ( $status === false ) { + $msg = $job->toString() . " t=$timeMs error={$error}"; + $this->logger->error( $msg ); + $this->debugCallback( $msg ); + } else { + $msg = $job->toString() . " t=$timeMs good"; + $this->logger->info( $msg ); + $this->debugCallback( $msg ); + } + + return array( 'status' => $status, 'error' => $error, 'timeMs' => $timeMs ); + } + + /** + * @return int|null Max memory RSS in kilobytes + */ + private function getMaxRssKb() { + $info = wfGetRusage() ?: array(); + // see http://linux.die.net/man/2/getrusage + return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null; + } + /** * @param Job $job * @return int Seconds for this runner to avoid doing more jobs of this type