* @ingroup JobQueue
*/
+use MediaWiki\MediaWikiServices;
use MediaWiki\Logger\LoggerFactory;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
protected $logger;
const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present
- const LAG_CHECK_PERIOD = 1.0; // check slave lag this many seconds
+ const LAG_CHECK_PERIOD = 1.0; // check replica DB lag this many seconds
const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors
/**
}
// Flush any pending DB writes for sanity
- wfGetLBFactory()->commitAll( __METHOD__ );
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lbFactory->commitAll( __METHOD__ );
- // Catch huge single updates that lead to slave lag
+ // Catch huge single updates that lead to replica DB lag
$trxProfiler = Profiler::instance()->getTransactionProfiler();
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
$trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
$jobsPopped = 0;
$timeMsTotal = 0;
$startTime = microtime( true ); // time since jobs started running
- $lastCheckTime = 1; // timestamp of last slave check
+ $lastCheckTime = 1; // timestamp of last replica DB check
do {
// Sync the persistent backoffs with concurrent runners
$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
}
+ $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
$info = $this->executeJob( $job, $stats, $popTime );
if ( $info['status'] !== false || !$job->allowRetries() ) {
$group->ack( $job ); // succeeded or job cannot be retried
+ $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
}
// Back off of certain jobs for a while (for throttling and for errors)
break;
}
- // Don't let any of the main DB slaves get backed up.
+ // Don't let any of the main DB replica DBs get backed up.
// This only waits for so long before exiting and letting
// other wikis in the farm (on different masters) get a chance.
$timePassed = microtime( true ) - $lastCheckTime;
if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
try {
- wfGetLBFactory()->waitForReplication( [
+ $lbFactory->waitForReplication( [
'ifWritesSince' => $lastCheckTime,
'timeout' => self::MAX_ALLOWED_LAG
] );
}
$lastCheckTime = microtime( true );
}
- // Don't let any queue slaves/backups fall behind
+ // Don't let any queue replica DBs/backups fall behind
if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
$group->waitForBackups();
}
$msg = $job->toString() . " STARTING";
$this->logger->debug( $msg );
$this->debugCallback( $msg );
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Run the job...
$rssStart = $this->getMaxRssKb();
// Commit all outstanding connections that are in a transaction
// to get a fresh repeatable read snapshot on every connection.
- // Note that jobs are still responsible for handling slave lag.
- wfGetLBFactory()->commitAll( __METHOD__ );
+ // Note that jobs are still responsible for handling replica DB lag.
+ $lbFactory->flushReplicaSnapshots( __METHOD__ );
// Clear out title cache data from prior snapshots
LinkCache::singleton()->clear();
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
/**
* Issue a commit on all masters who are currently in a transaction and have
* made changes to the database. It also supports sometimes waiting for the
- * local wiki's slaves to catch up. See the documentation for
+ * local wiki's replica DBs to catch up. See the documentation for
* $wgJobSerialCommitThreshold for more.
*
* @param Job $job
if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
// Generally, there is one master connection to the local DB
$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+ // We need natively blocking fast locks
+ if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
+ $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
+ if ( $time < $wgJobSerialCommitThreshold ) {
+ $dbwSerial = false;
+ }
+ } else {
+ $dbwSerial = false;
+ }
} else {
+ // There are no replica DBs or writes are all to foreign DB (we don't handle that)
$dbwSerial = false;
}
- if ( !$dbwSerial
- || !$dbwSerial->namedLocksEnqueue()
- || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold
- ) {
- // Writes are all to foreign DBs, named locks don't form queues,
- // or $wgJobSerialCommitThreshold is not reached; commit changes now
+ if ( !$dbwSerial ) {
wfGetLBFactory()->commitMasterChanges( __METHOD__ );
return;
}
// This will trigger a rollback in the main loop
throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
}
- // Wait for the slave DBs to catch up
+ // Wait for the replica DBs to catch up
$pos = $lb->getMasterPos();
if ( $pos ) {
$lb->waitForAll( $pos );