Merge "MimeAnalyzer: Add testcases for mp3 detection"
[lhc/web/wiklou.git] / includes / libs / rdbms / loadmonitor / LoadMonitor.php
index 1da8f4e..4300e9f 100644 (file)
  * @ingroup Database
  */
 
+namespace Wikimedia\Rdbms;
+
 use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+use Wikimedia\ScopedCallback;
+use BagOStuff;
 
 /**
  * Basic DB load monitor with no external dependencies
@@ -37,27 +42,59 @@ class LoadMonitor implements ILoadMonitor {
        /** @var LoggerInterface */
        protected $replLogger;
 
-       public function __construct( ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache ) {
+       /** @var float Moving average ratio (e.g. 0.1 for 10% weight to new weight) */
+       private $movingAveRatio;
+
+       const VERSION = 1; // cache key version
+
+       public function __construct(
+               ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache, array $options = []
+       ) {
                $this->parent = $lb;
                $this->srvCache = $srvCache;
                $this->mainCache = $cache;
-               $this->replLogger = new \Psr\Log\NullLogger();
+               $this->replLogger = new NullLogger();
+
+               $this->movingAveRatio = isset( $options['movingAveRatio'] )
+                       ? $options['movingAveRatio']
+                       : 0.1;
        }
 
        public function setLogger( LoggerInterface $logger ) {
                $this->replLogger = $logger;
        }
 
-       public function scaleLoads( &$loads, $group = false, $domain = false ) {
+       public function scaleLoads( array &$weightByServer, $domain ) {
+               $serverIndexes = array_keys( $weightByServer );
+               $states = $this->getServerStates( $serverIndexes, $domain );
+               $coefficientsByServer = $states['weightScales'];
+               foreach ( $weightByServer as $i => $weight ) {
+                       if ( isset( $coefficientsByServer[$i] ) ) {
+                               $weightByServer[$i] = $weight * $coefficientsByServer[$i];
+                       } else { // server recently added to config?
+                               $host = $this->parent->getServerName( $i );
+                               $this->replLogger->error( __METHOD__ . ": host $host not in cache" );
+                       }
+               }
+       }
+
+       public function getLagTimes( array $serverIndexes, $domain ) {
+               $states = $this->getServerStates( $serverIndexes, $domain );
+
+               return $states['lagTimes'];
        }
 
-       public function getLagTimes( $serverIndexes, $domain ) {
-               if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == 0 ) {
+       protected function getServerStates( array $serverIndexes, $domain ) {
+               $writerIndex = $this->parent->getWriterIndex();
+               if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
                        # Single server only, just return zero without caching
-                       return [ 0 => 0 ];
+                       return [
+                               'lagTimes' => [ $writerIndex => 0 ],
+                               'weightScales' => [ $writerIndex => 1.0 ]
+                       ];
                }
 
-               $key = $this->getLagTimeCacheKey();
+               $key = $this->getCacheKey( $serverIndexes );
                # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
                $ttl = mt_rand( 4e6, 5e6 ) / 1e6;
                # Keep keys around longer as fallbacks
@@ -67,7 +104,7 @@ class LoadMonitor implements ILoadMonitor {
                $value = $this->srvCache->get( $key );
                if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
                        $this->replLogger->debug( __METHOD__ . ": got lag times ($key) from local cache" );
-                       return $value['lagTimes']; // cache hit
+                       return $value; // cache hit
                }
                $staleValue = $value ?: false;
 
@@ -77,7 +114,7 @@ class LoadMonitor implements ILoadMonitor {
                        $this->srvCache->set( $key, $value, $staleTTL );
                        $this->replLogger->debug( __METHOD__ . ": got lag times ($key) from main cache" );
 
-                       return $value['lagTimes']; // cache hit
+                       return $value; // cache hit
                }
                $staleValue = $value ?: $staleValue;
 
@@ -91,13 +128,16 @@ class LoadMonitor implements ILoadMonitor {
                        } );
                } elseif ( $staleValue ) {
                        # Could not acquire lock but an old cache exists, so use it
-                       return $staleValue['lagTimes'];
+                       return $staleValue;
                }
 
                $lagTimes = [];
+               $weightScales = [];
+               $movAveRatio = $this->movingAveRatio;
                foreach ( $serverIndexes as $i ) {
                        if ( $i == $this->parent->getWriterIndex() ) {
                                $lagTimes[$i] = 0; // master always has no lag
+                               $weightScales[$i] = 1.0; // nominal weight
                                continue;
                        }
 
@@ -109,17 +149,36 @@ class LoadMonitor implements ILoadMonitor {
                                $close = true; // new connection
                        }
 
+                       $lastWeight = isset( $staleValue['weightScales'][$i] )
+                               ? $staleValue['weightScales'][$i]
+                               : 1.0;
+                       $coefficient = $this->getWeightScale( $i, $conn ?: null );
+                       $newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
+
+                       // Scale from 10% to 100% of nominal weight
+                       $weightScales[$i] = max( $newWeight, .10 );
+
                        if ( !$conn ) {
                                $lagTimes[$i] = false;
                                $host = $this->parent->getServerName( $i );
-                               $this->replLogger->error( __METHOD__ . ": host $host (#$i) is unreachable" );
+                               $this->replLogger->error(
+                                       __METHOD__ . ": host {db_server} is unreachable",
+                                       [ 'db_server' => $host ]
+                               );
                                continue;
                        }
 
-                       $lagTimes[$i] = $conn->getLag();
-                       if ( $lagTimes[$i] === false ) {
-                               $host = $this->parent->getServerName( $i );
-                               $this->replLogger->error( __METHOD__ . ": host $host (#$i) is not replicating?" );
+                       if ( $conn->getLBInfo( 'is static' ) ) {
+                               $lagTimes[$i] = 0;
+                       } else {
+                               $lagTimes[$i] = $conn->getLag();
+                               if ( $lagTimes[$i] === false ) {
+                                       $host = $this->parent->getServerName( $i );
+                                       $this->replLogger->error(
+                                               __METHOD__ . ": host {db_server} is not replicating?",
+                                               [ 'db_server' => $host ]
+                                       );
+                               }
                        }
 
                        if ( $close ) {
@@ -132,26 +191,35 @@ class LoadMonitor implements ILoadMonitor {
                }
 
                # Add a timestamp key so we know when it was cached
-               $value = [ 'lagTimes' => $lagTimes, 'timestamp' => microtime( true ) ];
+               $value = [
+                       'lagTimes' => $lagTimes,
+                       'weightScales' => $weightScales,
+                       'timestamp' => microtime( true )
+               ];
                $this->mainCache->set( $key, $value, $staleTTL );
                $this->srvCache->set( $key, $value, $staleTTL );
                $this->replLogger->info( __METHOD__ . ": re-calculated lag times ($key)" );
 
-               return $value['lagTimes'];
+               return $value;
        }
 
-       public function clearCaches() {
-               $key = $this->getLagTimeCacheKey();
-               $this->srvCache->delete( $key );
-               $this->mainCache->delete( $key );
+       /**
+        * @param integer $index Server index
+        * @param IDatabase|null $conn Connection handle or null on connection failure
+        * @return float
+        */
+       protected function getWeightScale( $index, IDatabase $conn = null ) {
+               return $conn ? 1.0 : 0.0;
        }
 
-       private function getLagTimeCacheKey() {
-               $writerIndex = $this->parent->getWriterIndex();
+       private function getCacheKey( array $serverIndexes ) {
+               sort( $serverIndexes );
                // Lag is per-server, not per-DB, so key on the master DB name
                return $this->srvCache->makeGlobalKey(
                        'lag-times',
-                       $this->parent->getServerName( $writerIndex )
+                       self::VERSION,
+                       $this->parent->getServerName( $this->parent->getWriterIndex() ),
+                       implode( '-', $serverIndexes )
                );
        }
 }