Merge "Skin: Make skins aware of their registered skin name"
[lhc/web/wiklou.git] / includes / jobqueue / JobRunner.php
index a1aeaba..fa7d605 100644 (file)
@@ -38,6 +38,8 @@ use Wikimedia\Rdbms\DBReplicationWaitError;
  * @since 1.24
  */
 class JobRunner implements LoggerAwareInterface {
+       /** @var Config */
+       protected $config;
        /** @var callable|null Debug output handler */
        protected $debug;
 
@@ -74,6 +76,7 @@ class JobRunner implements LoggerAwareInterface {
                        $logger = LoggerFactory::getInstance( 'runJobs' );
                }
                $this->setLogger( $logger );
+               $this->config = MediaWikiServices::getInstance()->getMainConfig();
        }
 
        /**
@@ -101,7 +104,8 @@ class JobRunner implements LoggerAwareInterface {
         * @return array Summary response that can easily be JSON serialized
         */
        public function run( array $options ) {
-               global $wgJobClasses, $wgTrxProfilerLimits;
+               $jobClasses = $this->config->get( 'JobClasses' );
+               $profilerLimits = $this->config->get( 'TrxProfilerLimits' );
 
                $response = [ 'jobs' => [], 'reached' => 'none-ready' ];
 
@@ -111,10 +115,11 @@ class JobRunner implements LoggerAwareInterface {
                $noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
 
                // Bail if job type is invalid
-               if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
+               if ( $type !== false && !isset( $jobClasses[$type] ) ) {
                        $response['reached'] = 'none-possible';
                        return $response;
                }
+
                // Bail out if DB is in read-only mode
                if ( wfReadOnly() ) {
                        $response['reached'] = 'read-only';
@@ -122,6 +127,9 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               if ( $lbFactory->hasTransactionRound() ) {
+                       throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
+               }
                // Bail out if there is too much DB lag.
                // This check should not block as we want to try other wiki queues.
                list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag();
@@ -130,13 +138,10 @@ class JobRunner implements LoggerAwareInterface {
                        return $response;
                }
 
-               // Flush any pending DB writes for sanity
-               $lbFactory->commitAll( __METHOD__ );
-
                // Catch huge single updates that lead to replica DB lag
                $trxProfiler = Profiler::instance()->getTransactionProfiler();
                $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
-               $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
+               $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
 
                // Some jobs types should not run until a certain timestamp
                $backoffs = []; // map of (type => UNIX expiry)
@@ -166,7 +171,6 @@ class JobRunner implements LoggerAwareInterface {
                        } else {
                                $job = $group->pop( $type ); // job from a single queue
                        }
-                       $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
 
                        if ( $job ) { // found a job
                                ++$jobsPopped;
@@ -189,7 +193,6 @@ class JobRunner implements LoggerAwareInterface {
                                $info = $this->executeJob( $job, $lbFactory, $stats, $popTime );
                                if ( $info['status'] !== false || !$job->allowRetries() ) {
                                        $group->ack( $job ); // succeeded or job cannot be retried
-                                       $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
                                }
 
                                // Back off of certain jobs for a while (for throttling and for errors)
@@ -277,7 +280,9 @@ class JobRunner implements LoggerAwareInterface {
        private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) {
                $jType = $job->getType();
                $msg = $job->toString() . " STARTING";
-               $this->logger->debug( $msg );
+               $this->logger->debug( $msg, [
+                       'job_type' => $job->getType(),
+               ] );
                $this->debugCallback( $msg );
 
                // Run the job...
@@ -289,10 +294,10 @@ class JobRunner implements LoggerAwareInterface {
                        $status = $job->run();
                        $error = $job->getLastError();
                        $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
+                       // Important: this must be the last deferred update added (T100085, T154425)
+                       DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
                        // Run any deferred update tasks; doUpdates() manages transactions itself
                        DeferredUpdates::doUpdates();
-                       // Push lazy jobs added by the job or its deferred udpates
-                       JobQueueGroup::pushLazyJobs();
                } catch ( Exception $e ) {
                        MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                        $status = false;
@@ -335,12 +340,23 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                if ( $status === false ) {
+                       $msg = $job->toString() . " t={job_duration} error={job_error}";
+                       $this->logger->error( $msg, [
+                               'job_type' => $job->getType(),
+                               'job_duration' => $timeMs,
+                               'job_error' => $error,
+                       ] );
+
                        $msg = $job->toString() . " t=$timeMs error={$error}";
-                       $this->logger->error( $msg );
                        $this->debugCallback( $msg );
                } else {
+                       $msg = $job->toString() . " t={job_duration} good";
+                       $this->logger->info( $msg, [
+                               'job_type' => $job->getType(),
+                               'job_duration' => $timeMs,
+                       ] );
+
                        $msg = $job->toString() . " t=$timeMs good";
-                       $this->logger->info( $msg );
                        $this->debugCallback( $msg );
                }
 
@@ -362,15 +378,13 @@ class JobRunner implements LoggerAwareInterface {
         * @see $wgJobBackoffThrottling
         */
        private function getBackoffTimeToWait( Job $job ) {
-               global $wgJobBackoffThrottling;
+               $throttling = $this->config->get( 'JobBackoffThrottling' );
 
-               if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
-                       $job instanceof DuplicateJob // no work was done
-               ) {
+               if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
                        return 0; // not throttled
                }
 
-               $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
+               $itemsPerSecond = $throttling[$job->getType()];
                if ( $itemsPerSecond <= 0 ) {
                        return 0; // not throttled
                }
@@ -486,9 +500,14 @@ class JobRunner implements LoggerAwareInterface {
                }
                $usedBytes = memory_get_usage();
                if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
+                       $msg = "Detected excessive memory usage ({used_bytes}/{max_bytes}).";
+                       $this->logger->error( $msg, [
+                               'used_bytes' => $usedBytes,
+                               'max_bytes' => $maxBytes,
+                       ] );
+
                        $msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
                        $this->debugCallback( $msg );
-                       $this->logger->error( $msg );
 
                        return false;
                }
@@ -518,17 +537,17 @@ class JobRunner implements LoggerAwareInterface {
         * @throws DBError
         */
        private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
-               global $wgJobSerialCommitThreshold;
+               $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
 
                $time = false;
                $lb = $lbFactory->getMainLB( wfWikiID() );
-               if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
+               if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
                        // Generally, there is one master connection to the local DB
                        $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
                        // We need natively blocking fast locks
                        if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
                                $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
-                               if ( $time < $wgJobSerialCommitThreshold ) {
+                               if ( $time < $syncThreshold ) {
                                        $dbwSerial = false;
                                }
                        } else {
@@ -540,13 +559,24 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                if ( !$dbwSerial ) {
-                       $lbFactory->commitMasterChanges( $fnameTrxOwner );
+                       $lbFactory->commitMasterChanges(
+                               $fnameTrxOwner,
+                               // Abort if any transaction was too big
+                               [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+                       );
+
                        return;
                }
 
                $ms = intval( 1000 * $time );
+
+               $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
+               $this->logger->info( $msg, [
+                       'job_type' => $job->getType(),
+                       'job_commit_write_ms' => $ms,
+               ] );
+
                $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
-               $this->logger->info( $msg );
                $this->debugCallback( $msg );
 
                // Wait for an exclusive lock to commit
@@ -565,7 +595,11 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                // Actually commit the DB master changes
-               $lbFactory->commitMasterChanges( $fnameTrxOwner );
+               $lbFactory->commitMasterChanges(
+                       $fnameTrxOwner,
+                       // Abort if any transaction was too big
+                       [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+               );
                ScopedCallback::consume( $unlocker );
        }
 }