Various code cleanup to JobRunner
[lhc/web/wiklou.git] / includes / jobqueue / JobRunner.php
index b8c5d6c..efc36cc 100644 (file)
@@ -35,6 +35,11 @@ class JobRunner implements LoggerAwareInterface {
        /** @var callable|null Debug output handler */
        protected $debug;
 
+       /**
+        * @var LoggerInterface $logger
+        */
+       protected $logger;
+
        /**
         * @param callable $debug Optional debug output handler
         */
@@ -42,13 +47,9 @@ class JobRunner implements LoggerAwareInterface {
                $this->debug = $debug;
        }
 
-       /**
-        * @var LoggerInterface $logger
-        */
-       protected $logger;
-
        /**
         * @param LoggerInterface $logger
+        * @return void
         */
        public function setLogger( LoggerInterface $logger ) {
                $this->logger = $logger;
@@ -88,7 +89,7 @@ class JobRunner implements LoggerAwareInterface {
         * @return array Summary response that can easily be JSON serialized
         */
        public function run( array $options ) {
-               global $wgJobClasses;
+               global $wgJobClasses, $wgTrxProfilerLimits;
 
                $response = array( 'jobs' => array(), 'reached' => 'none-ready' );
 
@@ -102,43 +103,45 @@ class JobRunner implements LoggerAwareInterface {
                        return $response;
                }
 
-               $group = JobQueueGroup::singleton();
-               // Handle any required periodic queue maintenance
-               $count = $group->executeReadyPeriodicTasks();
-               if ( $count > 0 ) {
-                       $msg = "Executed $count periodic queue task(s).";
-                       $this->logger->debug( $msg );
-                       $this->debugCallback( $msg );
-               }
-
                // Bail out if in read-only mode
                if ( wfReadOnly() ) {
                        $response['reached'] = 'read-only';
                        return $response;
                }
 
-               // Bail out if there is too much DB lag
-               list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag();
-               if ( $maxLag >= 5 ) {
+               $profiler = Profiler::instance();
+
+               // Catch huge single updates that lead to slave lag
+               $trxProfiler = $profiler->getTransactionProfiler();
+               $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
+               $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
+
+               // Bail out if there is too much DB lag.
+               // This check should not block as we want to try other wiki queues.
+               $maxAllowedLag = 3;
+               list( , $maxLag ) = wfGetLB( wfWikiID() )->getMaxLag();
+               if ( $maxLag >= $maxAllowedLag ) {
                        $response['reached'] = 'slave-lag-limit';
                        return $response;
                }
 
+               $group = JobQueueGroup::singleton();
+               
                // Flush any pending DB writes for sanity
-               wfGetLBFactory()->commitMasterChanges();
+               wfGetLBFactory()->commitAll();
 
                // Some jobs types should not run until a certain timestamp
                $backoffs = array(); // map of (type => UNIX expiry)
                $backoffDeltas = array(); // map of (type => seconds)
                $wait = 'wait'; // block to read backoffs the first time
 
-               $jobsRun = 0;
+               $stats = RequestContext::getMain()->getStats();
+               $jobsPopped = 0;
                $timeMsTotal = 0;
                $flags = JobQueueGroup::USE_CACHE;
-               $checkPeriod = 5.0; // seconds
-               $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes
                $startTime = microtime( true ); // time since jobs started running
-               $lastTime = microtime( true ) - $checkPhase; // time since last slave check
+               $checkLagPeriod = 1.0; // check slave lag this many seconds
+               $lastCheckTime = 1; // timestamp of last slave check
                do {
                        // Sync the persistent backoffs with concurrent runners
                        $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
@@ -154,6 +157,7 @@ class JobRunner implements LoggerAwareInterface {
                        }
 
                        if ( $job ) { // found a job
+                               $popTime = time();
                                $jType = $job->getType();
 
                                // Back off of certain jobs for a while (for throttling and for errors)
@@ -168,24 +172,38 @@ class JobRunner implements LoggerAwareInterface {
                                }
 
                                $msg = $job->toString() . " STARTING";
-                               $this->logger->info( $msg );
+                               $this->logger->debug( $msg );
                                $this->debugCallback( $msg );
 
                                // Run the job...
+                               $psection = $profiler->scopedProfileIn( __METHOD__ . '-' . $jType );
                                $jobStartTime = microtime( true );
                                try {
-                                       ++$jobsRun;
+                                       ++$jobsPopped;
                                        $status = $job->run();
                                        $error = $job->getLastError();
-                                       wfGetLBFactory()->commitMasterChanges();
+                                       $this->commitMasterChanges( $job );
+
+                                       DeferredUpdates::doUpdates();
+                                       $this->commitMasterChanges( $job );
                                } catch ( Exception $e ) {
                                        MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                                        $status = false;
                                        $error = get_class( $e ) . ': ' . $e->getMessage();
                                        MWExceptionHandler::logException( $e );
                                }
+                               // Commit all outstanding connections that are in a transaction
+                               // to get a fresh repeatable read snapshot on every connection.
+                               wfGetLBFactory()->commitAll();
                                $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
                                $timeMsTotal += $timeMs;
+                               $profiler->scopedProfileOut( $psection );
+
+                               if ( $job->getQueuedTimestamp() ) {
+                                       // Record time to run for the job type
+                                       $stats->timing( "job-pickuptime-$jType",
+                                               $popTime - $job->getQueuedTimestamp() );
+                               }
 
                                // Mark the job as done on success or when the job cannot be retried
                                if ( $status !== false || !$job->allowRetries() ) {
@@ -218,7 +236,7 @@ class JobRunner implements LoggerAwareInterface {
                                );
 
                                // Break out if we hit the job count or wall time limits...
-                               if ( $maxJobs && $jobsRun >= $maxJobs ) {
+                               if ( $maxJobs && $jobsPopped >= $maxJobs ) {
                                        $response['reached'] = 'job-limit';
                                        break;
                                } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
@@ -229,16 +247,16 @@ class JobRunner implements LoggerAwareInterface {
                                // Don't let any of the main DB slaves get backed up.
                                // This only waits for so long before exiting and letting
                                // other wikis in the farm (on different masters) get a chance.
-                               $timePassed = microtime( true ) - $lastTime;
-                               if ( $timePassed >= 5 || $timePassed < 0 ) {
-                                       if ( !wfWaitForSlaves( $lastTime, false, '*', 5 ) ) {
+                               $timePassed = microtime( true ) - $lastCheckTime;
+                               if ( $timePassed >= $checkLagPeriod || $timePassed < 0 ) {
+                                       if ( !wfWaitForSlaves( $lastCheckTime, false, '*', $maxAllowedLag ) ) {
                                                $response['reached'] = 'slave-lag-limit';
                                                break;
                                        }
-                                       $lastTime = microtime( true );
+                                       $lastCheckTime = microtime( true );
                                }
                                // Don't let any queue slaves/backups fall behind
-                               if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
+                               if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
                                        $group->waitForBackups();
                                }
 
@@ -298,7 +316,6 @@ class JobRunner implements LoggerAwareInterface {
         * @return array Map of (job type => backoff expiry timestamp)
         */
        private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
-
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
                if ( is_file( $file ) ) {
                        $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
@@ -336,7 +353,6 @@ class JobRunner implements LoggerAwareInterface {
         * @return array The new backoffs account for $backoffs and the latest file data
         */
        private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
-
                if ( !$deltas ) {
                        return $this->loadBackoffs( $backoffs, $mode );
                }
@@ -403,4 +419,68 @@ class JobRunner implements LoggerAwareInterface {
                        call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
                }
        }
+
+       /**
+        * Issue a commit on all masters who are currently in a transaction and have
+        * made changes to the database. It also supports sometimes waiting for the
+        * local wiki's slaves to catch up. See the documentation for
+        * $wgJobSerialCommitThreshold for more.
+        *
+        * @param Job $job
+        * @throws DBError
+        */
+       private function commitMasterChanges( Job $job ) {
+               global $wgJobSerialCommitThreshold;
+
+               $lb = wfGetLB( wfWikiID() );
+               if ( $wgJobSerialCommitThreshold !== false ) {
+                       // Generally, there is one master connection to the local DB
+                       $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+               } else {
+                       $dbwSerial = false;
+               }
+
+               if ( !$dbwSerial
+                       || !$dbwSerial->namedLocksEnqueue()
+                       || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold
+               ) {
+                       // Writes are all to foreign DBs, named locks don't form queues,
+                       // or $wgJobSerialCommitThreshold is not reached; commit changes now
+                       wfGetLBFactory()->commitMasterChanges();
+                       return;
+               }
+
+               $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() );
+               $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
+               $this->logger->info( $msg );
+               $this->debugCallback( $msg );
+
+               // Wait for an exclusive lock to commit
+               if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
+                       // This will trigger a rollback in the main loop
+                       throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
+               }
+               // Wait for the generic slave to catch up
+               $pos = $lb->getMasterPos();
+               if ( $pos ) {
+                       $lb->waitForOne( $pos );
+               }
+
+               $fname = __METHOD__;
+               // Re-ping all masters with transactions. This throws DBError if some
+               // connection died while waiting on locks/slaves, triggering a rollback.
+               wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) use ( $fname ) {
+                       $lb->forEachOpenConnection( function( DatabaseBase $conn ) use ( $fname ) {
+                               if ( $conn->writesOrCallbacksPending() ) {
+                                       $conn->query( "SELECT 1", $fname );
+                               }
+                       } );
+               } );
+
+               // Actually commit the DB master changes
+               wfGetLBFactory()->commitMasterChanges();
+
+               // Release the lock
+               $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
+       }
 }