Merge "Ensure users are able to edit the page after changing the content model"
[lhc/web/wiklou.git] / includes / deferred / DeferredUpdates.php
index ee14e1a..8a761f5 100644 (file)
@@ -19,6 +19,7 @@
  *
  * @file
  */
+use MediaWiki\MediaWikiServices;
 
 /**
  * Class for managing the deferred updates
  * In web request mode, deferred updates can be run at the end of the request, either before or
  * after the HTTP response has been sent. In either case, they run after the DB commit step. If
  * an update runs after the response is sent, it will not block clients. If sent before, it will
- * run synchronously. If such an update works via queueing, it will be more likely to complete by
- * the time the client makes their next request after this one.
+ * run synchronously. These two modes are defined via PRESEND and POSTSEND constants, the latter
+ * being the default for addUpdate() and addCallableUpdate().
  *
- * In CLI mode, updates are only deferred until the current wiki has no DB write transaction
- * active within this request.
+ * Updates that work through this system will be more likely to complete by the time the client
+ * makes their next request after this one than with the JobQueue system.
  *
- * When updates are deferred, they use a FIFO queue (one for pre-send and one for post-send).
+ * In CLI mode, updates run immediately if no DB writes are pending. Otherwise, they run when:
+ *   - a) Any waitForReplication() call if no writes are pending on any DB
+ *   - b) A commit happens on Maintenance::getDB( DB_MASTER ) if no writes are pending on any DB
+ *   - c) EnqueueableDataUpdate tasks may enqueue on commit of Maintenance::getDB( DB_MASTER )
+ *   - d) At the completion of Maintenance::execute()
+ *
+ * When updates are deferred, they go into one two FIFO "top-queues" (one for pre-send and one
+ * for post-send). Updates enqueued *during* doUpdate() of a "top" update go into the "sub-queue"
+ * for that update. After that method finishes, the sub-queue is run until drained. This continues
+ * for each top-queue job until the entire top queue is drained. This happens for the pre-send
+ * top-queue, and later on, the post-send top-queue, in execute().
  *
  * @since 1.19
  */
@@ -42,22 +53,43 @@ class DeferredUpdates {
        /** @var DeferrableUpdate[] Updates to be deferred until after request end */
        private static $postSendUpdates = [];
 
-       const ALL = 0; // all updates
+       const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
        const PRESEND = 1; // for updates that should run before flushing output buffer
        const POSTSEND = 2; // for updates that should run after flushing output buffer
 
+       const BIG_QUEUE_SIZE = 100;
+
+       /** @var array|null Information about the current execute() call or null if not running */
+       private static $executeContext;
+
        /**
-        * Add an update to the deferred list
+        * Add an update to the deferred list to be run later by execute()
+        *
+        * In CLI mode, callback magic will also be used to run updates when safe
         *
         * @param DeferrableUpdate $update Some object that implements doUpdate()
-        * @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
+        * @param integer $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
         */
-       public static function addUpdate( DeferrableUpdate $update, $type = self::POSTSEND ) {
-               if ( $type === self::PRESEND ) {
+       public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
+               global $wgCommandLineMode;
+
+               // This is a sub-DeferredUpdate, run it right after its parent update
+               if ( self::$executeContext && self::$executeContext['stage'] >= $stage ) {
+                       self::$executeContext['subqueue'][] = $update;
+                       return;
+               }
+
+               if ( $stage === self::PRESEND ) {
                        self::push( self::$preSendUpdates, $update );
                } else {
                        self::push( self::$postSendUpdates, $update );
                }
+
+               // Try to run the updates now if in CLI mode and no transaction is active.
+               // This covers scripts that don't/barely use the DB but make updates to other stores.
+               if ( $wgCommandLineMode ) {
+                       self::tryOpportunisticExecute( 'run' );
+               }
        }
 
        /**
@@ -67,34 +99,38 @@ class DeferredUpdates {
         * @see MWCallableUpdate::__construct()
         *
         * @param callable $callable
-        * @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
+        * @param integer $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
         * @param IDatabase|null $dbw Abort if this DB is rolled back [optional] (since 1.28)
         */
        public static function addCallableUpdate(
-               $callable, $type = self::POSTSEND, IDatabase $dbw = null
+               $callable, $stage = self::POSTSEND, IDatabase $dbw = null
        ) {
-               self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $type );
+               self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
        }
 
        /**
         * Do any deferred updates and clear the list
         *
         * @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"]
-        * @param integer $type DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27)
+        * @param integer $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27)
         */
-       public static function doUpdates( $mode = 'run', $type = self::ALL ) {
-               if ( $type === self::ALL || $type == self::PRESEND ) {
-                       self::execute( self::$preSendUpdates, $mode );
+       public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
+               $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
+
+               if ( $stage === self::ALL || $stage === self::PRESEND ) {
+                       self::execute( self::$preSendUpdates, $mode, $stageEffective );
                }
 
-               if ( $type === self::ALL || $type == self::POSTSEND ) {
-                       self::execute( self::$postSendUpdates, $mode );
+               if ( $stage === self::ALL || $stage == self::POSTSEND ) {
+                       self::execute( self::$postSendUpdates, $mode, $stageEffective );
                }
        }
 
+       /**
+        * @param DeferrableUpdate[] $queue
+        * @param DeferrableUpdate $update
+        */
        private static function push( array &$queue, DeferrableUpdate $update ) {
-               global $wgCommandLineMode;
-
                if ( $update instanceof MergeableUpdate ) {
                        $class = get_class( $update ); // fully-qualified class
                        if ( isset( $queue[$class] ) ) {
@@ -107,78 +143,179 @@ class DeferredUpdates {
                } else {
                        $queue[] = $update;
                }
-
-               // CLI scripts may forget to periodically flush these updates,
-               // so try to handle that rather than OOMing and losing them entirely.
-               // Try to run the updates as soon as there is no current wiki transaction.
-               static $waitingOnTrx = false; // de-duplicate callback
-               if ( $wgCommandLineMode && !$waitingOnTrx ) {
-                       $lb = wfGetLB();
-                       $dbw = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
-                       // Do the update as soon as there is no transaction
-                       if ( $dbw && $dbw->trxLevel() ) {
-                               $waitingOnTrx = true;
-                               $dbw->onTransactionIdle( function() use ( &$waitingOnTrx ) {
-                                       DeferredUpdates::doUpdates();
-                                       $waitingOnTrx = false;
-                               } );
-                       } else {
-                               self::doUpdates();
-                       }
-               }
        }
 
-       public static function execute( array &$queue, $mode ) {
-               $stats = \MediaWiki\MediaWikiServices::getInstance()->getStatsdDataFactory();
+       /**
+        * Immediately run/queue a list of updates
+        *
+        * @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects
+        * @param string $mode Use "enqueue" to use the job queue when possible
+        * @param integer $stage Class constant (PRESEND, POSTSEND) (since 1.28)
+        * @throws ErrorPageError Happens on top-level calls
+        * @throws Exception Happens on second-level calls
+        */
+       protected static function execute( array &$queue, $mode, $stage ) {
+               $services = MediaWikiServices::getInstance();
+               $stats = $services->getStatsdDataFactory();
+               $lbFactory = $services->getDBLoadBalancerFactory();
                $method = RequestContext::getMain()->getRequest()->getMethod();
 
-               $updates = $queue; // snapshot of queue
-               // Keep doing rounds of updates until none get enqueued
-               while ( count( $updates ) ) {
+               $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
+
+               /** @var ErrorPageError $reportableError */
+               $reportableError = null;
+               /** @var DeferrableUpdate[] $updates Snapshot of queue */
+               $updates = $queue;
+
+               // Keep doing rounds of updates until none get enqueued...
+               while ( $updates ) {
                        $queue = []; // clear the queue
-                       /** @var DataUpdate[] $dataUpdates */
-                       $dataUpdates = [];
-                       /** @var DeferrableUpdate[] $otherUpdates */
-                       $otherUpdates = [];
-                       foreach ( $updates as $update ) {
-                               if ( $update instanceof DataUpdate ) {
-                                       $dataUpdates[] = $update;
+
+                       if ( $mode === 'enqueue' ) {
+                               try {
+                                       // Push enqueuable updates to the job queue and get the rest
+                                       $updates = self::enqueueUpdates( $updates );
+                               } catch ( Exception $e ) {
+                                       // Let other updates have a chance to run if this failed
+                                       MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+                               }
+                       }
+
+                       // Order will be DataUpdate followed by generic DeferrableUpdate tasks
+                       $updatesByType = [ 'data' => [], 'generic' => [] ];
+                       foreach ( $updates as $du ) {
+                               if ( $du instanceof DataUpdate ) {
+                                       $du->setTransactionTicket( $ticket );
+                                       $updatesByType['data'][] = $du;
                                } else {
-                                       $otherUpdates[] = $update;
+                                       $updatesByType['generic'][] = $du;
                                }
 
-                               $name = $update instanceof DeferrableCallback
-                                       ? get_class( $update ) . '-' . $update->getOrigin()
-                                       : get_class( $update );
+                               $name = ( $du instanceof DeferrableCallback )
+                                       ? get_class( $du ) . '-' . $du->getOrigin()
+                                       : get_class( $du );
                                $stats->increment( 'deferred_updates.' . $method . '.' . $name );
                        }
 
-                       // Delegate DataUpdate execution to the DataUpdate class
-                       try {
-                               DataUpdate::runUpdates( $dataUpdates, $mode );
-                       } catch ( Exception $e ) {
-                               // Let the other updates occur if these had to rollback
-                               MWExceptionHandler::logException( $e );
-                       }
-                       // Execute the non-DataUpdate tasks
-                       foreach ( $otherUpdates as $update ) {
-                               try {
-                                       $update->doUpdate();
-                                       wfGetLBFactory()->commitMasterChanges( __METHOD__ );
-                               } catch ( Exception $e ) {
-                                       // We don't want exceptions thrown during deferred updates to
-                                       // be reported to the user since the output is already sent
-                                       if ( !$e instanceof ErrorPageError ) {
-                                               MWExceptionHandler::logException( $e );
+                       // Execute all remaining tasks...
+                       foreach ( $updatesByType as $updatesForType ) {
+                               foreach ( $updatesForType as $update ) {
+                                       self::$executeContext = [
+                                               'update' => $update,
+                                               'stage' => $stage,
+                                               'subqueue' => []
+                                       ];
+                                       /** @var DeferrableUpdate $update */
+                                       $guiError = self::runUpdate( $update, $lbFactory, $stage );
+                                       $reportableError = $reportableError ?: $guiError;
+                                       // Do the subqueue updates for $update until there are none
+                                       while ( self::$executeContext['subqueue'] ) {
+                                               $subUpdate = reset( self::$executeContext['subqueue'] );
+                                               $firstKey = key( self::$executeContext['subqueue'] );
+                                               unset( self::$executeContext['subqueue'][$firstKey] );
+
+                                               if ( $subUpdate instanceof DataUpdate ) {
+                                                       $subUpdate->setTransactionTicket( $ticket );
+                                               }
+
+                                               $guiError = self::runUpdate( $subUpdate, $lbFactory, $stage );
+                                               $reportableError = $reportableError ?: $guiError;
                                        }
-                                       // Make sure incomplete transactions are not committed and end any
-                                       // open atomic sections so that other DB updates have a chance to run
-                                       wfGetLBFactory()->rollbackMasterChanges( __METHOD__ );
+                                       self::$executeContext = null;
                                }
                        }
 
                        $updates = $queue; // new snapshot of queue (check for new entries)
                }
+
+               if ( $reportableError ) {
+                       throw $reportableError; // throw the first of any GUI errors
+               }
+       }
+
+       /**
+        * @param DeferrableUpdate $update
+        * @param LBFactory $lbFactory
+        * @param integer $stage
+        * @return ErrorPageError|null
+        */
+       private static function runUpdate( DeferrableUpdate $update, LBFactory $lbFactory, $stage ) {
+               $guiError = null;
+               try {
+                       $fnameTrxOwner = get_class( $update ) . '::doUpdate';
+                       $lbFactory->beginMasterChanges( $fnameTrxOwner );
+                       $update->doUpdate();
+                       $lbFactory->commitMasterChanges( $fnameTrxOwner );
+               } catch ( Exception $e ) {
+                       // Reporting GUI exceptions does not work post-send
+                       if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) {
+                               $guiError = $e;
+                       }
+                       MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+               }
+
+               return $guiError;
+       }
+
+       /**
+        * Run all deferred updates immediately if there are no DB writes active
+        *
+        * If $mode is 'run' but there are busy databates, EnqueueableDataUpdate
+        * tasks will be enqueued anyway for the sake of progress.
+        *
+        * @param string $mode Use "enqueue" to use the job queue when possible
+        * @return bool Whether updates were allowed to run
+        * @since 1.28
+        */
+       public static function tryOpportunisticExecute( $mode = 'run' ) {
+               // execute() loop is already running
+               if ( self::$executeContext ) {
+                       return false;
+               }
+
+               // Avoiding running updates without them having outer scope
+               if ( !self::getBusyDbConnections() ) {
+                       self::doUpdates( $mode );
+                       return true;
+               }
+
+               if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
+                       // If we cannot run the updates with outer transaction context, try to
+                       // at least enqueue all the updates that support queueing to job queue
+                       self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
+                       self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
+               }
+
+               return !self::pendingUpdatesCount();
+       }
+
+       /**
+        * Enqueue a job for each EnqueueableDataUpdate item and return the other items
+        *
+        * @param DeferrableUpdate[] $updates A list of deferred update instances
+        * @return DeferrableUpdate[] Remaining updates that do not support being queued
+        */
+       private static function enqueueUpdates( array $updates ) {
+               $remaining = [];
+
+               foreach ( $updates as $update ) {
+                       if ( $update instanceof EnqueueableDataUpdate ) {
+                               $spec = $update->getAsJobSpecification();
+                               JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
+                       } else {
+                               $remaining[] = $update;
+                       }
+               }
+
+               return $remaining;
+       }
+
+       /**
+        * @return integer Number of enqueued updates
+        * @since 1.28
+        */
+       public static function pendingUpdatesCount() {
+               return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
        }
 
        /**
@@ -189,4 +326,22 @@ class DeferredUpdates {
                self::$preSendUpdates = [];
                self::$postSendUpdates = [];
        }
+
+       /**
+        * @return IDatabase[] Connection where commit() cannot be called yet
+        */
+       private static function getBusyDbConnections() {
+               $connsBusy = [];
+
+               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
+                       $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
+                               if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
+                                       $connsBusy[] = $conn;
+                               }
+                       } );
+               } );
+
+               return $connsBusy;
+       }
 }