use Psr\Log\NullLogger;
use Wikimedia\ScopedCallback;
use BagOStuff;
+use WANObjectCache;
/**
* Basic DB load monitor with no external dependencies
protected $parent;
/** @var BagOStuff */
protected $srvCache;
- /** @var BagOStuff */
- protected $mainCache;
+ /** @var WANObjectCache */
+ protected $wanCache;
/** @var LoggerInterface */
protected $replLogger;
/** @var float Moving average ratio (e.g. 0.1 for 10% weight to new weight) */
private $movingAveRatio;
+ /** @var int Amount of replication lag in seconds before warnings are logged */
+ private $lagWarnThreshold;
- const VERSION = 1; // cache key version
+ /** @var int cache key version */
+ const VERSION = 1;
+ /** @var int Default 'max lag' in seconds when unspecified */
+ const LAG_WARN_THRESHOLD = 10;
+ /**
+ * @param ILoadBalancer $lb
+ * @param BagOStuff $srvCache
+ * @param WANObjectCache $wCache
+ * @param array $options
+ * - movingAveRatio: moving average constant for server weight updates based on lag
+ * - lagWarnThreshold: how many seconds of lag trigger warnings
+ */
public function __construct(
- ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache, array $options = []
+ ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
) {
$this->parent = $lb;
$this->srvCache = $srvCache;
- $this->mainCache = $cache;
+ $this->wanCache = $wCache;
$this->replLogger = new NullLogger();
$this->movingAveRatio = isset( $options['movingAveRatio'] )
? $options['movingAveRatio']
: 0.1;
+ $this->lagWarnThreshold = isset( $options['lagWarnThreshold'] )
+ ? $options['lagWarnThreshold']
+ : self::LAG_WARN_THRESHOLD;
}
public function setLogger( LoggerInterface $logger ) {
$this->replLogger = $logger;
}
- public function scaleLoads( array &$weightByServer, $domain ) {
+ final public function scaleLoads( array &$weightByServer, $domain ) {
$serverIndexes = array_keys( $weightByServer );
$states = $this->getServerStates( $serverIndexes, $domain );
- $coefficientsByServer = $states['weightScales'];
+ $newScalesByServer = $states['weightScales'];
foreach ( $weightByServer as $i => $weight ) {
- if ( isset( $coefficientsByServer[$i] ) ) {
- $weightByServer[$i] = $weight * $coefficientsByServer[$i];
+ if ( isset( $newScalesByServer[$i] ) ) {
+ $weightByServer[$i] = $weight * $newScalesByServer[$i];
} else { // server recently added to config?
$host = $this->parent->getServerName( $i );
$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
}
}
- public function getLagTimes( array $serverIndexes, $domain ) {
- $states = $this->getServerStates( $serverIndexes, $domain );
-
- return $states['lagTimes'];
+ final public function getLagTimes( array $serverIndexes, $domain ) {
+ return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
}
protected function getServerStates( array $serverIndexes, $domain ) {
$staleValue = $value ?: false;
# (b) Check the shared cache and backfill APC
- $value = $this->mainCache->get( $key );
+ $value = $this->wanCache->get( $key );
if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
$this->srvCache->set( $key, $value, $staleTTL );
$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from main cache" );
$staleValue = $value ?: $staleValue;
# (c) Cache key missing or expired; regenerate and backfill
- if ( $this->mainCache->lock( $key, 0, 10 ) ) {
- # Let this process alone update the cache value
- $cache = $this->mainCache;
+ if ( $this->srvCache->lock( $key, 0, 10 ) ) {
+ # Let only this process update the cache value on this server
+ $sCache = $this->srvCache;
/** @noinspection PhpUnusedLocalVariableInspection */
- $unlocker = new ScopedCallback( function () use ( $cache, $key ) {
- $cache->unlock( $key );
+ $unlocker = new ScopedCallback( function () use ( $sCache, $key ) {
+ $sCache->unlock( $key );
} );
} elseif ( $staleValue ) {
# Could not acquire lock but an old cache exists, so use it
}
$conn = $this->parent->getAnyOpenConnection( $i );
- if ( $conn ) {
+ if ( $conn && !$conn->trxLevel() ) {
+ # Handles with open transactions are avoided since they might be subject
+ # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
$close = false; // already open
} else {
- $conn = $this->parent->openConnection( $i, $domain );
+ $conn = $this->parent->openConnection( $i, '' );
$close = true; // new connection
}
$newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
// Scale from 10% to 100% of nominal weight
- $weightScales[$i] = max( $newWeight, .10 );
+ $weightScales[$i] = max( $newWeight, 0.10 );
+
+ $host = $this->parent->getServerName( $i );
if ( !$conn ) {
$lagTimes[$i] = false;
- $host = $this->parent->getServerName( $i );
$this->replLogger->error(
__METHOD__ . ": host {db_server} is unreachable",
[ 'db_server' => $host ]
} else {
$lagTimes[$i] = $conn->getLag();
if ( $lagTimes[$i] === false ) {
- $host = $this->parent->getServerName( $i );
$this->replLogger->error(
__METHOD__ . ": host {db_server} is not replicating?",
[ 'db_server' => $host ]
);
+ } elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
+ $this->replLogger->error(
+ "Server {host} has {lag} seconds of lag (>= {maxlag})",
+ [
+ 'host' => $host,
+ 'lag' => $lagTimes[$i],
+ 'maxlag' => $this->lagWarnThreshold
+ ]
+ );
}
}
'weightScales' => $weightScales,
'timestamp' => microtime( true )
];
- $this->mainCache->set( $key, $value, $staleTTL );
+ $this->wanCache->set( $key, $value, $staleTTL );
$this->srvCache->set( $key, $value, $staleTTL );
$this->replLogger->info( __METHOD__ . ": re-calculated lag times ($key)" );
}
/**
- * @param integer $index Server index
+ * @param int $index Server index
* @param IDatabase|null $conn Connection handle or null on connection failure
* @return float
*/