*
* @file
*/
+use Wikimedia\Rdbms\IDatabase;
use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\LBFactory;
+use Wikimedia\Rdbms\LoadBalancer;
/**
* Class for managing the deferred updates
private static $preSendUpdates = [];
/** @var DeferrableUpdate[] Updates to be deferred until after request end */
private static $postSendUpdates = [];
- /** @var bool Whether to just run updates in addUpdate() */
- private static $immediateMode = false;
const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
const PRESEND = 1; // for updates that should run before flushing output buffer
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
global $wgCommandLineMode;
- // This is a sub-DeferredUpdate, run it right after its parent update
if ( self::$executeContext && self::$executeContext['stage'] >= $stage ) {
+ // This is a sub-DeferredUpdate; run it right after its parent update.
+ // Also, while post-send updates are running, push any "pre-send" jobs to the
+ // active post-send queue to make sure they get run this round (or at all).
self::$executeContext['subqueue'][] = $update;
+
return;
}
self::push( self::$postSendUpdates, $update );
}
- if ( self::$immediateMode ) {
- // No more explicit doUpdates() calls will happen, so run this now
- self::doUpdates( 'run' );
- return;
- }
-
// Try to run the updates now if in CLI mode and no transaction is active.
// This covers scripts that don't/barely use the DB but make updates to other stores.
if ( $wgCommandLineMode ) {
/**
* @param bool $value Whether to just immediately run updates in addUpdate()
* @since 1.28
+ * @deprecated 1.29 Causes issues in Web-executed jobs - see T165714 and T100085.
*/
public static function setImmediateMode( $value ) {
- self::$immediateMode = (bool)$value;
+ wfDeprecated( __METHOD__, '1.29' );
}
/**
while ( $updates ) {
$queue = []; // clear the queue
- if ( $mode === 'enqueue' ) {
- try {
- // Push enqueuable updates to the job queue and get the rest
- $updates = self::enqueueUpdates( $updates );
- } catch ( Exception $e ) {
- // Let other updates have a chance to run if this failed
- MWExceptionHandler::rollbackMasterChangesAndLog( $e );
- }
- }
-
// Order will be DataUpdate followed by generic DeferrableUpdate tasks
$updatesByType = [ 'data' => [], 'generic' => [] ];
foreach ( $updates as $du ) {
// Execute all remaining tasks...
foreach ( $updatesByType as $updatesForType ) {
foreach ( $updatesForType as $update ) {
- self::$executeContext = [
- 'update' => $update,
- 'stage' => $stage,
- 'subqueue' => []
- ];
+ self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
/** @var DeferrableUpdate $update */
- $guiError = self::runUpdate( $update, $lbFactory, $stage );
+ $guiError = self::runUpdate( $update, $lbFactory, $mode, $stage );
$reportableError = $reportableError ?: $guiError;
// Do the subqueue updates for $update until there are none
while ( self::$executeContext['subqueue'] ) {
$subUpdate->setTransactionTicket( $ticket );
}
- $guiError = self::runUpdate( $subUpdate, $lbFactory, $stage );
+ $guiError = self::runUpdate( $subUpdate, $lbFactory, $mode, $stage );
$reportableError = $reportableError ?: $guiError;
}
self::$executeContext = null;
/**
* @param DeferrableUpdate $update
* @param LBFactory $lbFactory
+ * @param string $mode
* @param integer $stage
* @return ErrorPageError|null
*/
- private static function runUpdate( DeferrableUpdate $update, LBFactory $lbFactory, $stage ) {
+ private static function runUpdate(
+ DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage
+ ) {
$guiError = null;
try {
- $fnameTrxOwner = get_class( $update ) . '::doUpdate';
- $lbFactory->beginMasterChanges( $fnameTrxOwner );
- $update->doUpdate();
- $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
+ // Run only the job enqueue logic to complete the update later
+ $spec = $update->getAsJobSpecification();
+ JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
+ } else {
+ // Run the bulk of the update now
+ $fnameTrxOwner = get_class( $update ) . '::doUpdate';
+ $lbFactory->beginMasterChanges( $fnameTrxOwner );
+ $update->doUpdate();
+ $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ }
} catch ( Exception $e ) {
// Reporting GUI exceptions does not work post-send
if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) {
}
// Avoiding running updates without them having outer scope
- if ( !self::getBusyDbConnections() ) {
+ if ( !self::areDatabaseTransactionsActive() ) {
self::doUpdates( $mode );
return true;
}
return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
}
+ /**
+ * @param integer $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL)
+ * @return DeferrableUpdate[]
+ * @since 1.29
+ */
+ public static function getPendingUpdates( $stage = self::ALL ) {
+ $updates = [];
+ if ( $stage === self::ALL || $stage === self::PRESEND ) {
+ $updates = array_merge( $updates, self::$preSendUpdates );
+ }
+ if ( $stage === self::ALL || $stage === self::POSTSEND ) {
+ $updates = array_merge( $updates, self::$postSendUpdates );
+ }
+ return $updates;
+ }
+
/**
* Clear all pending updates without performing them. Generally, you don't
* want or need to call this. Unit tests need it though.
}
/**
- * @return IDatabase[] Connection where commit() cannot be called yet
+ * @return bool If a transaction round is active or connection is not ready for commit()
*/
- private static function getBusyDbConnections() {
- $connsBusy = [];
-
+ private static function areDatabaseTransactionsActive() {
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ if ( $lbFactory->hasTransactionRound() ) {
+ return true;
+ }
+
+ $connsBusy = false;
$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
$lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
- $connsBusy[] = $conn;
+ $connsBusy = true;
}
} );
} );