Merge "Turn a wfDebug call in MessageCache::get into a wfDebugLog call"
[lhc/web/wiklou.git] / includes / jobqueue / JobRunner.php
index 617a3a3..4d2c618 100644 (file)
@@ -76,23 +76,34 @@ class JobRunner {
                        $this->runJobsLog( "Executed $count periodic queue task(s)." );
                }
 
+               // Bail out if there is too much DB lag
+               list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag();
+               if ( $maxLag >= 5 ) {
+                       $response['reached'] = 'slave-lag-limit';
+                       return $response;
+               }
+
                // Flush any pending DB writes for sanity
                wfGetLBFactory()->commitMasterChanges();
 
-               $backoffs = $this->loadBackoffs(); // map of (type => UNIX expiry)
-               $startingBackoffs = $backoffs; // avoid unnecessary writes
-               $backoffExpireFunc = function ( $t ) {
-                       return $t > time();
-               };
+               // Some job types should not run until a certain timestamp
+               $backoffs = array(); // map of (type => UNIX expiry)
+               $backoffDeltas = array(); // map of (type => seconds)
+               $wait = 'wait'; // block to read backoffs the first time
 
-               $jobsRun = 0; // counter
+               $jobsRun = 0;
                $timeMsTotal = 0;
                $flags = JobQueueGroup::USE_CACHE;
+               $checkPeriod = 5.0; // seconds
+               $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes
                $startTime = microtime( true ); // time since jobs started running
-               $lastTime = microtime( true ); // time since last slave check
+               $lastTime = microtime( true ) - $checkPhase; // time since last slave check
                do {
-                       $backoffs = array_filter( $backoffs, $backoffExpireFunc );
+                       // Sync the persistent backoffs with concurrent runners
+                       $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
                        $blacklist = $noThrottle ? array() : array_keys( $backoffs );
+                       $wait = 'nowait'; // less important now
+
                        if ( $type === false ) {
                                $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist );
                        } elseif ( in_array( $type, $blacklist ) ) {
@@ -100,14 +111,26 @@ class JobRunner {
                        } else {
                                $job = $group->pop( $type ); // job from a single queue
                        }
+
                        if ( $job ) { // found a job
                                $jType = $job->getType();
 
+                               // Back off of certain jobs for a while (for throttling and for errors)
+                               $ttw = $this->getBackoffTimeToWait( $job );
+                               if ( $ttw > 0 ) {
+                                       // Always add the delta for other runners in case the time running the
+                                       // job negated the backoff for each individually but not collectively.
+                                       $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+                                               ? $backoffDeltas[$jType] + $ttw
+                                               : $ttw;
+                                       $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
+                               }
+
                                $this->runJobsLog( $job->toString() . " STARTING" );
 
                                // Run the job...
                                wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
-                               $t = microtime( true );
+                               $jobStartTime = microtime( true );
                                try {
                                        ++$jobsRun;
                                        $status = $job->run();
@@ -119,7 +142,7 @@ class JobRunner {
                                        $error = get_class( $e ) . ': ' . $e->getMessage();
                                        MWExceptionHandler::logException( $e );
                                }
-                               $timeMs = intval( ( microtime( true ) - $t ) * 1000 );
+                               $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
                                wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
                                $timeMsTotal += $timeMs;
 
@@ -128,6 +151,14 @@ class JobRunner {
                                        $group->ack( $job ); // done
                                }
 
+                               // Back off of certain jobs for a while (for throttling and for errors)
+                               if ( $status === false && mt_rand( 0, 49 ) == 0 ) {
+                                       $ttw = max( $ttw, 30 ); // too many errors
+                                       $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+                                               ? $backoffDeltas[$jType] + $ttw
+                                               : $ttw;
+                               }
+
                                if ( $status === false ) {
                                        $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
                                } else {
@@ -141,16 +172,6 @@ class JobRunner {
                                        'time'   => $timeMs
                                );
 
-                               // Back off of certain jobs for a while (for throttling and for errors)
-                               $ttw = $this->getBackoffTimeToWait( $job );
-                               if ( $status === false && mt_rand( 0, 49 ) == 0 ) {
-                                       $ttw = max( $ttw, 30 );
-                               }
-                               if ( $ttw > 0 ) {
-                                       $backoffs[$jType] = isset( $backoffs[$jType] ) ? $backoffs[$jType] : 0;
-                                       $backoffs[$jType] = max( $backoffs[$jType], time() + $ttw );
-                               }
-
                                // Break out if we hit the job count or wall time limits...
                                if ( $maxJobs && $jobsRun >= $maxJobs ) {
                                        $response['reached'] = 'job-limit';
@@ -160,10 +181,15 @@ class JobRunner {
                                        break;
                                }
 
-                               // Don't let any of the main DB slaves get backed up
+                               // Don't let any of the main DB slaves get backed up.
+                               // This only waits for so long before exiting and letting
+                               // other wikis in the farm (on different masters) get a chance.
                                $timePassed = microtime( true ) - $lastTime;
                                if ( $timePassed >= 5 || $timePassed < 0 ) {
-                                       wfWaitForSlaves( $lastTime );
+                                       if ( !wfWaitForSlaves( $lastTime, wfWikiID(), false, 5 ) ) {
+                                               $response['reached'] = 'slave-lag-limit';
+                                               break;
+                                       }
                                        $lastTime = microtime( true );
                                }
                                // Don't let any queue slaves/backups fall behind
@@ -177,9 +203,8 @@ class JobRunner {
                } while ( $job ); // stop when there are no jobs
 
                // Sync the persistent backoffs for the next runJobs.php pass
-               $backoffs = array_filter( $backoffs, $backoffExpireFunc );
-               if ( $backoffs !== $startingBackoffs ) {
-                       $this->syncBackoffs( $backoffs );
+               if ( $backoffDeltas ) {
+                       $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
                }
 
                $response['backoffs'] = $backoffs;
@@ -221,47 +246,97 @@ class JobRunner {
 
        /**
         * Get the previous backoff expiries from persistent storage
+        * Expired entries are dropped; if the file does not exist the result is empty.
+        * On I/O or lock acquisition failure this returns the original $backoffs.
         *
+        * @param array $backoffs Map of (job type => UNIX timestamp)
+        * @param string $mode Lock wait mode - "wait" or "nowait"
         * @return array Map of (job type => backoff expiry timestamp)
         */
-       private function loadBackoffs() {
+       private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
                $section = new ProfileSection( __METHOD__ );
 
-               $backoffs = array();
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
                if ( is_file( $file ) ) {
+                       $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
                        $handle = fopen( $file, 'rb' );
-                       flock( $handle, LOCK_SH );
+                       if ( !$handle ) {
+                               return $backoffs; // unreadable, or removed after is_file()
+                       }
+                       if ( !flock( $handle, LOCK_SH | $noblock ) ) {
+                               fclose( $handle );
+                               return $backoffs; // don't wait on lock
+                       }
                        $content = stream_get_contents( $handle );
                        flock( $handle, LOCK_UN );
                        fclose( $handle );
-                       $backoffs = json_decode( $content, true ) ?: array();
+                       $ctime = microtime( true );
+                       $cBackoffs = json_decode( $content, true ) ?: array();
+                       foreach ( $cBackoffs as $type => $timestamp ) {
+                               if ( $timestamp < $ctime ) {
+                                       unset( $cBackoffs[$type] );
+                               }
+                       }
+               } else {
+                       $cBackoffs = array();
                }
 
-               return $backoffs;
+               return $cBackoffs;
        }
 
        /**
         * Merge the current backoff expiries from persistent storage
         *
-        * @param array $backoffs Map of (job type => backoff expiry timestamp)
+        * The $deltas map is set to an empty array on success.
+        * On I/O or lock acquisition failure this returns the original $backoffs.
+        *
+        * @param array $backoffs Map of (job type => UNIX timestamp)
+        * @param array $deltas Map of (job type => seconds)
+        * @param string $mode Lock wait mode - "wait" or "nowait"
+        * @return array The new backoffs, from the latest file data plus $deltas
         */
-       private function syncBackoffs( array $backoffs ) {
+       private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
                $section = new ProfileSection( __METHOD__ );
 
+               if ( !$deltas ) {
+                       return $this->loadBackoffs( $backoffs, $mode );
+               }
+
+               $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
-               $handle = fopen( $file, 'wb+' );
-               flock( $handle, LOCK_EX );
+               // Open with 'c' rather than 'w': 'w' truncates the file on open, before the
+               // lock is held and before the current contents can be read and merged.
+               $handle = fopen( $file, 'cb+' );
+               if ( !$handle ) {
+                       return $backoffs; // cannot open the file; keep $deltas for a later sync
+               }
+               if ( !flock( $handle, LOCK_EX | $noblock ) ) {
+                       fclose( $handle );
+                       return $backoffs; // don't wait on lock
+               }
+               $ctime = microtime( true );
                $content = stream_get_contents( $handle );
                $cBackoffs = json_decode( $content, true ) ?: array();
-               foreach ( $backoffs as $type => $timestamp ) {
-                       $cBackoffs[$type] = isset( $cBackoffs[$type] ) ? $cBackoffs[$type] : 0;
-                       $cBackoffs[$type] = max( $cBackoffs[$type], $backoffs[$type] );
+               foreach ( $deltas as $type => $seconds ) {
+                       $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
+                               ? $cBackoffs[$type] + $seconds
+                               : $ctime + $seconds;
+               }
+               foreach ( $cBackoffs as $type => $timestamp ) {
+                       if ( $timestamp < $ctime ) {
+                               unset( $cBackoffs[$type] );
+                       }
                }
+               // Seek back to the start; ftruncate() does not move the file pointer
+               rewind( $handle );
                ftruncate( $handle, 0 );
-               fwrite( $handle, json_encode( $backoffs ) );
+               fwrite( $handle, json_encode( $cBackoffs ) );
                flock( $handle, LOCK_UN );
                fclose( $handle );
+
+               $deltas = array();
+
+               return $cBackoffs;
        }
 
        /**