3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
22 use Psr\Log\LoggerInterface
;
25 * Basic DB load monitor with no external dependencies
26 * Uses memcached to cache the replication lag for a short time
30 class LoadMonitor
implements ILoadMonitor
{
31 /** @var ILoadBalancer */
37 /** @var LoggerInterface */
38 protected $replLogger;
40 /** @var float Moving average ratio (e.g. 0.1 for 10% weight to new weight) */
41 private $movingAveRatio;
45 public function __construct(
46 ILoadBalancer
$lb, BagOStuff
$srvCache, BagOStuff
$cache, array $options = []
49 $this->srvCache
= $srvCache;
50 $this->mainCache
= $cache;
51 $this->replLogger
= new \Psr\Log\
NullLogger();
53 $this->movingAveRatio
= isset( $options['movingAveRatio'] )
54 ?
$options['movingAveRatio']
58 public function setLogger( LoggerInterface
$logger ) {
59 $this->replLogger
= $logger;
62 public function scaleLoads( array &$weightByServer, $group = false, $domain = false ) {
63 $serverIndexes = array_keys( $weightByServer );
64 $states = $this->getServerStates( $serverIndexes, $domain );
65 $coefficientsByServer = $states['weightScales'];
66 foreach ( $weightByServer as $i => $weight ) {
67 $weightByServer[$i] = $weight * $coefficientsByServer[$i];
71 public function getLagTimes( array $serverIndexes, $domain ) {
72 $states = $this->getServerStates( $serverIndexes, $domain );
74 return $states['lagTimes'];
77 protected function getServerStates( array $serverIndexes, $domain ) {
78 if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == 0 ) {
79 # Single server only, just return zero without caching
81 'lagTimes' => [ $this->parent
->getWriterIndex() => 0 ],
82 'weightScales' => [ $this->parent
->getWriterIndex() => 1 ]
86 $key = $this->getCacheKey();
87 # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
88 $ttl = mt_rand( 4e6
, 5e6
) / 1e6
;
89 # Keep keys around longer as fallbacks
92 # (a) Check the local APC cache
93 $value = $this->srvCache
->get( $key );
94 if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
95 $this->replLogger
->debug( __METHOD__
. ": got lag times ($key) from local cache" );
96 return $value; // cache hit
98 $staleValue = $value ?
: false;
100 # (b) Check the shared cache and backfill APC
101 $value = $this->mainCache
->get( $key );
102 if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
103 $this->srvCache
->set( $key, $value, $staleTTL );
104 $this->replLogger
->debug( __METHOD__
. ": got lag times ($key) from main cache" );
106 return $value; // cache hit
108 $staleValue = $value ?
: $staleValue;
110 # (c) Cache key missing or expired; regenerate and backfill
111 if ( $this->mainCache
->lock( $key, 0, 10 ) ) {
112 # Let this process alone update the cache value
113 $cache = $this->mainCache
;
114 /** @noinspection PhpUnusedLocalVariableInspection */
115 $unlocker = new ScopedCallback( function () use ( $cache, $key ) {
116 $cache->unlock( $key );
118 } elseif ( $staleValue ) {
119 # Could not acquire lock but an old cache exists, so use it
125 $movAveRatio = $this->movingAveRatio
;
126 foreach ( $serverIndexes as $i ) {
127 if ( $i == $this->parent
->getWriterIndex() ) {
128 $lagTimes[$i] = 0; // master always has no lag
129 $weightScales[$i] = 1.0; // nominal weight
133 $conn = $this->parent
->getAnyOpenConnection( $i );
135 $close = false; // already open
137 $conn = $this->parent
->openConnection( $i, $domain );
138 $close = true; // new connection
141 $lastWeight = isset( $staleValue['weightScales'][$i] )
142 ?
$staleValue['weightScales'][$i]
144 $coefficient = $this->getWeightScale( $i, $conn ?
: null );
145 $newWeight = $movAveRatio * $coefficient +
( 1 - $movAveRatio ) * $lastWeight;
147 // Scale from 10% to 100% of nominal weight
148 $weightScales[$i] = max( $newWeight, .10 );
151 $lagTimes[$i] = false;
152 $host = $this->parent
->getServerName( $i );
153 $this->replLogger
->error( __METHOD__
. ": host $host is unreachable" );
157 $lagTimes[$i] = $conn->getLag();
158 if ( $lagTimes[$i] === false ) {
159 $host = $this->parent
->getServerName( $i );
160 $this->replLogger
->error( __METHOD__
. ": host $host is not replicating?" );
164 # Close the connection to avoid sleeper connections piling up.
165 # Note that the caller will pick one of these DBs and reconnect,
166 # which is slightly inefficient, but this only matters for the lag
167 # time cache miss cache, which is far less common that cache hits.
168 $this->parent
->closeConnection( $conn );
172 # Add a timestamp key so we know when it was cached
174 'lagTimes' => $lagTimes,
175 'weightScales' => $weightScales,
176 'timestamp' => microtime( true )
178 $this->mainCache
->set( $key, $value, $staleTTL );
179 $this->srvCache
->set( $key, $value, $staleTTL );
180 $this->replLogger
->info( __METHOD__
. ": re-calculated lag times ($key)" );
186 * @param integer $index Server index
187 * @param IDatabase|null $conn Connection handle or null on connection failure
190 protected function getWeightScale( $index, IDatabase
$conn = null ) {
191 return $conn ?
1.0 : 0.0;
194 public function clearCaches() {
195 $key = $this->getCacheKey();
196 $this->srvCache
->delete( $key );
197 $this->mainCache
->delete( $key );
200 private function getCacheKey() {
201 // Lag is per-server, not per-DB, so key on the master DB name
202 return $this->srvCache
->makeGlobalKey(
205 $this->parent
->getServerName( $this->parent
->getWriterIndex() )