Merge "API: Have generator=random set a non-continuation value"
[lhc/web/wiklou.git] / includes / jobqueue / JobRunner.php
index 990f112..49b7a45 100644 (file)
@@ -27,6 +27,9 @@ use Liuggio\StatsdClient\Factory\StatsdDataFactory;
 use Psr\Log\LoggerAwareInterface;
 use Psr\Log\LoggerInterface;
 use Wikimedia\ScopedCallback;
+use Wikimedia\Rdbms\LBFactory;
+use Wikimedia\Rdbms\DBError;
+use Wikimedia\Rdbms\DBReplicationWaitError;
 
 /**
  * Job queue runner utility methods
@@ -35,6 +38,8 @@ use Wikimedia\ScopedCallback;
  * @since 1.24
  */
 class JobRunner implements LoggerAwareInterface {
+       /** @var Config */
+       protected $config;
        /** @var callable|null Debug output handler */
        protected $debug;
 
@@ -46,6 +51,7 @@ class JobRunner implements LoggerAwareInterface {
        const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present
        const LAG_CHECK_PERIOD = 1.0; // check replica DB lag this many seconds
        const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors
+       const READONLY_BACKOFF_TTL = 30; // seconds to back off a queue due to read-only errors
 
        /**
         * @param callable $debug Optional debug output handler
@@ -70,6 +76,7 @@ class JobRunner implements LoggerAwareInterface {
                        $logger = LoggerFactory::getInstance( 'runJobs' );
                }
                $this->setLogger( $logger );
+               $this->config = MediaWikiServices::getInstance()->getMainConfig();
        }
 
        /**
@@ -97,7 +104,8 @@ class JobRunner implements LoggerAwareInterface {
         * @return array Summary response that can easily be JSON serialized
         */
        public function run( array $options ) {
-               global $wgJobClasses, $wgTrxProfilerLimits;
+               $jobClasses = $this->config->get( 'JobClasses' );
+               $profilerLimits = $this->config->get( 'TrxProfilerLimits' );
 
                $response = [ 'jobs' => [], 'reached' => 'none-ready' ];
 
@@ -107,7 +115,7 @@ class JobRunner implements LoggerAwareInterface {
                $noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
 
                // Bail if job type is invalid
-               if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
+               if ( $type !== false && !isset( $jobClasses[$type] ) ) {
                        $response['reached'] = 'none-possible';
                        return $response;
                }
@@ -132,7 +140,7 @@ class JobRunner implements LoggerAwareInterface {
                // Catch huge single updates that lead to replica DB lag
                $trxProfiler = Profiler::instance()->getTransactionProfiler();
                $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
-               $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
+               $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
 
                // Some jobs types should not run until a certain timestamp
                $backoffs = []; // map of (type => UNIX expiry)
@@ -190,7 +198,7 @@ class JobRunner implements LoggerAwareInterface {
 
                                // Back off of certain jobs for a while (for throttling and for errors)
                                if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
-                                       $ttw = max( $ttw, self::ERROR_BACKOFF_TTL ); // too many errors
+                                       $ttw = max( $ttw, $this->getErrorBackoffTTL( $info['error'] ) );
                                        $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
                                                ? $backoffDeltas[$jType] + $ttw
                                                : $ttw;
@@ -253,6 +261,16 @@ class JobRunner implements LoggerAwareInterface {
                return $response;
        }
 
+       /**
+        * @param string $error
+        * @return int TTL in seconds
+        */
+       private function getErrorBackoffTTL( $error ) {
+               return strpos( $error, 'DBReadOnlyError' ) !== false
+                       ? self::READONLY_BACKOFF_TTL
+                       : self::ERROR_BACKOFF_TTL;
+       }
+
        /**
         * @param Job $job
         * @param LBFactory $lbFactory
@@ -275,6 +293,8 @@ class JobRunner implements LoggerAwareInterface {
                        $status = $job->run();
                        $error = $job->getLastError();
                        $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
+                       // Important: this must be the last deferred update added (T100085, T154425)
+                       DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
                        // Run any deferred update tasks; doUpdates() manages transactions itself
                        DeferredUpdates::doUpdates();
                } catch ( Exception $e ) {
@@ -346,15 +366,13 @@ class JobRunner implements LoggerAwareInterface {
         * @see $wgJobBackoffThrottling
         */
        private function getBackoffTimeToWait( Job $job ) {
-               global $wgJobBackoffThrottling;
+               $throttling = $this->config->get( 'JobBackoffThrottling' );
 
-               if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
-                       $job instanceof DuplicateJob // no work was done
-               ) {
+               if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
                        return 0; // not throttled
                }
 
-               $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
+               $itemsPerSecond = $throttling[$job->getType()];
                if ( $itemsPerSecond <= 0 ) {
                        return 0; // not throttled
                }
@@ -502,17 +520,17 @@ class JobRunner implements LoggerAwareInterface {
         * @throws DBError
         */
        private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
-               global $wgJobSerialCommitThreshold;
+               $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
 
                $time = false;
                $lb = $lbFactory->getMainLB( wfWikiID() );
-               if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
+               if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
                        // Generally, there is one master connection to the local DB
                        $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
                        // We need natively blocking fast locks
                        if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
                                $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
-                               if ( $time < $wgJobSerialCommitThreshold ) {
+                               if ( $time < $syncThreshold ) {
                                        $dbwSerial = false;
                                }
                        } else {
@@ -524,7 +542,12 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                if ( !$dbwSerial ) {
-                       $lbFactory->commitMasterChanges( $fnameTrxOwner );
+                       $lbFactory->commitMasterChanges(
+                               $fnameTrxOwner,
+                               // Abort if any transaction was too big
+                               [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+                       );
+
                        return;
                }
 
@@ -549,7 +572,11 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                // Actually commit the DB master changes
-               $lbFactory->commitMasterChanges( $fnameTrxOwner );
+               $lbFactory->commitMasterChanges(
+                       $fnameTrxOwner,
+                       // Abort if any transaction was too big
+                       [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+               );
                ScopedCallback::consume( $unlocker );
        }
 }