use MediaWiki\MediaWikiServices;
use MediaWiki\Logger\LoggerFactory;
-use Liuggio\StatsdClient\Factory\StatsdDataFactory;
+use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Wikimedia\ScopedCallback;
}
/**
- * @param Job $job
+ * @param RunnableJob $job
* @param LBFactory $lbFactory
- * @param StatsdDataFactory $stats
+ * @param StatsdDataFactoryInterface $stats
* @param float $popTime
* @return array Map of status/error/timeMs
*/
- private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) {
+ private function executeJob( RunnableJob $job, LBFactory $lbFactory, $stats, $popTime ) {
$jType = $job->getType();
$msg = $job->toString() . " STARTING";
$this->logger->debug( $msg, [
] );
$this->debugCallback( $msg );
+ // Clear out title cache data from prior snapshots
+ // (e.g. from before JobRunner was invoked in this process)
+ MediaWikiServices::getInstance()->getLinkCache()->clear();
+
// Run the job...
$rssStart = $this->getMaxRssKb();
$jobStartTime = microtime( true );
try {
$fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
- if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
- $lbFactory->beginMasterChanges( $fnameTrxOwner );
+ // Flush any pending changes left over from an implicit transaction round
+ if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
+ $lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
+ } else {
+ $lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
}
+ // Clear any stale REPEATABLE-READ snapshots from replica DB connections
+ $lbFactory->flushReplicaSnapshots( $fnameTrxOwner );
$status = $job->run();
$error = $job->getLastError();
+ // Commit all pending changes from this job
$this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
// Run any deferred update tasks; doUpdates() manages transactions itself
DeferredUpdates::doUpdates();
MWExceptionHandler::logException( $e );
}
- // Commit all outstanding connections that are in a transaction
- // to get a fresh repeatable read snapshot on every connection.
- // Note that jobs are still responsible for handling replica DB lag.
- $lbFactory->flushReplicaSnapshots( __METHOD__ );
- // Clear out title cache data from prior snapshots
- MediaWikiServices::getInstance()->getLinkCache()->clear();
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
$rssEnd = $this->getMaxRssKb();
}
/**
- * @param Job $job
+ * @param RunnableJob $job
* @return int Seconds for this runner to avoid doing more jobs of this type
* @see $wgJobBackoffThrottling
*/
- private function getBackoffTimeToWait( Job $job ) {
+ private function getBackoffTimeToWait( RunnableJob $job ) {
$throttling = $this->config->get( 'JobBackoffThrottling' );
if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
* $wgJobSerialCommitThreshold for more.
*
* @param LBFactory $lbFactory
- * @param Job $job
+ * @param RunnableJob $job
* @param string $fnameTrxOwner
* @throws DBError
*/
- private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
+ private function commitMasterChanges( LBFactory $lbFactory, RunnableJob $job, $fnameTrxOwner ) {
$syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
$time = false;
$lb = $lbFactory->getMainLB();
- if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
+ if ( $syncThreshold !== false && $lb->hasStreamingReplicaServers() ) {
// Generally, there is one master connection to the local DB
$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
// We need natively blocking fast locks