* Introducing bit field for database parameters
[lhc/web/wiklou.git] / includes / LoadBalancer.php
1 <?php
2 # Database load balancing object
3
4 require_once( "Database.php" );
5
#
# Valid database indexes accepted by LoadBalancer::getConnection()
#

# Operation-based indexes
define( 'DB_SLAVE',  -1 ); # Read from the slave (or only server)
define( 'DB_MASTER', -2 ); # Write to master (or only server)
define( 'DB_LAST',   -3 ); # Whatever database was used last

# Obsolete aliases, kept with the same values as DB_SLAVE / DB_MASTER
define( 'DB_READ',  -1 );
define( 'DB_WRITE', -2 );

# Task-based indexes
# ***NOT USED YET, EXPERIMENTAL***
# These may be defined in $wgDBservers. If they aren't, the default reader
# or writer will be used.
# Even numbers are always readers, odd numbers are writers.
define( 'DB_TASK_FIRST',  1000 ); # First in list
define( 'DB_SEARCH_R',    1000 ); # Search read
define( 'DB_SEARCH_W',    1001 ); # Search write
define( 'DB_ASKSQL_R',    1002 ); # Special:Asksql read
define( 'DB_WATCHLIST_R', 1004 ); # Watchlist read
define( 'DB_TASK_LAST',   1004 ); # Last in list

# Time to wait for a slave to synchronise
define( 'MASTER_WAIT_TIMEOUT', 15 );
28
# Database load balancing object.
#
# Holds a list of server definitions (index 0 is treated as the master by
# getWriterIndex()), lazily opens connections to them, picks a reader by
# weighted random choice on each server's 'load', and can make a slave wait
# until it has caught up to a given master log position.
class LoadBalancer {
	/* private */ var $mServers, $mConnections, $mLoads;
	/* private */ var $mFailFunction;
	/* private */ var $mForce, $mReadIndex, $mWriteIndex, $mLastIndex;
	/* private */ var $mWaitForFile, $mWaitForPos;

	# Constructor: creates an empty balancer with no servers.
	# Call initialise() (or use newFromParams()) to make it usable.
	function LoadBalancer()
	{
		$this->mServers = array();
		$this->mConnections = array();
		$this->mLoads = array();
		$this->mFailFunction = false;
		$this->mReadIndex = -1;
		$this->mWriteIndex = -1;
		$this->mForce = -1;
		$this->mLastIndex = -1;
		$this->mWaitForFile = false;
		$this->mWaitForPos = false;
	}

	# Factory: create a LoadBalancer and initialise it in one step.
	#
	# $servers      array of server definitions, see initialise()
	# $failFunction callback name installed on a connection when a connection
	#               error is reported, or false for the default
	# Returns the new LoadBalancer.
	function newFromParams( $servers, $failFunction = false )
	{
		$lb = new LoadBalancer;
		# Pass the caller's fail function through. The previous code passed
		# the expression "$failFunction = false", which always discarded it.
		$lb->initialise( $servers, $failFunction );
		return $lb;
	}

	# (Re)initialise the balancer state for a given server list.
	#
	# $servers      array indexed by server number; each element must contain
	#               at least a 'load' key (relative weight for reads). Other
	#               keys are consumed by makeConnection().
	# $failFunction see newFromParams()
	function initialise( $servers, $failFunction = false )
	{
		$this->mServers = $servers;
		$this->mFailFunction = $failFunction;
		$this->mReadIndex = -1;
		$this->mWriteIndex = -1;
		$this->mForce = -1;
		$this->mConnections = array();
		# -1 means "no connection used yet"; getConnection( DB_LAST ) tests
		# for exactly this value, so it must match the constructor.
		$this->mLastIndex = -1;
		$this->mLoads = array();
		$this->mWaitForFile = false;
		$this->mWaitForPos = false;

		foreach( $servers as $i => $server ) {
			$this->mLoads[$i] = $server['load'];
		}
	}

	# Given an array of non-normalised probabilities, this function will select
	# an element and return the appropriate key.
	# Returns false if $weights is not an array or is empty.
	function pickRandom( $weights )
	{
		if ( !is_array( $weights ) || count( $weights ) == 0 ) {
			return false;
		}

		$sum = 0;
		foreach ( $weights as $w ) {
			$sum += $w;
		}
		# Uniform random point in [0, total weight]
		$max = mt_getrandmax();
		$rand = mt_rand(0, $max) / $max * $sum;

		# Walk the cumulative weights until the random point is reached.
		# If it is never reached, $i is left at the last key.
		$sum = 0;
		foreach ( $weights as $i => $w ) {
			$sum += $w;
			if ( $sum >= $rand ) {
				break;
			}
		}
		return $i;
	}

	# Get the index of the server to read from.
	#
	# A forced index (see force()) wins; otherwise a previously chosen reader
	# is reused; otherwise servers are tried in weighted random order until
	# one of them gives an open connection.
	# Returns the server index, or false if no server could be opened.
	function getReaderIndex()
	{
		if ( $this->mForce >= 0 ) {
			return $this->mForce;
		}
		if ( $this->mReadIndex >= 0 ) {
			return $this->mReadIndex;
		}

		# $loads is $this->mLoads except with elements knocked out if they
		# don't work
		$loads = $this->mLoads;
		$found = false;
		do {
			$i = $this->pickRandom( $loads );
			if ( $i !== false ) {
				wfDebug( "Using reader #$i: {$this->mServers[$i]['host']}\n" );

				$conn =& $this->getConnection( $i );
				$this->mConnections[$i] =& $conn;

				# getConnection() yields false for a failed connection, so
				# check the type before asking whether it is open.
				$found = is_object( $conn ) && $conn->isOpen();
				if ( !$found ) {
					unset( $loads[$i] );
				}
			}
		} while ( $i !== false && !$found );

		if ( !$found ) {
			# Ran out of candidates (or there were none to begin with)
			return false;
		}
		$this->mReadIndex = $i;
		return $i;
	}

	# Set the master wait position
	# If a DB_SLAVE connection has been opened already, waits
	# Otherwise sets a variable telling it to wait if such a connection is opened
	function waitFor( $file, $pos ) {
		wfDebug( "User master pos: $file $pos\n" );
		$this->mWaitForFile = false;
		$this->mWaitForPos = false;

		# With a single server there is nothing to wait for
		if ( count( $this->mServers ) == 1 ) {
			return;
		}

		$this->mWaitForFile = $file;
		$this->mWaitForPos = $pos;

		if ( $this->mReadIndex > 0 ) {
			if ( !$this->doWait( $this->mReadIndex ) ) {
				# Use master instead
				$this->mReadIndex = 0;
			}
		}
	}

	# Wait for a given slave to catch up to the master pos stored in $this.
	# Returns true if the slave is known to have reached the position, false
	# if the wait timed out, failed, or no connection was available.
	function doWait( $index ) {
		global $wgMemc;

		$key = "masterpos:" . $index;
		$memcPos = $wgMemc->get( $key );
		if ( $memcPos ) {
			list( $file, $pos ) = explode( ' ', $memcPos );
			# If the saved position is later than the requested position, return now
			if ( $file == $this->mWaitForFile && $this->mWaitForPos <= $pos ) {
				return true;
			}
		}

		$conn =& $this->getConnection( $index );
		if ( !is_object( $conn ) ) {
			# No usable connection to wait on; the caller falls back to the master
			return false;
		}
		wfDebug( "Waiting for slave #$index to catch up...\n" );
		$result = $conn->masterPosWait( $this->mWaitForFile, $this->mWaitForPos, MASTER_WAIT_TIMEOUT );

		if ( $result == -1 || is_null( $result ) ) {
			# Timed out waiting for slave, use master instead
			wfDebug( "Timed out waiting for slave #$index pos {$this->mWaitForFile} {$this->mWaitForPos}\n" );
			$retVal = false;
		} else {
			$retVal = true;
			wfDebug( "Done\n" );
		}
		return $retVal;
	}

	# Get a connection by index.
	#
	# $i    an explicit index into the servers array, or one of DB_SLAVE,
	#       DB_MASTER, DB_LAST
	# $fail if true, reportConnectionError() is called when the connection
	#       cannot be opened
	# Returns (by reference) the connection object, or false if it is not open.
	function &getConnection( $i, $fail = false )
	{
		/*
		# Task-based index
		if ( $i >= DB_TASK_FIRST && $i < DB_TASK_LAST ) {
			if ( $i % 2 ) {
				# Odd index use writer
				$i = DB_MASTER;
			} else {
				# Even index use reader
				$i = DB_SLAVE;
			}
		}*/

		# Operation-based index
		if ( $i == DB_SLAVE ) {
			# Note: re-entrant
			$i = $this->getReaderIndex();
		} elseif ( $i == DB_MASTER ) {
			$i = $this->getWriterIndex();
		} elseif ( $i == DB_LAST ) {
			# Just use $this->mLastIndex, which should already be set
			$i = $this->mLastIndex;
			if ( $i === -1 ) {
				# Oh dear, not set, best to use the writer for safety
				$i = $this->getWriterIndex();
			}
		}
		# Now we have an explicit index into the servers array
		if ( !$this->isOpen( $i ) ) {
			$this->mConnections[$i] = $this->makeConnection( $this->mServers[$i] );

			# Only wait on a slave that actually connected. doWait() calls
			# getConnection() again, so waiting on a dead connection would
			# re-enter this branch without end.
			if ( $i != 0 && $this->mWaitForFile && $this->isOpen( $i ) ) {
				if ( !$this->doWait( $i ) ) {
					# Error waiting for this slave, use master instead
					$this->mReadIndex = 0;
					$i = 0;
					if ( !$this->isOpen( 0 ) ) {
						$this->mConnections[0] = $this->makeConnection( $this->mServers[0] );
					}
					wfDebug( "Failed over to {$this->mConnections[0]->mServer}\n" );
				}
			}
		}
		if ( !$this->isOpen( $i ) ) {
			wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" );
			if ( $fail ) {
				$this->reportConnectionError( $this->mConnections[$i] );
			}
			# Failed connections are recorded as false, not as a dead object
			$this->mConnections[$i] = false;
		}
		$this->mLastIndex = $i;

		return $this->mConnections[$i];
	}

	# Returns true if a connection object exists at $index and reports itself
	# as open. Entries that are false (failed connections) count as not open.
	/* private */ function isOpen( $index ) {
		if ( !array_key_exists( $index, $this->mConnections ) ) {
			return false;
		}
		$conn =& $this->mConnections[$index];
		return is_object( $conn ) && $conn->isOpen();
	}

	# Create a connection object from a server definition.
	# The definition is expected to provide the keys type, host, user,
	# password, dbname and flags; the class used is 'Database' . ucfirst( type ).
	/* private */ function makeConnection( &$server ) {
		extract( $server );
		# Get class for this database type
		$class = 'Database' . ucfirst( $type );
		if ( !class_exists( $class ) ) {
			require_once( "$class.php" );
		}

		# Create object
		return new $class( $host, $user, $password, $dbname, 1, $flags );
	}

	# Report a connection error through the given connection object.
	# If $conn is not an object, a plain Database object is created to do the
	# reporting. The configured fail function is installed first, or
	# "wfEmergencyAbort" if none was configured.
	function reportConnectionError( &$conn )
	{
		if ( !is_object( $conn ) ) {
			$conn = new Database;
		}
		if ( $this->mFailFunction ) {
			$conn->setFailFunction( $this->mFailFunction );
		} else {
			$conn->setFailFunction( "wfEmergencyAbort" );
		}
		$conn->reportConnectionError();
	}

	# Get the index of the server to write to; always the first server.
	function getWriterIndex()
	{
		return 0;
	}

	# Force getReaderIndex() to return the given index.
	# A negative value removes the override.
	function force( $i )
	{
		$this->mForce = $i;
	}

	# Returns true if a server with index $i is defined
	function haveIndex( $i )
	{
		return array_key_exists( $i, $this->mServers );
	}

	# Get the number of defined servers (not the number of open connections)
	function getServerCount() {
		return count( $this->mServers );
	}

	# Save master pos to the session, if the session exists.
	# Does nothing unless a session was started and more than one server is defined.
	function saveMasterPos() {
		global $wgSessionStarted;
		if ( !$wgSessionStarted || count( $this->mServers ) <= 1 ) {
			return;
		}

		# If this entire request was served from a slave without opening a connection to the
		# master (however unlikely that may be), then we can fetch the position from the slave.
		$fromSlave = empty( $this->mConnections[0] );
		if ( $fromSlave ) {
			$conn =& $this->getConnection( DB_SLAVE );
		} else {
			$conn =& $this->getConnection( 0 );
		}
		if ( !is_object( $conn ) ) {
			# No usable connection, so there is no position to record
			return;
		}

		if ( $fromSlave ) {
			list( $file, $pos ) = $conn->getSlavePos();
			wfDebug( "Saving master pos fetched from slave: $file $pos\n" );
		} else {
			list( $file, $pos ) = $conn->getMasterPos();
			wfDebug( "Saving master pos: $file $pos\n" );
		}
		if ( $file !== false ) {
			$_SESSION['master_log_file'] = $file;
			$_SESSION['master_pos'] = $pos;
		}
	}

	# Loads the master pos from the session, waits for it if necessary
	function loadMasterPos() {
		if ( isset( $_SESSION['master_log_file'] ) && isset( $_SESSION['master_pos'] ) ) {
			$this->waitFor( $_SESSION['master_log_file'], $_SESSION['master_pos'] );
		}
	}

	# Close all open connections
	function closeAll() {
		# Operate on the stored connection rather than on a foreach value copy
		foreach( array_keys( $this->mConnections ) as $i ) {
			if ( $this->isOpen( $i ) ) {
				$this->mConnections[$i]->close();
			}
		}
	}

	# Call immediateCommit() on all open connections
	function commitAll() {
		# Operate on the stored connection rather than on a foreach value copy
		foreach( array_keys( $this->mConnections ) as $i ) {
			if ( $this->isOpen( $i ) ) {
				$this->mConnections[$i]->immediateCommit();
			}
		}
	}
}