merge( $update );
			} else {
				$queue[$class] = $update;
			}
		} else {
			$queue[] = $update;
		}

		// CLI scripts may forget to periodically flush these updates,
		// so try to handle that rather than OOMing and losing them entirely.
		// Try to run the updates as soon as there is no current wiki transaction.
		static $waitingOnTrx = false; // de-duplicate callback
		if ( $wgCommandLineMode && !$waitingOnTrx ) {
			$lb = wfGetLB();
			$dbw = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
			// Do the update as soon as there is no transaction
			if ( $dbw && $dbw->trxLevel() ) {
				$waitingOnTrx = true;
				$dbw->onTransactionIdle( function() use ( &$waitingOnTrx ) {
					DeferredUpdates::doUpdates();
					$waitingOnTrx = false;
				} );
			} else {
				self::doUpdates();
			}
		}
	}

	/**
	 * Drain a queue of updates, running them in rounds until the queue stays empty
	 *
	 * Every round empties $queue, bumps one stats counter per update, hands the
	 * DataUpdate instances to DataUpdate::runUpdates() and then runs each of the
	 * other updates on its own, committing master changes after every success.
	 * Updates that get added to $queue while a round is running are picked up by
	 * the next round.
	 *
	 * @param DeferrableUpdate[] &$queue Queue to drain; left empty on normal return
	 * @param string $mode Passed through unchanged to DataUpdate::runUpdates()
	 */
	public static function execute( array &$queue, $mode ) {
		$statsd = \MediaWiki\MediaWikiServices::getInstance()->getStatsdDataFactory();
		$httpMethod = RequestContext::getMain()->getRequest()->getMethod();
		// Common part of every counter name incremented below
		$metricPrefix = 'deferred_updates.' . $httpMethod . '.';

		// Each pass works on a snapshot of the queue; the loop only stops once
		// a pass finishes without anything new having been enqueued
		for ( $batch = $queue; count( $batch ) > 0; $batch = $queue ) {
			// Reset the live queue so that entries added during this pass stand out
			$queue = [];

			/** @var DataUpdate[] $dataTasks */
			$dataTasks = [];
			/** @var DeferrableUpdate[] $plainTasks */
			$plainTasks = [];
			foreach ( $batch as $task ) {
				if ( $task instanceof DataUpdate ) {
					$dataTasks[] = $task;
				} else {
					$plainTasks[] = $task;
				}

				// Callback-style updates are additionally tagged with their origin
				$label = get_class( $task );
				if ( $task instanceof DeferrableCallback ) {
					$label .= '-' . $task->getOrigin();
				}
				$statsd->increment( $metricPrefix . $label );
			}

			// The DataUpdate class knows how to run (or enqueue) its own instances
			try {
				DataUpdate::runUpdates( $dataTasks, $mode );
			} catch ( Exception $e ) {
				// A failure here must not prevent the remaining updates from running
				MWExceptionHandler::logException( $e );
			}

			// Everything else is run one by one, each followed by its own commit
			foreach ( $plainTasks as $task ) {
				try {
					$task->doUpdate();
					wfGetLBFactory()->commitMasterChanges( __METHOD__ );
				} catch ( Exception $e ) {
					// The output was already sent, so there is no point in reporting
					// deferred update failures to the user; just log them
					if ( !( $e instanceof ErrorPageError ) ) {
						MWExceptionHandler::logException( $e );
					}
					// Discard the half-done transaction and close any open atomic
					// sections so that the following updates start from a clean state
					wfGetLBFactory()->rollbackMasterChanges( __METHOD__ );
				}
			}
		}
	}

	/**
	 * Run all deferred updates immediately if there are no DB writes active
	 *
	 * If there are busy databases, nothing is run; but once the queue holds
	 * BIG_QUEUE_SIZE or more updates, EnqueueableDataUpdate tasks are pushed
	 * to the job queue anyway for the sake of progress.
*
	 * @param string $mode Use "enqueue" to use the job queue when possible
	 * @return bool True if the updates were run, or if no updates remain pending
	 * @since 1.28
	 */
	public static function tryOpportunisticExecute( $mode = 'run' ) {
		if ( !self::getBusyDbConnections() ) {
			self::doUpdates( $mode );

			return true;
		}

		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// If we cannot run the updates with outer transaction context, try to
			// at least enqueue all the updates that support queueing to job queue
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return !self::pendingUpdatesCount();
	}

	/**
	 * Enqueue a job for each EnqueueableDataUpdate item and return the other items
	 *
	 * Array keys of the items that stay behind are preserved. The queueing code
	 * files some updates under a class-name key ($queue[$class]) so that a later
	 * update of the same class can be merged into the existing entry; renumbering
	 * the list here would lose that key and the later update would be added as a
	 * separate entry instead.
	 *
	 * @param DeferrableUpdate[] $updates A list of deferred update instances
	 * @return DeferrableUpdate[] Remaining updates that do not support being queued,
	 *  under their original keys
	 */
	private static function enqueueUpdates( array $updates ) {
		$remaining = [];

		foreach ( $updates as $key => $update ) {
			if ( $update instanceof EnqueueableDataUpdate ) {
				// Hand the update over to the job queue of the wiki named in its spec
				$spec = $update->getAsJobSpecification();
				JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
			} else {
				$remaining[$key] = $update;
			}
		}

		return $remaining;
	}

	/**
	 * @return int Number of enqueued updates (pre-send plus post-send)
	 * @since 1.28
	 */
	public static function pendingUpdatesCount() {
		return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
	}

	/**
	 * Clear all pending updates without performing them. Generally, you don't
	 * want or need to call this. Unit tests need it though.
*/
	public static function clearPendingUpdates() {
		self::$preSendUpdates = [];
		self::$postSendUpdates = [];
	}

	/**
	 * Set the rollback/commit watcher on a DB to trigger update runs when safe
	 *
	 * The listener only acts in command line mode, and only for commit and
	 * rollback triggers; it then calls tryOpportunisticExecute().
	 *
	 * @TODO: use this to replace DB logic in push()
	 * @param LoadBalancer $lb
	 * @since 1.28
	 */
	public static function installDBListener( LoadBalancer $lb ) {
		static $triggers = [ IDatabase::TRIGGER_COMMIT, IDatabase::TRIGGER_ROLLBACK ];
		// Hook into active master connections to find a moment where no writes are pending
		$lb->setTransactionListener(
			__METHOD__,
			function ( $trigger, IDatabase $conn ) use ( $triggers ) {
				global $wgCommandLineMode;

				// Strict comparison: triggers are IDatabase::TRIGGER_* constants,
				// so no other value should be able to match through type juggling
				if ( $wgCommandLineMode && in_array( $trigger, $triggers, true ) ) {
					DeferredUpdates::tryOpportunisticExecute();
				}
			}
		);
	}

	/**
	 * Collect the open master connections that have writes or callbacks pending
	 * or an explicit transaction active
	 *
	 * @return IDatabase[] Connection where commit() cannot be called yet
	 */
	private static function getBusyDbConnections() {
		$connsBusy = [];

		// Fully-qualified, like the other MediaWikiServices reference in this class,
		// so that this does not depend on a "use" import being present
		$lbFactory = \MediaWiki\MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
			$lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
				if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
					$connsBusy[] = $conn;
				}
			} );
		} );

		return $connsBusy;
	}
}