*/
class LoadBalancer {
private $mServers, $mConns, $mLoads, $mGroupLoads;
+
+ /** @var bool|DatabaseBase Database connection that caused a problem */
private $mErrorConnection;
private $mReadIndex, $mAllowLagged;
foreach ( $lags as $i => $lag ) {
if ( $i != 0 ) {
if ( $lag === false ) {
- wfDebugLog( 'replication', "Server #$i is not replicating\n" );
+ wfDebugLog( 'replication', "Server #$i is not replicating" );
unset( $loads[$i] );
} elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) {
- wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)\n" );
+ wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)" );
unset( $loads[$i] );
}
}
* @return bool|int|string
*/
function getReaderIndex( $group = false, $wiki = false ) {
- global $wgReadOnly, $wgDBClusterTimeout, $wgDBAvgStatusPoll, $wgDBtype;
+ global $wgReadOnly, $wgDBtype;
# @todo FIXME: For now, only go through all this for mysql databases
if ( $wgDBtype != 'mysql' ) {
if ( count( $this->mServers ) == 1 ) {
# Skip the load balancing if there's only one server
return 0;
- } elseif ( $group === false and $this->mReadIndex >= 0 ) {
+ } elseif ( $group === false && $this->mReadIndex >= 0 ) {
# Shortcut if generic reader exists already
return $this->mReadIndex;
}
- wfProfileIn( __METHOD__ );
-
- $totalElapsed = 0;
-
- # convert from seconds to microseconds
- $timeout = $wgDBClusterTimeout * 1e6;
+ $section = new ProfileSection( __METHOD__ );
# Find the relevant load array
if ( $group !== false ) {
} else {
# No loads for this group, return false and the caller can use some other group
wfDebug( __METHOD__ . ": no loads for group $group\n" );
- wfProfileOut( __METHOD__ );
return false;
}
$nonErrorLoads = $this->mLoads;
}
- if ( !$nonErrorLoads ) {
- wfProfileOut( __METHOD__ );
+ if ( !count( $nonErrorLoads ) ) {
throw new MWException( "Empty server array given to LoadBalancer" );
}
$laggedSlaveMode = false;
+ # No server found yet
+ $i = false;
# First try quickly looking through the available servers for a server that
# meets our criteria
- do {
- $totalThreadsConnected = 0;
- $overloadedServers = 0;
- $currentLoads = $nonErrorLoads;
- while ( count( $currentLoads ) ) {
- if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) {
+ $currentLoads = $nonErrorLoads;
+ while ( count( $currentLoads ) ) {
+ if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) {
+ $i = ArrayUtils::pickRandom( $currentLoads );
+ } else {
+ $i = $this->getRandomNonLagged( $currentLoads, $wiki );
+ if ( $i === false && count( $currentLoads ) != 0 ) {
+ # All slaves lagged. Switch to read-only mode
+ wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode" );
+ $wgReadOnly = 'The database has been automatically locked ' .
+ 'while the slave database servers catch up to the master';
$i = ArrayUtils::pickRandom( $currentLoads );
- } else {
- $i = $this->getRandomNonLagged( $currentLoads, $wiki );
- if ( $i === false && count( $currentLoads ) != 0 ) {
- # All slaves lagged. Switch to read-only mode
- wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode\n" );
- $wgReadOnly = 'The database has been automatically locked ' .
- 'while the slave database servers catch up to the master';
- $i = ArrayUtils::pickRandom( $currentLoads );
- $laggedSlaveMode = true;
- }
- }
-
- if ( $i === false ) {
- # pickRandom() returned false
- # This is permanent and means the configuration or the load monitor
- # wants us to return false.
- wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false\n" );
- wfProfileOut( __METHOD__ );
-
- return false;
- }
-
- wfDebugLog( 'connect', __METHOD__ . ": Using reader #$i: {$this->mServers[$i]['host']}...\n" );
- $conn = $this->openConnection( $i, $wiki );
-
- if ( !$conn ) {
- wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki\n" );
- unset( $nonErrorLoads[$i] );
- unset( $currentLoads[$i] );
- continue;
+ $laggedSlaveMode = true;
}
+ }
- // Perform post-connection backoff
- $threshold = isset( $this->mServers[$i]['max threads'] )
- ? $this->mServers[$i]['max threads'] : false;
- $backoff = $this->getLoadMonitor()->postConnectionBackoff( $conn, $threshold );
-
- // Decrement reference counter, we are finished with this connection.
- // It will be incremented for the caller later.
- if ( $wiki !== false ) {
- $this->reuseConnection( $conn );
- }
+ if ( $i === false ) {
+ # pickRandom() returned false
+ # This is permanent and means the configuration or the load monitor
+ # wants us to return false.
+ wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );
- if ( $backoff ) {
- # Post-connection overload, don't use this server for now
- $totalThreadsConnected += $backoff;
- $overloadedServers++;
- unset( $currentLoads[$i] );
- } else {
- # Return this server
- break 2;
- }
+ return false;
}
- # No server found yet
- $i = false;
+ wfDebugLog( 'connect', __METHOD__ .
+ ": Using reader #$i: {$this->mServers[$i]['host']}..." );
- # If all servers were down, quit now
- if ( !count( $nonErrorLoads ) ) {
- wfDebugLog( 'connect', "All servers down\n" );
- break;
+ $conn = $this->openConnection( $i, $wiki );
+ if ( !$conn ) {
+ wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
+ unset( $nonErrorLoads[$i] );
+ unset( $currentLoads[$i] );
+ continue;
}
- # Some servers must have been overloaded
- if ( $overloadedServers == 0 ) {
- throw new MWException( __METHOD__ . ": unexpectedly found no overloaded servers" );
+ // Decrement reference counter, we are finished with this connection.
+ // It will be incremented for the caller later.
+ if ( $wiki !== false ) {
+ $this->reuseConnection( $conn );
}
- # Back off for a while
- # Scale the sleep time by the number of connected threads, to produce a
- # roughly constant global poll rate
- $avgThreads = $totalThreadsConnected / $overloadedServers;
- $totalElapsed += $this->sleep( $wgDBAvgStatusPoll * $avgThreads );
- } while ( $totalElapsed < $timeout );
-
- if ( $totalElapsed >= $timeout ) {
- wfDebugLog( 'connect', "All servers busy\n" );
- $this->mErrorConnection = false;
- $this->mLastError = 'All servers busy';
+
+ # Return this server
+ break;
+ }
+
+ # If all servers were down, quit now
+ if ( !count( $nonErrorLoads ) ) {
+ wfDebugLog( 'connect', "All servers down" );
}
if ( $i !== false ) {
$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
}
}
- if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $i !== false ) {
+ if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group !== false ) {
$this->mReadIndex = $i;
}
}
- wfProfileOut( __METHOD__ );
return $i;
}
/**
* Returns true if the specified index is a valid server index
*
- * @param $i
+ * @param string $i
* @return bool
*/
function haveIndex( $i ) {
/**
* Returns true if the specified index is valid and has non-zero load
*
- * @param $i
+ * @param string $i
* @return bool
*/
function isNonZeroLoad( $i ) {
/**
* Get the host name or IP address of the server with the specified index
* Prefer a readable name if available.
- * @param $i
+ * @param string $i
* @return string
*/
function getServerName( $i ) {
function closeAll() {
foreach ( $this->mConns as $conns2 ) {
foreach ( $conns2 as $conns3 ) {
+ /** @var DatabaseBase $conn */
foreach ( $conns3 as $conn ) {
$conn->close();
}
* Deprecated function, typo in function name
*
* @deprecated in 1.18
- * @param $conn
+ * @param DatabaseBase $conn
*/
function closeConnecton( $conn ) {
wfDeprecated( __METHOD__, '1.18' );
* Close a connection
* Using this function makes sure the LoadBalancer knows the connection is closed.
* If you use $conn->close() directly, the load balancer won't update its state.
- * @param $conn DatabaseBase
+ * @param DatabaseBase $conn
*/
function closeConnection( $conn ) {
$done = false;
function commitAll() {
foreach ( $this->mConns as $conns2 ) {
foreach ( $conns2 as $conns3 ) {
+ /** @var DatabaseBase[] $conns3 */
foreach ( $conns3 as $conn ) {
if ( $conn->trxLevel() ) {
$conn->commit( __METHOD__, 'flush' );
if ( empty( $conns2[$masterIndex] ) ) {
continue;
}
+ /** @var DatabaseBase $conn */
foreach ( $conns2[$masterIndex] as $conn ) {
if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
$conn->commit( __METHOD__, 'flush' );
$success = true;
foreach ( $this->mConns as $conns2 ) {
foreach ( $conns2 as $conns3 ) {
+ /** @var DatabaseBase[] $conns3 */
foreach ( $conns3 as $conn ) {
if ( !$conn->ping() ) {
$success = false;