}
$group = JobQueueGroup::singleton();
-
+
// Flush any pending DB writes for sanity
wfGetLBFactory()->commitAll();
$backoffDeltas = array(); // map of (type => seconds)
$wait = 'wait'; // block to read backoffs the first time
- $jobsRun = 0;
+ $stats = RequestContext::getMain()->getStats();
+ $jobsPopped = 0;
$timeMsTotal = 0;
$flags = JobQueueGroup::USE_CACHE;
$startTime = microtime( true ); // time since jobs started running
}
if ( $job ) { // found a job
+ $popTime = time();
$jType = $job->getType();
// Back off of certain jobs for a while (for throttling and for errors)
$psection = $profiler->scopedProfileIn( __METHOD__ . '-' . $jType );
$jobStartTime = microtime( true );
try {
- ++$jobsRun;
+ ++$jobsPopped;
$status = $job->run();
$error = $job->getLastError();
$this->commitMasterChanges( $job );
}
// Commit all outstanding connections that are in a transaction
// to get a fresh repeatable read snapshot on every connection.
- // This is important because if you have an old snapshot on the
- // database you could run the job incorrectly. Its possible, for
- // example, to pick up a RefreshLinksJob for a new page that isn't
- // even visible to the snapshot. The snapshot could have been
- // created before the page. Fresh snapshots will see the page.
wfGetLBFactory()->commitAll();
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
$timeMsTotal += $timeMs;
$profiler->scopedProfileOut( $psection );
+ $readyTs = $job->getReadyTimestamp();
+ if ( $readyTs ) {
+ // Record time to run for the job type
+ $pickupDelay = $popTime - $readyTs;
+ $stats->timing( 'jobqueue.pickup_delay.all', $pickupDelay );
+ $stats->timing( "jobqueue.pickup_delay.$jType", $pickupDelay );
+ }
+
// Mark the job as done on success or when the job cannot be retried
if ( $status !== false || !$job->allowRetries() ) {
$group->ack( $job ); // done
);
// Break out if we hit the job count or wall time limits...
- if ( $maxJobs && $jobsRun >= $maxJobs ) {
+ if ( $maxJobs && $jobsPopped >= $maxJobs ) {
$response['reached'] = 'job-limit';
break;
} elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
$lastCheckTime = microtime( true );
}
// Don't let any queue slaves/backups fall behind
- if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
+ if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
$group->waitForBackups();
}