X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fdeferred%2FDeferredUpdates.php;h=6921b668baa8a624dae3a9ac3653a21ea7e87b4f;hb=dea4d5ad6e556365b9a16cc1ae5b633a1e646292;hp=e3b75704aa0277e663dcd662fc7f5d2ab9de0566;hpb=90f599a5a2b46794a3002ee2f2563fc456a2fddb;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/deferred/DeferredUpdates.php b/includes/deferred/DeferredUpdates.php index e3b75704aa..6921b668ba 100644 --- a/includes/deferred/DeferredUpdates.php +++ b/includes/deferred/DeferredUpdates.php @@ -19,6 +19,7 @@ * * @file */ +use MediaWiki\MediaWikiServices; /** * Class for managing the deferred updates @@ -26,13 +27,23 @@ * In web request mode, deferred updates can be run at the end of the request, either before or * after the HTTP response has been sent. In either case, they run after the DB commit step. If * an update runs after the response is sent, it will not block clients. If sent before, it will - * run synchronously. If such an update works via queueing, it will be more likely to complete by - * the time the client makes their next request after this one. + * run synchronously. These two modes are defined via PRESEND and POSTSEND constants, the latter + * being the default for addUpdate() and addCallableUpdate(). * - * In CLI mode, updates are only deferred until the current wiki has no DB write transaction - * active within this request. + * Updates that work through this system will be more likely to complete by the time the client + * makes their next request after this one than with the JobQueue system. * - * When updates are deferred, they use a FIFO queue (one for pre-send and one for post-send). + * In CLI mode, updates run immediately if no DB writes are pending. Otherwise, they run when: + * - a) Any waitForReplication() call if no writes are pending on any DB + * - b) A commit happens on Maintenance::getDB( DB_MASTER ) if no writes are pending on any DB + * - c) EnqueueableDataUpdate tasks may enqueue on commit of Maintenance::getDB( DB_MASTER ) + * - d) At the completion of Maintenance::execute() + * + * When updates are deferred, they go into one two FIFO "top-queues" (one for pre-send and one + * for post-send). Updates enqueued *during* doUpdate() of a "top" update go into the "sub-queue" + * for that update. After that method finishes, the sub-queue is run until drained. This continues + * for each top-queue job until the entire top queue is drained. This happens for the pre-send + * top-queue, and later on, the post-send top-queue, in execute(). * * @since 1.19 */ @@ -42,56 +53,84 @@ class DeferredUpdates { /** @var DeferrableUpdate[] Updates to be deferred until after request end */ private static $postSendUpdates = []; - const ALL = 0; // all updates + const ALL = 0; // all updates; in web requests, use only after flushing the output buffer const PRESEND = 1; // for updates that should run before flushing output buffer const POSTSEND = 2; // for updates that should run after flushing output buffer + const BIG_QUEUE_SIZE = 100; + + /** @var array|null Information about the current execute() call or null if not running */ + private static $executeContext; + /** - * Add an update to the deferred list + * Add an update to the deferred list to be run later by execute() + * + * In CLI mode, callback magic will also be used to run updates when safe * * @param DeferrableUpdate $update Some object that implements doUpdate() - * @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) + * @param integer $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) */ - public static function addUpdate( DeferrableUpdate $update, $type = self::POSTSEND ) { - if ( $type === self::PRESEND ) { + public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) { + global $wgCommandLineMode; + + // This is a sub-DeferredUpdate, run it right after its parent update + if ( self::$executeContext && self::$executeContext['stage'] >= $stage ) { + self::$executeContext['subqueue'][] = $update; + return; + } + + if ( $stage === self::PRESEND ) { self::push( self::$preSendUpdates, $update ); } else { self::push( self::$postSendUpdates, $update ); } + + // Try to run the updates now if in CLI mode and no transaction is active. + // This covers scripts that don't/barely use the DB but make updates to other stores. + if ( $wgCommandLineMode ) { + self::tryOpportunisticExecute( 'run' ); + } } /** - * Add a callable update. In a lot of cases, we just need a callback/closure, + * Add a callable update. In a lot of cases, we just need a callback/closure, * defining a new DeferrableUpdate object is not necessary * * @see MWCallableUpdate::__construct() * * @param callable $callable - * @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) + * @param integer $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) + * @param IDatabase|null $dbw Abort if this DB is rolled back [optional] (since 1.28) */ - public static function addCallableUpdate( $callable, $type = self::POSTSEND ) { - self::addUpdate( new MWCallableUpdate( $callable ), $type ); + public static function addCallableUpdate( + $callable, $stage = self::POSTSEND, IDatabase $dbw = null + ) { + self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage ); } /** * Do any deferred updates and clear the list * * @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"] - * @param integer $type DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27) + * @param integer $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27) */ - public static function doUpdates( $mode = 'run', $type = self::ALL ) { - if ( $type === self::ALL || $type == self::PRESEND ) { - self::execute( self::$preSendUpdates, $mode ); + public static function doUpdates( $mode = 'run', $stage = self::ALL ) { + $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage; + + if ( $stage === self::ALL || $stage === self::PRESEND ) { + self::execute( self::$preSendUpdates, $mode, $stageEffective ); } - if ( $type === self::ALL || $type == self::POSTSEND ) { - self::execute( self::$postSendUpdates, $mode ); + if ( $stage === self::ALL || $stage == self::POSTSEND ) { + self::execute( self::$postSendUpdates, $mode, $stageEffective ); } } + /** + * @param DeferrableUpdate[] $queue + * @param DeferrableUpdate $update + */ private static function push( array &$queue, DeferrableUpdate $update ) { - global $wgCommandLineMode; - if ( $update instanceof MergeableUpdate ) { $class = get_class( $update ); // fully-qualified class if ( isset( $queue[$class] ) ) { @@ -104,71 +143,167 @@ class DeferredUpdates { } else { $queue[] = $update; } - - // CLI scripts may forget to periodically flush these updates, - // so try to handle that rather than OOMing and losing them entirely. - // Try to run the updates as soon as there is no current wiki transaction. - static $waitingOnTrx = false; // de-duplicate callback - if ( $wgCommandLineMode && !$waitingOnTrx ) { - $lb = wfGetLB(); - $dbw = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); - // Do the update as soon as there is no transaction - if ( $dbw && $dbw->trxLevel() ) { - $waitingOnTrx = true; - $dbw->onTransactionIdle( function() use ( &$waitingOnTrx ) { - DeferredUpdates::doUpdates(); - $waitingOnTrx = false; - } ); - } else { - self::doUpdates(); - } - } } - public static function execute( array &$queue, $mode ) { - $updates = $queue; // snapshot of queue + /** + * Immediately run/queue a list of updates + * + * @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects + * @param string $mode Use "enqueue" to use the job queue when possible + * @param integer $stage Class constant (PRESEND, POSTSEND) (since 1.28) + * @throws ErrorPageError Happens on top-level calls + * @throws Exception Happens on second-level calls + */ + protected static function execute( array &$queue, $mode, $stage ) { + $services = MediaWikiServices::getInstance(); + $stats = $services->getStatsdDataFactory(); + $lbFactory = $services->getDBLoadBalancerFactory(); + $method = RequestContext::getMain()->getRequest()->getMethod(); + + /** @var ErrorPageError $reportableError */ + $reportableError = null; + /** @var DeferrableUpdate[] $updates Snapshot of queue */ + $updates = $queue; - // Keep doing rounds of updates until none get enqueued - while ( count( $updates ) ) { + // Keep doing rounds of updates until none get enqueued... + while ( $updates ) { $queue = []; // clear the queue - /** @var DataUpdate[] $dataUpdates */ - $dataUpdates = []; - /** @var DeferrableUpdate[] $otherUpdates */ - $otherUpdates = []; - foreach ( $updates as $update ) { - if ( $update instanceof DataUpdate ) { - $dataUpdates[] = $update; - } else { - $otherUpdates[] = $update; + + if ( $mode === 'enqueue' ) { + try { + // Push enqueuable updates to the job queue and get the rest + $updates = self::enqueueUpdates( $updates ); + } catch ( Exception $e ) { + // Let other updates have a chance to run if this failed + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); } } - // Delegate DataUpdate execution to the DataUpdate class - try { - DataUpdate::runUpdates( $dataUpdates, $mode ); - } catch ( Exception $e ) { - // Let the other updates occur if these had to rollback - MWExceptionHandler::logException( $e ); + // Order will be DataUpdate followed by generic DeferrableUpdate tasks + $updatesByType = [ 'data' => [], 'generic' => [] ]; + foreach ( $updates as $du ) { + $updatesByType[$du instanceof DataUpdate ? 'data' : 'generic'][] = $du; + $name = ( $du instanceof DeferrableCallback ) + ? get_class( $du ) . '-' . $du->getOrigin() + : get_class( $du ); + $stats->increment( 'deferred_updates.' . $method . '.' . $name ); } - // Execute the non-DataUpdate tasks - foreach ( $otherUpdates as $update ) { - try { - $update->doUpdate(); - wfGetLBFactory()->commitMasterChanges( __METHOD__ ); - } catch ( Exception $e ) { - // We don't want exceptions thrown during deferred updates to - // be reported to the user since the output is already sent - if ( !$e instanceof ErrorPageError ) { - MWExceptionHandler::logException( $e ); + + // Execute all remaining tasks... + foreach ( $updatesByType as $updatesForType ) { + foreach ( $updatesForType as $update ) { + self::$executeContext = [ + 'update' => $update, + 'stage' => $stage, + 'subqueue' => [] + ]; + /** @var DeferrableUpdate $update */ + $guiError = self::runUpdate( $update, $lbFactory, $stage ); + $reportableError = $reportableError ?: $guiError; + // Do the subqueue updates for $update until there are none + while ( self::$executeContext['subqueue'] ) { + $subUpdate = reset( self::$executeContext['subqueue'] ); + $firstKey = key( self::$executeContext['subqueue'] ); + unset( self::$executeContext['subqueue'][$firstKey] ); + + $guiError = self::runUpdate( $subUpdate, $lbFactory, $stage ); + $reportableError = $reportableError ?: $guiError; } - // Make sure incomplete transactions are not committed and end any - // open atomic sections so that other DB updates have a chance to run - wfGetLBFactory()->rollbackMasterChanges( __METHOD__ ); + self::$executeContext = null; } } $updates = $queue; // new snapshot of queue (check for new entries) } + + if ( $reportableError ) { + throw $reportableError; // throw the first of any GUI errors + } + } + + /** + * @param DeferrableUpdate $update + * @param LBFactory $lbFactory + * @param integer $stage + * @return ErrorPageError|null + */ + private static function runUpdate( DeferrableUpdate $update, LBFactory $lbFactory, $stage ) { + $guiError = null; + try { + $fnameTrxOwner = get_class( $update ) . '::doUpdate'; + $lbFactory->beginMasterChanges( $fnameTrxOwner ); + $update->doUpdate(); + $lbFactory->commitMasterChanges( $fnameTrxOwner ); + } catch ( Exception $e ) { + // Reporting GUI exceptions does not work post-send + if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) { + $guiError = $e; + } + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + } + + return $guiError; + } + + /** + * Run all deferred updates immediately if there are no DB writes active + * + * If $mode is 'run' but there are busy databates, EnqueueableDataUpdate + * tasks will be enqueued anyway for the sake of progress. + * + * @param string $mode Use "enqueue" to use the job queue when possible + * @return bool Whether updates were allowed to run + * @since 1.28 + */ + public static function tryOpportunisticExecute( $mode = 'run' ) { + // execute() loop is already running + if ( self::$executeContext ) { + return false; + } + + // Avoiding running updates without them having outer scope + if ( !self::getBusyDbConnections() ) { + self::doUpdates( $mode ); + return true; + } + + if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) { + // If we cannot run the updates with outer transaction context, try to + // at least enqueue all the updates that support queueing to job queue + self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates ); + self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates ); + } + + return !self::pendingUpdatesCount(); + } + + /** + * Enqueue a job for each EnqueueableDataUpdate item and return the other items + * + * @param DeferrableUpdate[] $updates A list of deferred update instances + * @return DeferrableUpdate[] Remaining updates that do not support being queued + */ + private static function enqueueUpdates( array $updates ) { + $remaining = []; + + foreach ( $updates as $update ) { + if ( $update instanceof EnqueueableDataUpdate ) { + $spec = $update->getAsJobSpecification(); + JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] ); + } else { + $remaining[] = $update; + } + } + + return $remaining; + } + + /** + * @return integer Number of enqueued updates + * @since 1.28 + */ + public static function pendingUpdatesCount() { + return count( self::$preSendUpdates ) + count( self::$postSendUpdates ); } /** @@ -179,4 +314,22 @@ class DeferredUpdates { self::$preSendUpdates = []; self::$postSendUpdates = []; } + + /** + * @return IDatabase[] Connection where commit() cannot be called yet + */ + private static function getBusyDbConnections() { + $connsBusy = []; + + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) { + $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) { + if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) { + $connsBusy[] = $conn; + } + } ); + } ); + + return $connsBusy; + } }