}
$group = JobQueueGroup::singleton();
-
+
// Flush any pending DB writes for sanity
wfGetLBFactory()->commitAll();
$wait = 'wait'; // block to read backoffs the first time
$stats = RequestContext::getMain()->getStats();
- $jobsRun = 0;
+ $jobsPopped = 0;
$timeMsTotal = 0;
$flags = JobQueueGroup::USE_CACHE;
$startTime = microtime( true ); // time since jobs started running
}
if ( $job ) { // found a job
+ $popTime = time();
$jType = $job->getType();
// Back off of certain jobs for a while (for throttling and for errors)
$msg = $job->toString() . " STARTING";
$this->logger->debug( $msg );
$this->debugCallback( $msg );
- $timeToRun = false;
// Run the job...
$psection = $profiler->scopedProfileIn( __METHOD__ . '-' . $jType );
$jobStartTime = microtime( true );
try {
- ++$jobsRun;
- $queuedTime = $job->getQueuedTimestamp();
- if ( $queuedTime !== null ) {
- $timeToRun = time() - $queuedTime;
- }
+ ++$jobsPopped;
$status = $job->run();
$error = $job->getLastError();
$this->commitMasterChanges( $job );
}
// Commit all outstanding connections that are in a transaction
// to get a fresh repeatable read snapshot on every connection.
- // This is important because if you have an old snapshot on the
- // database you could run the job incorrectly. Its possible, for
- // example, to pick up a RefreshLinksJob for a new page that isn't
- // even visible to the snapshot. The snapshot could have been
- // created before the page. Fresh snapshots will see the page.
wfGetLBFactory()->commitAll();
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
$timeMsTotal += $timeMs;
$profiler->scopedProfileOut( $psection );
- if ( $timeToRun !== false ) {
+
+ $readyTs = $job->getReadyTimestamp();
+ if ( $readyTs ) {
// Record time to run for the job type
- $stats->timing( "jobqueue.pickup_time.$jType", $timeToRun );
+ $pickupDelay = $popTime - $readyTs;
+ $stats->timing( 'jobqueue.pickup_delay.all', $pickupDelay );
+ $stats->timing( "jobqueue.pickup_delay.$jType", $pickupDelay );
}
// Mark the job as done on success or when the job cannot be retried
);
// Break out if we hit the job count or wall time limits...
- if ( $maxJobs && $jobsRun >= $maxJobs ) {
+ if ( $maxJobs && $jobsPopped >= $maxJobs ) {
$response['reached'] = 'job-limit';
break;
} elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
$lastCheckTime = microtime( true );
}
// Don't let any queue slaves/backups fall behind
- if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
+ if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
$group->waitForBackups();
}
// Bail if near-OOM instead of in a job
- $this->assertMemoryOK();
+ if ( !$this->checkMemoryOK() ) {
+ $response['reached'] = 'memory-limit';
+ break;
+ }
}
} while ( $job ); // stop when there are no jobs
/**
* Make sure that this script is not too close to the memory usage limit.
* It is better to die in between jobs than OOM right in the middle of one.
- * @throws MWException
+ * @return bool
*/
- private function assertMemoryOK() {
+ private function checkMemoryOK() {
static $maxBytes = null;
if ( $maxBytes === null ) {
$m = array();
}
$usedBytes = memory_get_usage();
if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
- throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." );
+ $msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
+ $this->debugCallback( $msg );
+ $this->logger->error( $msg );
+
+ return false;
}
+
+ return true;
}
/**