Merge "Turn a wfDebug call in MessageCache::get into a wfDebugLog call"
[lhc/web/wiklou.git] / includes / jobqueue / JobRunner.php
index 8462924..4d2c618 100644 (file)
@@ -76,24 +76,34 @@ class JobRunner {
                        $this->runJobsLog( "Executed $count periodic queue task(s)." );
                }
 
+               // Bail out if there is too much DB lag
+               list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag();
+               if ( $maxLag >= 5 ) {
+                       $response['reached'] = 'slave-lag-limit';
+                       return $response;
+               }
+
                // Flush any pending DB writes for sanity
                wfGetLBFactory()->commitMasterChanges();
 
                // Some jobs types should not run until a certain timestamp
-               $backoffs = $this->loadBackoffs( array(), 'wait' ); // map of (type => UNIX expiry)
+               $backoffs = array(); // map of (type => UNIX expiry)
                $backoffDeltas = array(); // map of (type => seconds)
-               $backoffExpireFunc = function ( $t ) {
-                       return $t > time();
-               };
+               $wait = 'wait'; // block to read backoffs the first time
 
-               $jobsRun = 0; // counter
+               $jobsRun = 0;
                $timeMsTotal = 0;
                $flags = JobQueueGroup::USE_CACHE;
-               $sTime = microtime( true ); // time since jobs started running
-               $lastTime = microtime( true ); // time since last slave check
+               $checkPeriod = 5.0; // seconds
+               $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes
+               $startTime = microtime( true ); // time since jobs started running
+               $lastTime = microtime( true ) - $checkPhase; // time since last slave check
                do {
-                       $backoffs = array_filter( $backoffs, $backoffExpireFunc );
+                       // Sync the persistent backoffs with concurrent runners
+                       $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
                        $blacklist = $noThrottle ? array() : array_keys( $backoffs );
+                       $wait = 'nowait'; // less important now
+
                        if ( $type === false ) {
                                $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist );
                        } elseif ( in_array( $type, $blacklist ) ) {
@@ -101,14 +111,26 @@ class JobRunner {
                        } else {
                                $job = $group->pop( $type ); // job from a single queue
                        }
+
                        if ( $job ) { // found a job
                                $jType = $job->getType();
 
+                               // Back off of certain jobs for a while (for throttling and for errors)
+                               $ttw = $this->getBackoffTimeToWait( $job );
+                               if ( $ttw > 0 ) {
+                                       // Always add the delta for other runners in case the time running the
+                                       // job negated the backoff for each individually but not collectively.
+                                       $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+                                               ? $backoffDeltas[$jType] + $ttw
+                                               : $ttw;
+                                       $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
+                               }
+
                                $this->runJobsLog( $job->toString() . " STARTING" );
 
                                // Run the job...
                                wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
-                               $sTime = microtime( true );
+                               $jobStartTime = microtime( true );
                                try {
                                        ++$jobsRun;
                                        $status = $job->run();
@@ -120,7 +142,7 @@ class JobRunner {
                                        $error = get_class( $e ) . ': ' . $e->getMessage();
                                        MWExceptionHandler::logException( $e );
                                }
-                               $timeMs = intval( ( microtime( true ) - $sTime ) * 1000 );
+                               $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
                                wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
                                $timeMsTotal += $timeMs;
 
@@ -130,23 +152,13 @@ class JobRunner {
                                }
 
                                // Back off of certain jobs for a while (for throttling and for errors)
-                               $ttw = $this->getBackoffTimeToWait( $job );
                                if ( $status === false && mt_rand( 0, 49 ) == 0 ) {
                                        $ttw = max( $ttw, 30 ); // too many errors
-                               }
-                               if ( $ttw > 0 ) {
-                                       // Always add the delta for other runners in case the time running the
-                                       // job negated the backoff for each individually but not collectively.
                                        $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
                                                ? $backoffDeltas[$jType] + $ttw
                                                : $ttw;
                                }
 
-                               // Sync the persistent backoffs with concurrent runners
-                               $backoffs = $backoffDeltas
-                                       ? $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'nowait' )
-                                       : $this->loadBackoffs( $backoffs, 'nowait' );
-
                                if ( $status === false ) {
                                        $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
                                } else {
@@ -164,15 +176,20 @@ class JobRunner {
                                if ( $maxJobs && $jobsRun >= $maxJobs ) {
                                        $response['reached'] = 'job-limit';
                                        break;
-                               } elseif ( $maxTime && ( microtime( true ) - $sTime ) > $maxTime ) {
+                               } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
                                        $response['reached'] = 'time-limit';
                                        break;
                                }
 
-                               // Don't let any of the main DB slaves get backed up
+                               // Don't let any of the main DB slaves get backed up.
+                               // This only waits for so long before exiting and letting
+                               // other wikis in the farm (on different masters) get a chance.
                                $timePassed = microtime( true ) - $lastTime;
                                if ( $timePassed >= 5 || $timePassed < 0 ) {
-                                       wfWaitForSlaves( $lastTime );
+                                       if ( !wfWaitForSlaves( $lastTime, wfWikiID(), false, 5 ) ) {
+                                               $response['reached'] = 'slave-lag-limit';
+                                               break;
+                                       }
                                        $lastTime = microtime( true );
                                }
                                // Don't let any queue slaves/backups fall behind
@@ -249,12 +266,18 @@ class JobRunner {
                        $content = stream_get_contents( $handle );
                        flock( $handle, LOCK_UN );
                        fclose( $handle );
-                       $backoffs = json_decode( $content, true ) ?: array();
+                       $ctime = microtime( true );
+                       $cBackoffs = json_decode( $content, true ) ?: array();
+                       foreach ( $cBackoffs as $type => $timestamp ) {
+                               if ( $timestamp < $ctime ) {
+                                       unset( $cBackoffs[$type] );
+                               }
+                       }
                } else {
-                       $backoffs = array();
+                       $cBackoffs = array();
                }
 
-               return $backoffs;
+               return $cBackoffs;
        }
 
        /**
@@ -271,7 +294,10 @@ class JobRunner {
        private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
                $section = new ProfileSection( __METHOD__ );
 
-               $ctime = time();
+               if ( !$deltas ) {
+                       return $this->loadBackoffs( $backoffs, $mode );
+               }
+
                $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
                $handle = fopen( $file, 'wb+' );
@@ -279,6 +305,7 @@ class JobRunner {
                        fclose( $handle );
                        return $backoffs; // don't wait on lock
                }
+               $ctime = microtime( true );
                $content = stream_get_contents( $handle );
                $cBackoffs = json_decode( $content, true ) ?: array();
                foreach ( $deltas as $type => $seconds ) {
@@ -286,6 +313,11 @@ class JobRunner {
                                ? $cBackoffs[$type] + $seconds
                                : $ctime + $seconds;
                }
+               foreach ( $cBackoffs as $type => $timestamp ) {
+                       if ( $timestamp < $ctime ) {
+                               unset( $cBackoffs[$type] );
+                       }
+               }
                ftruncate( $handle, 0 );
                fwrite( $handle, json_encode( $cBackoffs ) );
                flock( $handle, LOCK_UN );