* Now passing msg:search as the third parameter to googlesearch
[lhc/web/wiklou.git] / includes / LoadBalancer.php
1 <?php
2 /**
3 *
4 * @package MediaWiki
5 */
6
7 /**
8 * Depends on the database object
9 */
10 require_once( 'Database.php' );
11
# Valid database indexes
# Operation-based indexes: symbolic values that may be passed in place of an
# explicit index into the server array
define( 'DB_SLAVE', -1 );  # Read from the slave (or only server)
define( 'DB_MASTER', -2 ); # Write to master (or only server)
define( 'DB_LAST', -3 );   # Whatever database was used last

# Obsolete aliases of DB_SLAVE and DB_MASTER
define( 'DB_READ', DB_SLAVE );
define( 'DB_WRITE', DB_MASTER );
21
/**
 * Database load balancing object
 *
 * Holds the configured list of database servers, opens connections to them on
 * demand and decides which server index a caller should use. Index 0 is the
 * writer (see getWriterIndex()). Readers are chosen at random, weighted by
 * each server's 'load' setting, or by its 'groupLoads' entry when a query
 * group is requested.
 *
 * Written for PHP 4: members are declared with "var" and the constructor is
 * the method named after the class.
 *
 * @package MediaWiki
 */
class LoadBalancer {
	/* private */ var $mServers, $mConnections, $mLoads, $mGroupLoads;
	/* private */ var $mFailFunction;
	/* private */ var $mForce, $mReadIndex, $mWriteIndex, $mLastIndex;
	/* private */ var $mWaitForFile, $mWaitForPos, $mWaitTimeout;
	/* private */ var $mLaggedSlaveMode;

	/**
	 * Create an empty load balancer with no servers configured.
	 * Use initialise() or newFromParams() to supply the server list.
	 */
	function LoadBalancer()
	{
		$this->mServers = array();
		$this->mConnections = array();
		$this->mLoads = array();
		$this->mGroupLoads = array();
		$this->mFailFunction = false;
		$this->mReadIndex = -1;
		$this->mWriteIndex = -1;
		$this->mForce = -1;
		$this->mLastIndex = -1;
		$this->mWaitForFile = false;
		$this->mWaitForPos = false;
		$this->mWaitTimeout = 10;
		$this->mLaggedSlaveMode = false;
	}

	/**
	 * Factory: build a load balancer and initialise it in one step.
	 * Does not use $this, so it does not depend on the state of the object
	 * it is called on.
	 *
	 * @param array $servers       Server definitions, see initialise()
	 * @param mixed $failFunction  Passed to Database::failFunction() when a
	 *                             connection error is reported; false for the default
	 * @param int   $waitTimeout   Timeout passed to masterPosWait() when waiting for a slave
	 * @return LoadBalancer
	 */
	function newFromParams( $servers, $failFunction = false, $waitTimeout = 10 )
	{
		$lb = new LoadBalancer;
		# Forward the caller's values unchanged. The previous code passed
		# "$failFunction = false", which discarded the fail function, and
		# never passed $waitTimeout at all.
		$lb->initialise( $servers, $failFunction, $waitTimeout );
		return $lb;
	}

	/**
	 * (Re)configure the balancer with a server list and reset all state.
	 *
	 * Each element of $servers is an array. This method reads its 'load'
	 * entry and, if present, its 'groupLoads' entry (group name => weight).
	 * Existing entries in the connection table are dropped, not closed.
	 *
	 * @param array $servers       Server definitions, keyed by server index
	 * @param mixed $failFunction  See newFromParams()
	 * @param int   $waitTimeout   See newFromParams()
	 */
	function initialise( $servers, $failFunction = false, $waitTimeout = 10 )
	{
		$this->mServers = $servers;
		$this->mFailFunction = $failFunction;
		$this->mReadIndex = -1;
		$this->mWriteIndex = -1;
		$this->mForce = -1;
		$this->mConnections = array();
		# -1 is the "no previous index" sentinel tested by getConnection( DB_LAST )
		$this->mLastIndex = -1;
		$this->mLoads = array();
		# Reset group weights too, so a re-initialise does not keep stale groups
		$this->mGroupLoads = array();
		$this->mWaitForFile = false;
		$this->mWaitForPos = false;
		$this->mWaitTimeout = $waitTimeout;
		$this->mLaggedSlaveMode = false;

		foreach( $servers as $i => $server ) {
			$this->mLoads[$i] = $server['load'];
			if ( isset( $server['groupLoads'] ) ) {
				foreach ( $server['groupLoads'] as $group => $ratio ) {
					if ( !isset( $this->mGroupLoads[$group] ) ) {
						$this->mGroupLoads[$group] = array();
					}
					$this->mGroupLoads[$group][$i] = $ratio;
				}
			}
		}
	}

	/**
	 * Given an array of non-normalised probabilities, this function will select
	 * an element and return the appropriate key
	 *
	 * @param array $weights  Key => weight
	 * @return mixed  The selected key, or false if $weights is not a non-empty array
	 */
	function pickRandom( $weights )
	{
		if ( !is_array( $weights ) || count( $weights ) == 0 ) {
			return false;
		}

		# Scale a uniform random number onto [0, total weight]
		$total = array_sum( $weights );
		$max = mt_getrandmax();
		$rand = mt_rand( 0, $max ) / $max * $total;

		# Walk the cumulative weights until the random point is reached. If the
		# loop runs to the end without breaking, $i is left on the last key.
		$running = 0;
		foreach ( $weights as $i => $w ) {
			$running += $w;
			if ( $running >= $rand ) {
				break;
			}
		}
		return $i;
	}

	/**
	 * Get the index of the server to use for reading.
	 *
	 * Order of preference: the forced index set with force(), then the
	 * reader chosen by an earlier call, then a new random pick weighted by
	 * server load. Servers that cannot be opened are removed from the
	 * candidates and another pick is made.
	 *
	 * @return mixed  Server index, or false if no server could be opened
	 */
	function getReaderIndex()
	{
		$fname = 'LoadBalancer::getReaderIndex';
		wfProfileIn( $fname );

		$i = false;
		if ( $this->mForce >= 0 ) {
			$i = $this->mForce;
		} elseif ( $this->mReadIndex >= 0 ) {
			$i = $this->mReadIndex;
		} else {
			# $loads is $this->mLoads except with elements knocked out if they
			# don't work
			$loads = $this->mLoads;
			do {
				$i = $this->pickRandom( $loads );
				if ( $i !== false ) {
					wfDebug( "Using reader #$i: {$this->mServers[$i]['host']}...\n" );
					$this->openConnection( $i );

					if ( !$this->isOpen( $i ) ) {
						wfDebug( "Failed\n" );
						unset( $loads[$i] );
					} elseif ( isset( $this->mServers[$i]['slave pos'] ) ) {
						# openConnection()/waitFor() record 'slave pos' when the
						# slave did not catch up in time
						wfDebug( "Lagged slave\n" );
						$this->mLaggedSlaveMode = true;
					} else {
						wfDebug( "OK\n" );
					}
				}
			} while ( $i !== false && !$this->isOpen( $i ) );

			if ( $this->isOpen( $i ) ) {
				$this->mReadIndex = $i;
			} else {
				$i = false;
			}
		}
		wfProfileOut( $fname );
		return $i;
	}

	/**
	 * Get a random server to use in a query group
	 *
	 * @param string $group  Query group name
	 * @return mixed  Server index, or false if no server has a weight for the group
	 */
	function getGroupIndex( $group ) {
		if ( isset( $this->mGroupLoads[$group] ) ) {
			$i = $this->pickRandom( $this->mGroupLoads[$group] );
		} else {
			$i = false;
		}
		wfDebug( "Query group $group => $i\n" );
		return $i;
	}

	/**
	 * Set the master wait position
	 * If a DB_SLAVE connection has been opened already, waits
	 * Otherwise sets a variable telling it to wait if such a connection is opened
	 *
	 * Nothing is recorded when only one server is configured.
	 *
	 * @param mixed $file  Master log file name
	 * @param mixed $pos   Position within that file
	 */
	function waitFor( $file, $pos ) {
		$fname = 'LoadBalancer::waitFor';
		wfProfileIn( $fname );

		wfDebug( "User master pos: $file $pos\n" );
		$this->mWaitForFile = false;
		$this->mWaitForPos = false;

		if ( count( $this->mServers ) > 1 ) {
			$this->mWaitForFile = $file;
			$this->mWaitForPos = $pos;
			$i = $this->mReadIndex;

			# Only wait when a reader other than server 0 has already been chosen
			if ( $i > 0 ) {
				if ( !$this->doWait( $i ) ) {
					$this->mServers[$i]['slave pos'] = $this->mConnections[$i]->getSlavePos();
					$this->mLaggedSlaveMode = true;
				}
			}
		}
		wfProfileOut( $fname );
	}

	/**
	 * Wait for a given slave to catch up to the master pos stored in $this
	 *
	 * @param int $index  Server index
	 * @return bool  True if the slave is known to have reached the position.
	 *               False if the wait timed out, the connection is not open,
	 *               or the server is flagged 'lagged slave'.
	 */
	function doWait( $index ) {
		global $wgMemc;

		$retVal = false;

		# Debugging hacks
		if ( isset( $this->mServers[$index]['lagged slave'] ) ) {
			return false;
		} elseif ( isset( $this->mServers[$index]['fake slave'] ) ) {
			return true;
		}

		$key = 'masterpos:' . $index;
		$memcPos = $wgMemc->get( $key );
		if ( $memcPos ) {
			list( $file, $pos ) = explode( ' ', $memcPos );
			# If the saved position is later than the requested position, return now
			if ( $file == $this->mWaitForFile && $this->mWaitForPos <= $pos ) {
				$retVal = true;
			}
		}

		if ( !$retVal && $this->isOpen( $index ) ) {
			$conn =& $this->mConnections[$index];
			wfDebug( "Waiting for slave #$index to catch up...\n" );
			$result = $conn->masterPosWait( $this->mWaitForFile, $this->mWaitForPos, $this->mWaitTimeout );

			if ( $result == -1 || is_null( $result ) ) {
				# Timed out waiting for slave, use master instead
				wfDebug( "Timed out waiting for slave #$index pos {$this->mWaitForFile} {$this->mWaitForPos}\n" );
				$retVal = false;
			} else {
				$retVal = true;
				wfDebug( "Done\n" );
			}
		}
		return $retVal;
	}

	/**
	 * Get a connection by index
	 *
	 * @param int   $i       Server index, or one of DB_SLAVE, DB_MASTER, DB_LAST
	 * @param bool  $fail    Whether to report a connection error on failure
	 * @param array $groups  Query groups; the first group that yields a server
	 *                       index overrides $i
	 * @return mixed  The connection object (by reference), or false if none
	 *                could be opened
	 */
	function &getConnection( $i, $fail = true, $groups = array() )
	{
		$fname = 'LoadBalancer::getConnection';
		wfProfileIn( $fname );

		# Query groups
		foreach ( $groups as $group ) {
			$groupIndex = $this->getGroupIndex( $group );
			if ( $groupIndex !== false ) {
				$i = $groupIndex;
				break;
			}
		}

		# Operation-based index
		if ( $i == DB_SLAVE ) {
			$i = $this->getReaderIndex();
		} elseif ( $i == DB_MASTER ) {
			$i = $this->getWriterIndex();
		} elseif ( $i == DB_LAST ) {
			# Just use $this->mLastIndex, which should already be set
			$i = $this->mLastIndex;
			if ( $i === -1 ) {
				# Oh dear, not set, best to use the writer for safety
				wfDebug( "Warning: DB_LAST used when there was no previous index\n" );
				$i = $this->getWriterIndex();
			}
		}
		# Now we have an explicit index into the servers array
		$this->openConnection( $i, $fail );

		if ( is_integer( $i ) && array_key_exists( $i, $this->mConnections ) ) {
			wfProfileOut( $fname );
			return $this->mConnections[$i];
		}

		# No usable index, e.g. getReaderIndex() returned false. Do not index
		# mConnections with it: false would be cast to key 0 and hand back
		# the master slot. A variable is needed for return by reference.
		$noConnection = false;
		wfProfileOut( $fname );
		return $noConnection;
	}

	/**
	 * Open a connection to the server given by the specified index
	 * Index must be an actual index into the array
	 * Returns success
	 *
	 * "Success" is false both when the connection could not be opened and
	 * when it opened but the slave failed to reach the wait position; in the
	 * latter case the connection stays open and its 'slave pos' is recorded.
	 *
	 * @param mixed $i     Server index
	 * @param bool  $fail  Whether to report a connection error on failure
	 * @return bool
	 * @private
	 */
	function openConnection( $i, $fail = false ) {
		$fname = 'LoadBalancer::openConnection';
		wfProfileIn( $fname );

		# Reject anything that is not a configured integer index. Previously
		# a false index was cast to array key 0, which replaced and then
		# discarded the connection in slot 0 and set mLastIndex to false.
		if ( !is_integer( $i ) || !array_key_exists( $i, $this->mServers ) ) {
			wfDebug( "Failed to connect to database: no usable server index\n" );
			if ( $fail ) {
				$noConnection = false;
				$this->reportConnectionError( $noConnection );
			}
			wfProfileOut( $fname );
			return false;
		}

		$success = true;

		if ( !$this->isOpen( $i ) ) {
			$this->mConnections[$i] = $this->reallyOpenConnection( $this->mServers[$i] );

			# A newly opened server other than index 0 must catch up to the
			# recorded wait position, if there is one
			if ( $this->isOpen( $i ) && $i != 0 && $this->mWaitForFile ) {
				if ( !$this->doWait( $i ) ) {
					$this->mServers[$i]['slave pos'] = $this->mConnections[$i]->getSlavePos();
					$success = false;
				}
			}
		}
		if ( !$this->isOpen( $i ) ) {
			wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" );
			if ( $fail ) {
				$this->reportConnectionError( $this->mConnections[$i] );
			}
			$this->mConnections[$i] = false;
			$success = false;
		}
		$this->mLastIndex = $i;
		wfProfileOut( $fname );
		return $success;
	}

	/**
	 * Test if the specified index represents an open connection
	 *
	 * @param mixed $index  Server index; anything that is not an integer is never open
	 * @return bool
	 * @private
	 */
	function isOpen( $index ) {
		if( !is_integer( $index ) ) {
			return false;
		}
		return array_key_exists( $index, $this->mConnections )
			&& is_object( $this->mConnections[$index] )
			&& $this->mConnections[$index]->isOpen();
	}

	/**
	 * Really opens a connection
	 *
	 * Instantiates the class "Database" . ucfirst( type ), loading
	 * "<class>.php" first if the class is not defined yet.
	 *
	 * @param array $server  Server definition; the keys type, host, user,
	 *                       password, dbname and flags are read
	 * @return object  The new database object
	 * @private
	 */
	function reallyOpenConnection( &$server ) {
		if( !is_array( $server ) ) {
			wfDebugDieBacktrace( 'You must update your load-balancing configuration. See DefaultSettings.php entry for $wgDBservers.' );
		}

		# Read the needed settings explicitly instead of extract()ing the whole
		# array, so that unexpected keys cannot overwrite local variables.
		# Missing keys become null.
		$settings = array();
		foreach ( array( 'type', 'host', 'user', 'password', 'dbname', 'flags' ) as $key ) {
			$settings[$key] = isset( $server[$key] ) ? $server[$key] : null;
		}

		# Get class for this database type
		$class = 'Database' . ucfirst( (string)$settings['type'] );
		if ( !class_exists( $class ) ) {
			require_once( "$class.php" );
		}

		# Create object
		return new $class( $settings['host'], $settings['user'], $settings['password'],
			$settings['dbname'], 1, $settings['flags'] );
	}

	/**
	 * Report a connection error through the given connection object.
	 *
	 * If $conn is not an object, a new Database is created in its place (the
	 * parameter is taken by reference). The configured fail function, or
	 * 'wfEmergencyAbort' if none is set, is installed on it before reporting.
	 *
	 * @param mixed $conn  Connection object, or a non-object placeholder
	 */
	function reportConnectionError( &$conn )
	{
		$fname = 'LoadBalancer::reportConnectionError';
		wfProfileIn( $fname );

		# Prevent infinite recursion
		static $reporting = false;
		if ( !$reporting ) {
			$reporting = true;
			if ( !is_object( $conn ) ) {
				$conn = new Database;
			}
			if ( $this->mFailFunction ) {
				$conn->failFunction( $this->mFailFunction );
			} else {
				$conn->failFunction( 'wfEmergencyAbort' );
			}
			$conn->reportConnectionError();
			$reporting = false;
		}
		wfProfileOut( $fname );
	}

	/**
	 * Get the index of the server used for writing; always 0.
	 *
	 * @return int
	 */
	function getWriterIndex()
	{
		return 0;
	}

	/**
	 * Force getReaderIndex() to return the given index.
	 *
	 * @param int $i  Server index; a negative value turns forcing off
	 */
	function force( $i )
	{
		$this->mForce = $i;
	}

	/**
	 * Check whether a server with the given index is configured.
	 *
	 * @param mixed $i  Server index
	 * @return bool
	 */
	function haveIndex( $i )
	{
		return array_key_exists( $i, $this->mServers );
	}

	/**
	 * Get the number of defined servers (not the number of open connections)
	 *
	 * @return int
	 */
	function getServerCount() {
		return count( $this->mServers );
	}

	/**
	 * Save the master pos to the session, if the session has been started
	 * and more than one server is configured.
	 */
	function saveMasterPos() {
		global $wgSessionStarted;
		if ( $wgSessionStarted && count( $this->mServers ) > 1 ) {
			# If this entire request was served from a slave without opening a connection to the
			# master (however unlikely that may be), then we can fetch the position from the slave.
			if ( empty( $this->mConnections[0] ) ) {
				$conn =& $this->getConnection( DB_SLAVE );
				list( $file, $pos ) = $conn->getSlavePos();
				wfDebug( "Saving master pos fetched from slave: $file $pos\n" );
			} else {
				$conn =& $this->getConnection( 0 );
				list( $file, $pos ) = $conn->getMasterPos();
				wfDebug( "Saving master pos: $file $pos\n" );
			}
			if ( $file !== false ) {
				$_SESSION['master_log_file'] = $file;
				$_SESSION['master_pos'] = $pos;
			}
		}
	}

	/**
	 * Loads the master pos from the session, waits for it if necessary
	 */
	function loadMasterPos() {
		if ( isset( $_SESSION['master_log_file'] ) && isset( $_SESSION['master_pos'] ) ) {
			$this->waitFor( $_SESSION['master_log_file'], $_SESSION['master_pos'] );
		}
	}

	/**
	 * Close all open connections
	 */
	function closeAll() {
		# Iterate over the keys and call through $this->mConnections so the
		# stored object is used, not a copy made by foreach
		foreach( array_keys( $this->mConnections ) as $i ) {
			if ( $this->isOpen( $i ) ) {
				$this->mConnections[$i]->close();
			}
		}
	}

	/**
	 * Call immediateCommit() on all open connections
	 */
	function commitAll() {
		foreach( array_keys( $this->mConnections ) as $i ) {
			if ( $this->isOpen( $i ) ) {
				$this->mConnections[$i]->immediateCommit();
			}
		}
	}

	/**
	 * Get and optionally set the slave wait timeout, via wfSetVar().
	 *
	 * @param mixed $value  New timeout; NULL presumably leaves it unchanged
	 *                      (wfSetVar() is defined elsewhere -- confirm)
	 * @return mixed  Whatever wfSetVar() returns
	 */
	function waitTimeout( $value = NULL ) {
		return wfSetVar( $this->mWaitTimeout, $value );
	}

	/**
	 * Whether a lagged slave has been detected since the last initialise().
	 *
	 * @return bool
	 */
	function getLaggedSlaveMode() {
		return $this->mLaggedSlaveMode;
	}

	/**
	 * Ping every open connection.
	 *
	 * @return bool  False if any open connection failed its ping, true otherwise
	 */
	function pingAll() {
		$success = true;
		foreach ( array_keys( $this->mConnections ) as $i ) {
			if ( $this->isOpen( $i ) && !$this->mConnections[$i]->ping() ) {
				$success = false;
			}
		}
		return $success;
	}
}
463
464 ?>