return $response;
}
- $group = JobQueueGroup::singleton();
- // Handle any required periodic queue maintenance
- $count = $group->executeReadyPeriodicTasks();
- if ( $count > 0 ) {
- $msg = "Executed $count periodic queue task(s).";
- $this->logger->debug( $msg );
- $this->debugCallback( $msg );
- }
-
// Bail out if in read-only mode
if ( wfReadOnly() ) {
$response['reached'] = 'read-only';
return $response;
}
+ $profiler = Profiler::instance();
+
// Catch huge single updates that lead to slave lag
- $trxProfiler = Profiler::instance()->getTransactionProfiler();
+ $trxProfiler = $profiler->getTransactionProfiler();
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
$trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
return $response;
}
+ $group = JobQueueGroup::singleton();
+
// Flush any pending DB writes for sanity
- wfGetLBFactory()->commitMasterChanges();
+ wfGetLBFactory()->commitAll();
// Some jobs types should not run until a certain timestamp
$backoffs = array(); // map of (type => UNIX expiry)
$backoffDeltas = array(); // map of (type => seconds)
$wait = 'wait'; // block to read backoffs the first time
- $jobsRun = 0;
+ $stats = RequestContext::getMain()->getStats();
+ $jobsPopped = 0;
$timeMsTotal = 0;
$flags = JobQueueGroup::USE_CACHE;
$startTime = microtime( true ); // time since jobs started running
}
if ( $job ) { // found a job
+ $popTime = time();
$jType = $job->getType();
// Back off of certain jobs for a while (for throttling and for errors)
}
$msg = $job->toString() . " STARTING";
- $this->logger->info( $msg );
+ $this->logger->debug( $msg );
$this->debugCallback( $msg );
// Run the job...
+ $psection = $profiler->scopedProfileIn( __METHOD__ . '-' . $jType );
$jobStartTime = microtime( true );
try {
- ++$jobsRun;
+ ++$jobsPopped;
$status = $job->run();
$error = $job->getLastError();
$this->commitMasterChanges( $job );
+
+ DeferredUpdates::doUpdates();
+ $this->commitMasterChanges( $job );
} catch ( Exception $e ) {
MWExceptionHandler::rollbackMasterChangesAndLog( $e );
$status = false;
$error = get_class( $e ) . ': ' . $e->getMessage();
MWExceptionHandler::logException( $e );
}
+ // Commit all outstanding connections that are in a transaction
+ // to get a fresh repeatable read snapshot on every connection.
+ wfGetLBFactory()->commitAll();
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
$timeMsTotal += $timeMs;
+ $profiler->scopedProfileOut( $psection );
+
+ if ( $job->getQueuedTimestamp() ) {
+ // Record time to run for the job type
+ $stats->timing( "job-pickuptime-$jType",
+ $popTime - $job->getQueuedTimestamp() );
+ }
// Mark the job as done on success or when the job cannot be retried
if ( $status !== false || !$job->allowRetries() ) {
);
// Break out if we hit the job count or wall time limits...
- if ( $maxJobs && $jobsRun >= $maxJobs ) {
+ if ( $maxJobs && $jobsPopped >= $maxJobs ) {
$response['reached'] = 'job-limit';
break;
} elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
$lastCheckTime = microtime( true );
}
// Don't let any queue slaves/backups fall behind
- if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
+ if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
$group->waitForBackups();
}
}
/**
- * Commit any DB master changes from a job on all load balancers
+ * Issue a commit on all masters who are currently in a transaction and have
+ * made changes to the database. It also supports sometimes waiting for the
+ * local wiki's slaves to catch up. See the documentation for
+ * $wgJobSerialCommitThreshold for more.
*
* @param Job $job
* @throws DBError