$response['reached'] = 'none-possible';
return $response;
}
+
// Bail out if DB is in read-only mode
if ( wfReadOnly() ) {
$response['reached'] = 'read-only';
}
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ if ( $lbFactory->hasTransactionRound() ) {
+ throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
+ }
// Bail out if there is too much DB lag.
// This check should not block as we want to try other wiki queues.
list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag();
return $response;
}
- // Flush any pending DB writes for sanity
- $lbFactory->commitAll( __METHOD__ );
-
// Catch huge single updates that lead to replica DB lag
$trxProfiler = Profiler::instance()->getTransactionProfiler();
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
} else {
$job = $group->pop( $type ); // job from a single queue
}
- $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
if ( $job ) { // found a job
++$jobsPopped;
$info = $this->executeJob( $job, $lbFactory, $stats, $popTime );
if ( $info['status'] !== false || !$job->allowRetries() ) {
$group->ack( $job ); // succeeded or job cannot be retried
- $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
}
// Back off of certain jobs for a while (for throttling and for errors)
private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) {
$jType = $job->getType();
$msg = $job->toString() . " STARTING";
- $this->logger->debug( $msg );
+ $this->logger->debug( $msg, [
+ 'job_type' => $job->getType(),
+ ] );
$this->debugCallback( $msg );
// Run the job...
}
if ( $status === false ) {
+ $msg = $job->toString() . " t={job_duration} error={job_error}";
+ $this->logger->error( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_duration' => $timeMs,
+ 'job_error' => $error,
+ ] );
+
$msg = $job->toString() . " t=$timeMs error={$error}";
- $this->logger->error( $msg );
$this->debugCallback( $msg );
} else {
+ $msg = $job->toString() . " t={job_duration} good";
+ $this->logger->info( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_duration' => $timeMs,
+ ] );
+
$msg = $job->toString() . " t=$timeMs good";
- $this->logger->info( $msg );
$this->debugCallback( $msg );
}
}
$usedBytes = memory_get_usage();
if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
+ $msg = "Detected excessive memory usage ({used_bytes}/{max_bytes}).";
+ $this->logger->error( $msg, [
+ 'used_bytes' => $usedBytes,
+ 'max_bytes' => $maxBytes,
+ ] );
+
$msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
$this->debugCallback( $msg );
- $this->logger->error( $msg );
return false;
}
}
$ms = intval( 1000 * $time );
+
+ $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
+ $this->logger->info( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_commit_write_ms' => $ms,
+ ] );
+
$msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
- $this->logger->info( $msg );
$this->debugCallback( $msg );
// Wait for an exclusive lock to commit