use Liuggio\StatsdClient\Factory\StatsdDataFactory;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
+use Wikimedia\ScopedCallback;
+use Wikimedia\Rdbms\LBFactory;
+use Wikimedia\Rdbms\DBError;
+use Wikimedia\Rdbms\DBReplicationWaitError;
/**
* Job queue runner utility methods
* @since 1.24
*/
class JobRunner implements LoggerAwareInterface {
+ /** @var Config */
+ protected $config;
/** @var callable|null Debug output handler */
protected $debug;
const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present
const LAG_CHECK_PERIOD = 1.0; // check replica DB lag this many seconds
const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors
+ const READONLY_BACKOFF_TTL = 30; // seconds to back off a queue due to read-only errors
/**
* @param callable $debug Optional debug output handler
$logger = LoggerFactory::getInstance( 'runJobs' );
}
$this->setLogger( $logger );
+ $this->config = MediaWikiServices::getInstance()->getMainConfig();
}
/**
* @return array Summary response that can easily be JSON serialized
*/
public function run( array $options ) {
- global $wgJobClasses, $wgTrxProfilerLimits;
+ $jobClasses = $this->config->get( 'JobClasses' );
+ $profilerLimits = $this->config->get( 'TrxProfilerLimits' );
$response = [ 'jobs' => [], 'reached' => 'none-ready' ];
$noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
// Bail if job type is invalid
- if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
+ if ( $type !== false && !isset( $jobClasses[$type] ) ) {
$response['reached'] = 'none-possible';
return $response;
}
// Catch huge single updates that lead to replica DB lag
$trxProfiler = Profiler::instance()->getTransactionProfiler();
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
- $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
+ $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
// Some jobs types should not run until a certain timestamp
$backoffs = []; // map of (type => UNIX expiry)
// Back off of certain jobs for a while (for throttling and for errors)
if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
- $ttw = max( $ttw, self::ERROR_BACKOFF_TTL ); // too many errors
+ $ttw = max( $ttw, $this->getErrorBackoffTTL( $info['error'] ) );
$backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
? $backoffDeltas[$jType] + $ttw
: $ttw;
return $response;
}
+ /**
+ * @param string $error
+ * @return int TTL in seconds
+ */
+ private function getErrorBackoffTTL( $error ) {
+ return strpos( $error, 'DBReadOnlyError' ) !== false
+ ? self::READONLY_BACKOFF_TTL
+ : self::ERROR_BACKOFF_TTL;
+ }
+
/**
* @param Job $job
* @param LBFactory $lbFactory
$status = $job->run();
$error = $job->getLastError();
$this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
+ // Important: this must be the last deferred update added (T100085, T154425)
+ DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
// Run any deferred update tasks; doUpdates() manages transactions itself
DeferredUpdates::doUpdates();
} catch ( Exception $e ) {
*/
private function getMaxRssKb() {
$info = wfGetRusage() ?: [];
- // see http://linux.die.net/man/2/getrusage
+ // see https://linux.die.net/man/2/getrusage
return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
}
* @see $wgJobBackoffThrottling
*/
private function getBackoffTimeToWait( Job $job ) {
- global $wgJobBackoffThrottling;
+ $throttling = $this->config->get( 'JobBackoffThrottling' );
- if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
- $job instanceof DuplicateJob // no work was done
- ) {
+ if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
return 0; // not throttled
}
- $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
+ $itemsPerSecond = $throttling[$job->getType()];
if ( $itemsPerSecond <= 0 ) {
return 0; // not throttled
}
* @throws DBError
*/
private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
- global $wgJobSerialCommitThreshold;
+ $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
$time = false;
$lb = $lbFactory->getMainLB( wfWikiID() );
- if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
+ if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
// Generally, there is one master connection to the local DB
$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
// We need natively blocking fast locks
if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
$time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
- if ( $time < $wgJobSerialCommitThreshold ) {
+ if ( $time < $syncThreshold ) {
$dbwSerial = false;
}
} else {
}
if ( !$dbwSerial ) {
- $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ $lbFactory->commitMasterChanges(
+ $fnameTrxOwner,
+ // Abort if any transaction was too big
+ [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+ );
+
return;
}
}
// Actually commit the DB master changes
- $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ $lbFactory->commitMasterChanges(
+ $fnameTrxOwner,
+ // Abort if any transaction was too big
+ [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+ );
ScopedCallback::consume( $unlocker );
}
}