*
* @file
*/
+
+use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
+use MediaWiki\Logger\LoggerFactory;
+use Psr\Log\LoggerInterface;
use Wikimedia\Rdbms\IDatabase;
use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\LBFactory;
+use Wikimedia\Rdbms\ILBFactory;
use Wikimedia\Rdbms\LoadBalancer;
/**
// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
do {
if ( $stage === self::ALL || $stage === self::PRESEND ) {
- self::execute( self::$preSendUpdates, $mode, $stageEffective );
+ self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
}
if ( $stage === self::ALL || $stage == self::POSTSEND ) {
- self::execute( self::$postSendUpdates, $mode, $stageEffective );
+ self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
}
} while ( $stage === self::ALL && self::$preSendUpdates );
}
}
/**
- * Immediately run/queue a list of updates
+ * Immediately run or enqueue a list of updates
*
* @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects
- * @param string $mode Use "enqueue" to use the job queue when possible
+ * @param string $mode Either "run" or "enqueue" (to use the job queue when possible)
* @param int $stage Class constant (PRESEND, POSTSEND) (since 1.28)
* @throws ErrorPageError Happens on top-level calls
* @throws Exception Happens on second-level calls
*/
- protected static function execute( array &$queue, $mode, $stage ) {
+ protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
$services = MediaWikiServices::getInstance();
$stats = $services->getStatsdDataFactory();
- $lbFactory = $services->getDBLoadBalancerFactory();
- $method = RequestContext::getMain()->getRequest()->getMethod();
-
- $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
-
- /** @var ErrorPageError $reportableError */
- $reportableError = null;
+ $lbf = $services->getDBLoadBalancerFactory();
+ $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
+ $httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
+ ? 'cli'
+ : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
+
+ /** @var ErrorPageError $guiEx */
+ $guiEx = null;
/** @var DeferrableUpdate[] $updates Snapshot of queue */
$updates = $queue;
while ( $updates ) {
$queue = []; // clear the queue
- // Order will be DataUpdate followed by generic DeferrableUpdate tasks
- $updatesByType = [ 'data' => [], 'generic' => [] ];
- foreach ( $updates as $du ) {
- if ( $du instanceof DataUpdate ) {
- $du->setTransactionTicket( $ticket );
- $updatesByType['data'][] = $du;
+ // Segregate the queue into one for DataUpdate and one for everything else
+ $dataUpdateQueue = [];
+ $genericUpdateQueue = [];
+ foreach ( $updates as $update ) {
+ if ( $update instanceof DataUpdate ) {
+ $dataUpdateQueue[] = $update;
} else {
- $updatesByType['generic'][] = $du;
+ $genericUpdateQueue[] = $update;
}
-
- $name = ( $du instanceof DeferrableCallback )
- ? get_class( $du ) . '-' . $du->getOrigin()
- : get_class( $du );
- $stats->increment( 'deferred_updates.' . $method . '.' . $name );
}
-
- // Execute all remaining tasks...
- foreach ( $updatesByType as $updatesForType ) {
- foreach ( $updatesForType as $update ) {
+ // Execute all DataUpdate queue followed by the DeferrableUpdate queue...
+ foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
+ foreach ( $updateQueue as $du ) {
+ // Enqueue the task into the job queue system instead if applicable
+ if ( $mode === 'enqueue' && $du instanceof EnqueueableDataUpdate ) {
+ self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
+ continue;
+ }
+ // Otherwise, execute the task and any subtasks that it spawns
self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
try {
- /** @var DeferrableUpdate $update */
- $guiError = self::runUpdate( $update, $lbFactory, $mode, $stage );
- $reportableError = $reportableError ?: $guiError;
+ $e = self::run( $du, $lbf, $logger, $stats, $httpMethod );
+ $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
// Do the subqueue updates for $update until there are none
while ( self::$executeContext['subqueue'] ) {
- $subUpdate = reset( self::$executeContext['subqueue'] );
+ $duChild = reset( self::$executeContext['subqueue'] );
$firstKey = key( self::$executeContext['subqueue'] );
unset( self::$executeContext['subqueue'][$firstKey] );
- if ( $subUpdate instanceof DataUpdate ) {
- $subUpdate->setTransactionTicket( $ticket );
- }
-
- $guiError = self::runUpdate( $subUpdate, $lbFactory, $mode, $stage );
- $reportableError = $reportableError ?: $guiError;
+ $e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
+ $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
}
} finally {
// Make sure we always clean up the context.
$updates = $queue; // new snapshot of queue (check for new entries)
}
- if ( $reportableError ) {
- throw $reportableError; // throw the first of any GUI errors
+ // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
+ // callers should check permissions *before* enqueueing updates. If the main transaction
+ // round actions succeed but some deferred updates fail due to permissions errors then
+ // there is a risk that some secondary data was not properly updated.
+ if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
+ throw $guiEx;
}
}
/**
+ * Run a task and catch/log any exceptions
+ *
* @param DeferrableUpdate $update
* @param LBFactory $lbFactory
- * @param string $mode
- * @param int $stage
- * @return ErrorPageError|null
+ * @param LoggerInterface $logger
+ * @param StatsdDataFactoryInterface $stats
+ * @param string $httpMethod
+ * @return Exception|Throwable|null
*/
- private static function runUpdate(
- DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage
+ private static function run(
+ DeferrableUpdate $update,
+ LBFactory $lbFactory,
+ LoggerInterface $logger,
+ StatsdDataFactoryInterface $stats,
+ $httpMethod
) {
- $guiError = null;
+ $name = get_class( $update );
+ $suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
+ $stats->increment( "deferred_updates.$httpMethod.{$name}{$suffix}" );
+
+ $e = null;
try {
- if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
- // Run only the job enqueue logic to complete the update later
- $spec = $update->getAsJobSpecification();
- $domain = $spec['domain'] ?? $spec['wiki'];
- JobQueueGroup::singleton( $domain )->push( $spec['job'] );
- } elseif ( $update instanceof TransactionRoundDefiningUpdate ) {
- $update->doUpdate();
- } else {
- // Run the bulk of the update now
- $fnameTrxOwner = get_class( $update ) . '::doUpdate';
- $lbFactory->beginMasterChanges( $fnameTrxOwner );
- $update->doUpdate();
- $lbFactory->commitMasterChanges( $fnameTrxOwner );
- }
+ self::attemptUpdate( $update, $lbFactory );
} catch ( Exception $e ) {
- // Reporting GUI exceptions does not work post-send
- if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) {
- $guiError = $e;
- }
- MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+ } catch ( Throwable $e ) {
+ }
+ if ( $e ) {
+ $logger->error(
+ "Deferred update {type} failed: {message}",
+ [
+ 'type' => $name . $suffix,
+ 'message' => $e->getMessage(),
+ 'trace' => $e->getTraceAsString()
+ ]
+ );
+ $lbFactory->rollbackMasterChanges( __METHOD__ );
// VW-style hack to work around T190178, so we can make sure
// PageMetaDataUpdater doesn't throw exceptions.
if ( defined( 'MW_PHPUNIT_TEST' ) ) {
}
}
- return $guiError;
+ return $e;
+ }
+
+ /**
+ * Push a task into the job queue system and catch/log any exceptions
+ *
+ * @param EnqueueableDataUpdate $update
+ * @param LBFactory $lbFactory
+ * @param LoggerInterface $logger
+ * @param StatsdDataFactoryInterface $stats
+ * @param string $httpMethod
+ */
+ private static function jobify(
+ EnqueueableDataUpdate $update,
+ LBFactory $lbFactory,
+ LoggerInterface $logger,
+ StatsdDataFactoryInterface $stats,
+ $httpMethod
+ ) {
+ $stats->increment( "deferred_updates.$httpMethod." . get_class( $update ) );
+
+ $e = null;
+ try {
+ $spec = $update->getAsJobSpecification();
+ JobQueueGroup::singleton( $spec['domain'] ?? $spec['wiki'] )->push( $spec['job'] );
+ } catch ( Exception $e ) {
+ } catch ( Throwable $e ) {
+ }
+
+ if ( $e ) {
+ $logger->error(
+ "Job insertion of deferred update {type} failed: {message}",
+ [
+ 'type' => get_class( $update ),
+ 'message' => $e->getMessage(),
+ 'trace' => $e->getTraceAsString()
+ ]
+ );
+ $lbFactory->rollbackMasterChanges( __METHOD__ );
+ }
+ }
+
+ /**
+ * Attempt to run an update with the appropriate transaction round state it expects
+ *
+ * DeferredUpdate classes that wrap the execution of bundles of other DeferredUpdate
+ * instances can use this method to run the updates. Any such wrapper class should
+ * always use TRX_ROUND_ABSENT itself.
+ *
+ * @param DeferrableUpdate $update
+ * @param ILBFactory $lbFactory
+ * @since 1.34
+ */
+ public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
+ if ( $update instanceof DataUpdate ) {
+ $update->setTransactionTicket( $lbFactory->getEmptyTransactionTicket( __METHOD__ ) );
+ }
+
+ if (
+ $update instanceof TransactionRoundAwareUpdate &&
+ $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
+ ) {
+ $fnameTrxOwner = null;
+ } else {
+ $fnameTrxOwner = get_class( $update ) . '::doUpdate';
+ }
+
+ if ( $fnameTrxOwner !== null ) {
+ $lbFactory->beginMasterChanges( $fnameTrxOwner );
+ }
+
+ $update->doUpdate();
+
+ if ( $fnameTrxOwner !== null ) {
+ $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ }
}
/**
foreach ( $updates as $update ) {
if ( $update instanceof EnqueueableDataUpdate ) {
$spec = $update->getAsJobSpecification();
- JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
+ $domain = $spec['domain'] ?? $spec['wiki'];
+ JobQueueGroup::singleton( $domain )->push( $spec['job'] );
} else {
$remaining[] = $update;
}