* @ingroup Database
*/
+namespace Wikimedia\Rdbms;
+
use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+use Wikimedia\ScopedCallback;
+use BagOStuff;
/**
* Basic DB load monitor with no external dependencies
/** @var LoggerInterface */
protected $replLogger;
- public function __construct( ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache ) {
+ /** @var float Moving average ratio (e.g. 0.1 gives 10% weight to the newest value) */
+ private $movingAveRatio;
+
+ const VERSION = 1; // cache key version
+
/**
* Store the load balancer and the two caches, install a no-op logger and read the options.
*
* @param ILoadBalancer $lb Load balancer, kept as $this->parent
* @param BagOStuff $srvCache Cache kept as $this->srvCache
* @param BagOStuff $cache Cache kept as $this->mainCache
* @param array $options Optional settings:
*   - movingAveRatio: moving average ratio for weight scales (0.1 when not set)
*/
+ public function __construct(
+ ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache, array $options = []
+ ) {
$this->parent = $lb;
$this->srvCache = $srvCache;
$this->mainCache = $cache;
- $this->replLogger = new \Psr\Log\NullLogger();
+ $this->replLogger = new NullLogger();
+
// Fall back to 0.1 when the caller did not set the option
+ $this->movingAveRatio = isset( $options['movingAveRatio'] )
+ ? $options['movingAveRatio']
+ : 0.1;
}
/**
* Replace the logger that this monitor writes its messages to.
*
* @param LoggerInterface $logger
*/
public function setLogger( LoggerInterface $logger ) {
$this->replLogger = $logger;
}
- public function scaleLoads( &$loads, $group = false, $domain = false ) {
/**
* Multiply each server's weight, in place, by its coefficient from the server states.
*
* A server that has no coefficient in the fetched states keeps its weight
* unchanged and an error is logged for it.
*
* @param array &$weightByServer Map of (server index => weight), modified by reference
* @param mixed $domain Passed through to getServerStates()
*/
+ public function scaleLoads( array &$weightByServer, $domain ) {
+ $serverIndexes = array_keys( $weightByServer );
+ $states = $this->getServerStates( $serverIndexes, $domain );
+ $coefficientsByServer = $states['weightScales'];
+ foreach ( $weightByServer as $i => $weight ) {
+ if ( isset( $coefficientsByServer[$i] ) ) {
+ $weightByServer[$i] = $weight * $coefficientsByServer[$i];
+ } else { // server recently added to config?
+ $host = $this->parent->getServerName( $i );
+ $this->replLogger->error( __METHOD__ . ": host $host not in cache" );
+ }
+ }
+ }
+
/**
* Get the lag times of the given servers from the server states.
*
* @param array $serverIndexes Server indexes
* @param mixed $domain Passed through to getServerStates()
* @return array The 'lagTimes' element of the server states, keyed by server index
*/
+ public function getLagTimes( array $serverIndexes, $domain ) {
+ $states = $this->getServerStates( $serverIndexes, $domain );
+
+ return $states['lagTimes'];
}
- public function getLagTimes( $serverIndexes, $domain ) {
- if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == 0 ) {
/**
* Get the lag times and weight scales of the given servers, preferring cached values.
*
* @param array $serverIndexes Server indexes
* @param mixed $domain
* @return array Map with 'lagTimes' and 'weightScales' entries, each keyed by server index;
*   values read from or written to the caches also carry a 'timestamp' entry
*/
+ protected function getServerStates( array $serverIndexes, $domain ) {
+ $writerIndex = $this->parent->getWriterIndex();
+ if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
# Only the writer was requested: return zero lag and nominal weight without caching
- return [ 0 => 0 ];
+ return [
+ 'lagTimes' => [ $writerIndex => 0 ],
+ 'weightScales' => [ $writerIndex => 1.0 ]
+ ];
}
- $key = $this->getLagTimeCacheKey();
+ $key = $this->getCacheKey( $serverIndexes );
# Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
$ttl = mt_rand( 4e6, 5e6 ) / 1e6;
# Keep keys around longer as fallbacks
$value = $this->srvCache->get( $key );
// A cached value counts as fresh only while its timestamp is newer than the randomized TTL
if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from local cache" );
- return $value['lagTimes']; // cache hit
+ return $value; // cache hit
}
$staleValue = $value ?: false;
$this->srvCache->set( $key, $value, $staleTTL );
$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from main cache" );
- return $value['lagTimes']; // cache hit
+ return $value; // cache hit
}
$staleValue = $value ?: $staleValue;
} );
} elseif ( $staleValue ) {
# Could not acquire lock but an old cache exists, so use it
- return $staleValue['lagTimes'];
+ return $staleValue;
}
$lagTimes = [];
+ $weightScales = [];
+ $movAveRatio = $this->movingAveRatio;
foreach ( $serverIndexes as $i ) {
if ( $i == $this->parent->getWriterIndex() ) {
$lagTimes[$i] = 0; // master always has no lag
+ $weightScales[$i] = 1.0; // nominal weight
continue;
}
$close = true; // new connection
}
// Moving average: blend the new coefficient with the previous scale from the
// stale value, treating a missing previous scale as 1.0
+ $lastWeight = isset( $staleValue['weightScales'][$i] )
+ ? $staleValue['weightScales'][$i]
+ : 1.0;
+ $coefficient = $this->getWeightScale( $i, $conn ?: null );
+ $newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
+
+ // Scale from 10% to 100% of nominal weight
+ $weightScales[$i] = max( $newWeight, .10 );
+
// No connection: record the lag as false, log the host and move to the next server
if ( !$conn ) {
$lagTimes[$i] = false;
$host = $this->parent->getServerName( $i );
- $this->replLogger->error( __METHOD__ . ": host $host (#$i) is unreachable" );
+ $this->replLogger->error(
+ __METHOD__ . ": host {db_server} is unreachable",
+ [ 'db_server' => $host ]
+ );
continue;
}
- $lagTimes[$i] = $conn->getLag();
- if ( $lagTimes[$i] === false ) {
- $host = $this->parent->getServerName( $i );
- $this->replLogger->error( __METHOD__ . ": host $host (#$i) is not replicating?" );
// Connections whose 'is static' LB info is set are recorded with zero lag
// instead of being asked for their lag
+ if ( $conn->getLBInfo( 'is static' ) ) {
+ $lagTimes[$i] = 0;
+ } else {
+ $lagTimes[$i] = $conn->getLag();
+ if ( $lagTimes[$i] === false ) {
+ $host = $this->parent->getServerName( $i );
+ $this->replLogger->error(
+ __METHOD__ . ": host {db_server} is not replicating?",
+ [ 'db_server' => $host ]
+ );
+ }
}
if ( $close ) {
}
# Add a timestamp key so we know when it was cached
- $value = [ 'lagTimes' => $lagTimes, 'timestamp' => microtime( true ) ];
+ $value = [
+ 'lagTimes' => $lagTimes,
+ 'weightScales' => $weightScales,
+ 'timestamp' => microtime( true )
+ ];
// Store the recalculated state in both caches under the same key
$this->mainCache->set( $key, $value, $staleTTL );
$this->srvCache->set( $key, $value, $staleTTL );
$this->replLogger->info( __METHOD__ . ": re-calculated lag times ($key)" );
- return $value['lagTimes'];
+ return $value;
}
- public function clearCaches() {
- $key = $this->getLagTimeCacheKey();
- $this->srvCache->delete( $key );
- $this->mainCache->delete( $key );
+ /**
+ * Get the weight scale coefficient for a server: 1.0 when a connection
+ * handle is given and 0.0 when it is null.
+ *
+ * @param int $index Server index (not read by this implementation)
+ * @param IDatabase|null $conn Connection handle or null on connection failure
+ * @return float
+ */
+ protected function getWeightScale( $index, IDatabase $conn = null ) {
+ return $conn ? 1.0 : 0.0;
}
- private function getLagTimeCacheKey() {
- $writerIndex = $this->parent->getWriterIndex();
/**
* Build the cache key for the states of the given set of servers.
*
* @param array $serverIndexes Server indexes; sorted on the local copy so that
*   the order in which they are passed does not change the key
* @return mixed The key made by $this->srvCache->makeGlobalKey()
*/
+ private function getCacheKey( array $serverIndexes ) {
+ sort( $serverIndexes );
// Lag is per-server, not per-DB, so key on the writer (master) server name,
// together with the cache key version and the joined server indexes
return $this->srvCache->makeGlobalKey(
'lag-times',
- $this->parent->getServerName( $writerIndex )
+ self::VERSION,
+ $this->parent->getServerName( $this->parent->getWriterIndex() ),
+ implode( '-', $serverIndexes )
);
}
}