* @since 1.24
*/
class JobRunner implements LoggerAwareInterface {
+ /** @var Config */
+ protected $config;
/** @var callable|null Debug output handler */
protected $debug;
$logger = LoggerFactory::getInstance( 'runJobs' );
}
$this->setLogger( $logger );
+ $this->config = MediaWikiServices::getInstance()->getMainConfig();
}
/**
* @return array Summary response that can easily be JSON serialized
*/
public function run( array $options ) {
- global $wgJobClasses, $wgTrxProfilerLimits;
+ $jobClasses = $this->config->get( 'JobClasses' );
+ $profilerLimits = $this->config->get( 'TrxProfilerLimits' );
$response = [ 'jobs' => [], 'reached' => 'none-ready' ];
$noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
// Bail if job type is invalid
- if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
+ if ( $type !== false && !isset( $jobClasses[$type] ) ) {
$response['reached'] = 'none-possible';
return $response;
}
+
// Bail out if DB is in read-only mode
if ( wfReadOnly() ) {
$response['reached'] = 'read-only';
}
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ if ( $lbFactory->hasTransactionRound() ) {
+ throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
+ }
// Bail out if there is too much DB lag.
// This check should not block as we want to try other wiki queues.
list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag();
return $response;
}
- // Flush any pending DB writes for sanity
- $lbFactory->commitAll( __METHOD__ );
-
// Catch huge single updates that lead to replica DB lag
$trxProfiler = Profiler::instance()->getTransactionProfiler();
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
- $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
+ $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
// Some jobs types should not run until a certain timestamp
$backoffs = []; // map of (type => UNIX expiry)
} else {
$job = $group->pop( $type ); // job from a single queue
}
- $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
if ( $job ) { // found a job
++$jobsPopped;
$info = $this->executeJob( $job, $lbFactory, $stats, $popTime );
if ( $info['status'] !== false || !$job->allowRetries() ) {
$group->ack( $job ); // succeeded or job cannot be retried
- $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
}
// Back off of certain jobs for a while (for throttling and for errors)
private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) {
$jType = $job->getType();
$msg = $job->toString() . " STARTING";
- $this->logger->debug( $msg );
+ $this->logger->debug( $msg, [
+ 'job_type' => $job->getType(),
+ ] );
$this->debugCallback( $msg );
// Run the job...
$status = $job->run();
$error = $job->getLastError();
$this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
+ // Important: this must be the last deferred update added (T100085, T154425)
+ DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
// Run any deferred update tasks; doUpdates() manages transactions itself
DeferredUpdates::doUpdates();
- // Push lazy jobs added by the job or its deferred udpates
- JobQueueGroup::pushLazyJobs();
} catch ( Exception $e ) {
MWExceptionHandler::rollbackMasterChangesAndLog( $e );
$status = false;
}
if ( $status === false ) {
+ $msg = $job->toString() . " t={job_duration} error={job_error}";
+ $this->logger->error( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_duration' => $timeMs,
+ 'job_error' => $error,
+ ] );
+
$msg = $job->toString() . " t=$timeMs error={$error}";
- $this->logger->error( $msg );
$this->debugCallback( $msg );
} else {
+ $msg = $job->toString() . " t={job_duration} good";
+ $this->logger->info( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_duration' => $timeMs,
+ ] );
+
$msg = $job->toString() . " t=$timeMs good";
- $this->logger->info( $msg );
$this->debugCallback( $msg );
}
* @see $wgJobBackoffThrottling
*/
private function getBackoffTimeToWait( Job $job ) {
- global $wgJobBackoffThrottling;
+ $throttling = $this->config->get( 'JobBackoffThrottling' );
- if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
- $job instanceof DuplicateJob // no work was done
- ) {
+ if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
return 0; // not throttled
}
- $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
+ $itemsPerSecond = $throttling[$job->getType()];
if ( $itemsPerSecond <= 0 ) {
return 0; // not throttled
}
}
$usedBytes = memory_get_usage();
if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
+ $msg = "Detected excessive memory usage ({used_bytes}/{max_bytes}).";
+ $this->logger->error( $msg, [
+ 'used_bytes' => $usedBytes,
+ 'max_bytes' => $maxBytes,
+ ] );
+
$msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
$this->debugCallback( $msg );
- $this->logger->error( $msg );
return false;
}
* @throws DBError
*/
private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
- global $wgJobSerialCommitThreshold;
+ $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
$time = false;
$lb = $lbFactory->getMainLB( wfWikiID() );
- if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
+ if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
// Generally, there is one master connection to the local DB
$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
// We need natively blocking fast locks
if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
$time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
- if ( $time < $wgJobSerialCommitThreshold ) {
+ if ( $time < $syncThreshold ) {
$dbwSerial = false;
}
} else {
}
if ( !$dbwSerial ) {
- $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ $lbFactory->commitMasterChanges(
+ $fnameTrxOwner,
+ // Abort if any transaction was too big
+ [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+ );
+
return;
}
$ms = intval( 1000 * $time );
+
+ $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
+ $this->logger->info( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_commit_write_ms' => $ms,
+ ] );
+
$msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
- $this->logger->info( $msg );
$this->debugCallback( $msg );
// Wait for an exclusive lock to commit
}
// Actually commit the DB master changes
- $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ $lbFactory->commitMasterChanges(
+ $fnameTrxOwner,
+ // Abort if any transaction was too big
+ [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+ );
ScopedCallback::consume( $unlocker );
}
}