Merge "Move LoadMonitor to Rdbms namespace"
[lhc/web/wiklou.git] / includes / libs / rdbms / loadmonitor / LoadMonitor.php
1 <?php
2 /**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 * @ingroup Database
20 */
21
22 namespace Wikimedia\Rdbms;
23
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
27 use ILoadBalancer;
28 use IDatabase;
29 use BagOStuff;
30
/**
 * Basic DB load monitor with no external dependencies
 *
 * Replication lag times and weight scaling factors are cached for a short
 * time in the two BagOStuff instances given to the constructor.
 *
 * @ingroup Database
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer */
	protected $parent;
	/** @var BagOStuff */
	protected $srvCache;
	/** @var BagOStuff */
	protected $mainCache;
	/** @var LoggerInterface */
	protected $replLogger;

	/** @var float Moving average ratio (e.g. 0.1 for 10% weight to new weight) */
	private $movingAveRatio;

	const VERSION = 1; // cache key version

	/**
	 * @param ILoadBalancer $lb Load balancer whose servers are monitored
	 * @param BagOStuff $srvCache Cache that is checked first
	 * @param BagOStuff $cache Cache that is checked second; also used for the regeneration lock
	 * @param array $options Supported keys:
	 *   - movingAveRatio: weight given to the newest weight scale sample (default: 0.1)
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache, array $options = []
	) {
		$this->parent = $lb;
		$this->srvCache = $srvCache;
		$this->mainCache = $cache;
		// Log nothing until a logger is injected via setLogger()
		$this->replLogger = new NullLogger();

		$this->movingAveRatio = isset( $options['movingAveRatio'] )
			? $options['movingAveRatio']
			: 0.1;
	}

	/**
	 * Replace the logger used for cache and replication messages
	 *
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * Multiply each server weight by the weight scale stored for that server
	 *
	 * Servers without a stored weight scale keep their weight and are logged.
	 *
	 * @param array &$weightByServer Map of (server index => weight), modified in place
	 * @param mixed $domain Forwarded to getServerStates()
	 */
	public function scaleLoads( array &$weightByServer, $domain ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes, $domain );
		$coefficientsByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $coefficientsByServer[$i] ) ) {
				$weightByServer[$i] = $weight * $coefficientsByServer[$i];
			} else { // server recently added to config?
				$host = $this->parent->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			}
		}
	}

	/**
	 * Get the lag time of each of the given servers
	 *
	 * @param array $serverIndexes List of server indexes
	 * @param mixed $domain Forwarded to getServerStates()
	 * @return array Map of (server index => lag value, or false if it could not be determined)
	 */
	public function getLagTimes( array $serverIndexes, $domain ) {
		$states = $this->getServerStates( $serverIndexes, $domain );

		return $states['lagTimes'];
	}

	/**
	 * Get the lag times and weight scales of the given servers
	 *
	 * Fresh values are served from the caches. Otherwise the values are
	 * regenerated by this process if it can take the lock, or if no stale
	 * value is available to fall back on.
	 *
	 * @param array $serverIndexes List of server indexes
	 * @param mixed $domain Forwarded to ILoadBalancer::openConnection() for new connections
	 * @return array Map with 'lagTimes' and 'weightScales' sub-maps keyed by server index;
	 *   values that went through the cache also have a 'timestamp' entry
	 */
	protected function getServerStates( array $serverIndexes, $domain ) {
		$writerIndex = $this->parent->getWriterIndex();
		if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
			# Single server only, just return zero without caching
			return [
				'lagTimes' => [ $writerIndex => 0 ],
				'weightScales' => [ $writerIndex => 1.0 ]
			];
		}

		$key = $this->getCacheKey( $serverIndexes );
		# Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
		$ttl = mt_rand( 4000000, 5000000 ) / 1e6;
		# Keep keys around longer as fallbacks
		$staleTTL = 60;

		# (a) Check the local APC cache
		$value = $this->srvCache->get( $key );
		if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
			$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from local cache" );
			return $value; // cache hit
		}
		$staleValue = $value ?: false;

		# (b) Check the shared cache and backfill APC
		$value = $this->mainCache->get( $key );
		if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
			$this->srvCache->set( $key, $value, $staleTTL );
			$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from main cache" );

			return $value; // cache hit
		}
		# An expired shared cache value takes precedence over an expired local one
		$staleValue = $value ?: $staleValue;

		# (c) Cache key missing or expired; regenerate and backfill
		if ( $this->mainCache->lock( $key, 0, 10 ) ) {
			# Let this process alone update the cache value
			$cache = $this->mainCache;
			# The lock is released when $unlocker goes out of scope
			/** @noinspection PhpUnusedLocalVariableInspection */
			$unlocker = new ScopedCallback( function () use ( $cache, $key ) {
				$cache->unlock( $key );
			} );
		} elseif ( $staleValue ) {
			# Could not acquire lock but an old cache exists, so use it
			return $staleValue;
		}

		$value = $this->computeServerStates( $serverIndexes, $domain, $staleValue, $writerIndex );
		$this->mainCache->set( $key, $value, $staleTTL );
		$this->srvCache->set( $key, $value, $staleTTL );
		$this->replLogger->info( __METHOD__ . ": re-calculated lag times ($key)" );

		return $value;
	}

	/**
	 * Probe each server and build a new server states value
	 *
	 * @param array $serverIndexes List of server indexes
	 * @param mixed $domain Forwarded to ILoadBalancer::openConnection() for new connections
	 * @param array|bool $staleValue Previously cached states, or false if there are none
	 * @param int $writerIndex Index of the master server
	 * @return array Map with 'lagTimes', 'weightScales' and 'timestamp' entries
	 */
	private function computeServerStates(
		array $serverIndexes, $domain, $staleValue, $writerIndex
	) {
		$lagTimes = [];
		$weightScales = [];
		$movAveRatio = $this->movingAveRatio;
		foreach ( $serverIndexes as $i ) {
			if ( $i == $writerIndex ) {
				$lagTimes[$i] = 0; // master always has no lag
				$weightScales[$i] = 1.0; // nominal weight
				continue;
			}

			$conn = $this->parent->getAnyOpenConnection( $i );
			if ( $conn ) {
				$close = false; // already open
			} else {
				$conn = $this->parent->openConnection( $i, $domain );
				$close = true; // new connection
			}

			# Blend the new sample into the last known weight scale (moving average)
			$lastWeight = isset( $staleValue['weightScales'][$i] )
				? $staleValue['weightScales'][$i]
				: 1.0;
			$coefficient = $this->getWeightScale( $i, $conn ?: null );
			$newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;

			// Scale from 10% to 100% of nominal weight
			$weightScales[$i] = max( $newWeight, .10 );

			if ( !$conn ) {
				$lagTimes[$i] = false;
				$host = $this->parent->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host is unreachable" );
				continue;
			}

			if ( $conn->getLBInfo( 'is static' ) ) {
				$lagTimes[$i] = 0;
			} else {
				$lagTimes[$i] = $conn->getLag();
				if ( $lagTimes[$i] === false ) {
					$host = $this->parent->getServerName( $i );
					$this->replLogger->error( __METHOD__ . ": host $host is not replicating?" );
				}
			}

			if ( $close ) {
				# Close the connection to avoid sleeper connections piling up.
				# Note that the caller will pick one of these DBs and reconnect,
				# which is slightly inefficient, but this only matters for the lag
				# time cache miss case, which is far less common than cache hits.
				$this->parent->closeConnection( $conn );
			}
		}

		# Add a timestamp key so we know when it was cached
		return [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => microtime( true )
		];
	}

	/**
	 * Get the weight scale sample for a server
	 *
	 * This implementation returns 1.0 when a connection handle is available
	 * and 0.0 otherwise.
	 *
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle or null on connection failure
	 * @return float
	 */
	protected function getWeightScale( $index, IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Build the cache key under which the states of a set of servers are stored
	 *
	 * The indexes are sorted first, so the same set of servers always gives the same key.
	 *
	 * @param array $serverIndexes List of server indexes
	 * @return string
	 */
	private function getCacheKey( array $serverIndexes ) {
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the master DB name
		return $this->srvCache->makeGlobalKey(
			'lag-times',
			self::VERSION,
			$this->parent->getServerName( $this->parent->getWriterIndex() ),
			implode( '-', $serverIndexes )
		);
	}
}