+ /**
+ * @param Job $job
+ * @param BufferingStatsdDataFactory $stats
+ * @param float $popTime
+ * @return array Map of status/error/timeMs
+ */
+ private function executeJob( Job $job, $stats, $popTime ) {
+ $jType = $job->getType();
+ $msg = $job->toString() . " STARTING";
+ $this->logger->debug( $msg );
+ $this->debugCallback( $msg );
+
+ // Run the job...
+ $rssStart = $this->getMaxRssKb();
+ $jobStartTime = microtime( true );
+ try {
+ $status = $job->run();
+ $error = $job->getLastError();
+ $this->commitMasterChanges( $job );
+
+ DeferredUpdates::doUpdates();
+ $this->commitMasterChanges( $job );
+ } catch ( Exception $e ) {
+ MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+ $status = false;
+ $error = get_class( $e ) . ': ' . $e->getMessage();
+ MWExceptionHandler::logException( $e );
+ }
+ // Commit all outstanding connections that are in a transaction
+ // to get a fresh repeatable read snapshot on every connection.
+ // Note that jobs are still responsible for handling slave lag.
+ wfGetLBFactory()->commitAll( __METHOD__ );
+ // Clear out title cache data from prior snapshots
+ LinkCache::singleton()->clear();
+ $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
+ $rssEnd = $this->getMaxRssKb();
+
+ // Record how long jobs wait before getting popped
+ $readyTs = $job->getReadyTimestamp();
+ if ( $readyTs ) {
+ $pickupDelay = max( 0, $popTime - $readyTs );
+ $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
+ $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
+ }
+ // Record root job age for jobs being run
+ $root = $job->getRootJobParams();
+ if ( $root['rootJobTimestamp'] ) {
+ $age = max( 0, $popTime - wfTimestamp( TS_UNIX, $root['rootJobTimestamp'] ) );
+ $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
+ }
+ // Track the execution time for jobs
+ $stats->timing( "jobqueue.run.$jType", $timeMs );
+ // Track RSS increases for jobs (in case of memory leaks)
+ if ( $rssStart && $rssEnd ) {
+ $stats->increment( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
+ }
+
+ if ( $status === false ) {
+ $msg = $job->toString() . " t=$timeMs error={$error}";
+ $this->logger->error( $msg );
+ $this->debugCallback( $msg );
+ } else {
+ $msg = $job->toString() . " t=$timeMs good";
+ $this->logger->info( $msg );
+ $this->debugCallback( $msg );
+ }
+
+ return array( 'status' => $status, 'error' => $error, 'timeMs' => $timeMs );
+ }
+
+ /**
+ * @return int|null Max memory RSS in kilobytes
+ */
+ private function getMaxRssKb() {
+ $info = wfGetRusage() ?: array();
+ // see http://linux.die.net/man/2/getrusage
+ return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
+ }
+