<?php
/**
+ * Database load balancing
+ *
* @file
* @ingroup Database
*/
* @ingroup Database
*/
class LoadBalancer {
- /* private */ var $mServers, $mConns, $mLoads, $mGroupLoads;
- /* private */ var $mFailFunction, $mErrorConnection;
- /* private */ var $mReadIndex, $mAllowLagged;
- /* private */ var $mWaitForPos, $mWaitTimeout;
- /* private */ var $mLaggedSlaveMode, $mLastError = 'Unknown error';
- /* private */ var $mParentInfo, $mLagTimes;
- /* private */ var $mLoadMonitorClass, $mLoadMonitor;
+ private $mServers, $mConns, $mLoads, $mGroupLoads;
+ private $mErrorConnection;
+ private $mReadIndex, $mAllowLagged;
+ private $mWaitForPos, $mWaitTimeout;
+ private $mLaggedSlaveMode, $mLastError = 'Unknown error';
+ private $mParentInfo, $mLagTimes;
+ private $mLoadMonitorClass, $mLoadMonitor;
/**
* @param $params Array with keys:
* servers Required. Array of server info structures.
- * failFunction Deprecated, use exceptions instead.
* masterWaitTimeout Replication lag wait timeout
* loadMonitor Name of a class used to fetch server lag and load.
*/
}
$this->mServers = $params['servers'];
- if ( isset( $params['failFunction'] ) ) {
- $this->mFailFunction = $params['failFunction'];
- } else {
- $this->mFailFunction = false;
- }
if ( isset( $params['waitTimeout'] ) ) {
$this->mWaitTimeout = $params['waitTimeout'];
} else {
$this->mWaitForPos = false;
$this->mLaggedSlaveMode = false;
$this->mErrorConnection = false;
- $this->mAllowLag = false;
+ $this->mAllowLagged = false;
$this->mLoadMonitorClass = isset( $params['loadMonitor'] )
? $params['loadMonitor'] : 'LoadMonitor_MySQL';
}
}
- static function newFromParams( $servers, $failFunction = false, $waitTimeout = 10 )
- {
- return new LoadBalancer( $servers, $failFunction, $waitTimeout );
- }
-
/**
* Get a LoadMonitor instance
+ *
+ * @return LoadMonitor
*/
function getLoadMonitor() {
if ( !isset( $this->mLoadMonitor ) ) {
* Given an array of non-normalised probabilities, this function will select
* an element and return the appropriate key
*/
- function pickRandom( $weights )
- {
+ function pickRandom( $weights ) {
if ( !is_array( $weights ) || count( $weights ) == 0 ) {
return false;
}
# Unset excessively lagged servers
$lags = $this->getLagTimes( $wiki );
foreach ( $lags as $i => $lag ) {
- if ( $i != 0 && isset( $this->mServers[$i]['max lag'] ) ) {
+ if ( $i != 0 ) {
if ( $lag === false ) {
wfDebug( "Server #$i is not replicating\n" );
unset( $loads[$i] );
- } elseif ( $lag > $this->mServers[$i]['max lag'] ) {
+ } elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) {
wfDebug( "Server #$i is excessively lagged ($lag seconds)\n" );
unset( $loads[$i] );
}
# Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
- $i = false;
- $found = false;
$laggedSlaveMode = false;
# First try quickly looking through the available servers for a server that
$i = $this->getRandomNonLagged( $currentLoads, $wiki );
if ( $i === false && count( $currentLoads ) != 0 ) {
# All slaves lagged. Switch to read-only mode
- $wgReadOnly = wfMsgNoDBForContent( 'readonly_lag' );
+ $wgReadOnly = wfMessage( 'readonly_lag' )->useDatabase( false )->plain();
$i = $this->pickRandom( $currentLoads );
$laggedSlaveMode = true;
}
return $t;
}
- /**
- * Get a random server to use in a query group
- * @deprecated use getReaderIndex
- */
- function getGroupIndex( $group ) {
- return $this->getReaderIndex( $group );
- }
-
/**
* Set the master wait position
* If a DB_SLAVE connection has been opened already, waits
}
wfProfileOut( __METHOD__ );
}
+
+ /**
+ * Set the master wait position and wait for ALL slaves to catch up to it
+ */
+ public function waitForAll( $pos ) {
+ wfProfileIn( __METHOD__ );
+ $this->mWaitForPos = $pos;
+ for ( $i = 1; $i < count( $this->mServers ); $i++ ) {
+ $this->doWait( $i , true );
+ }
+ wfProfileOut( __METHOD__ );
+ }
/**
* Get any open connection to a given server index, local or foreign
* Returns false if there is no connection open
+ *
+ * @return DatabaseBase
*/
function getAnyOpenConnection( $i ) {
- foreach ( $this->mConns as $type => $conns ) {
+ foreach ( $this->mConns as $conns ) {
if ( !empty( $conns[$i] ) ) {
return reset( $conns[$i] );
}
/**
* Wait for a given slave to catch up to the master pos stored in $this
*/
- function doWait( $index ) {
+ function doWait( $index, $open = false ) {
# Find a connection to wait on
$conn = $this->getAnyOpenConnection( $index );
if ( !$conn ) {
- wfDebug( __METHOD__ . ": no connection open\n" );
- return false;
+ if ( !$open ) {
+ wfDebug( __METHOD__ . ": no connection open\n" );
+ return false;
+ } else {
+ $conn = $this->openConnection( $index );
+ if ( !$conn ) {
+ wfDebug( __METHOD__ . ": failed to open connection\n" );
+ return false;
+ }
+ }
}
wfDebug( __METHOD__.": Waiting for slave #$index to catch up...\n" );
/**
* Get a connection by index
* This is the main entry point for this class.
+ *
* @param $i Integer: server index
* @param $groups Array: query groups
* @param $wiki String: wiki ID
+ *
+ * @return DatabaseBase
*/
public function &getConnection( $i, $groups = array(), $wiki = false ) {
wfProfileIn( __METHOD__ );
if ( $i === false ) {
$this->mLastError = 'No working slave server: ' . $this->mLastError;
$this->reportConnectionError( $this->mErrorConnection );
+ wfProfileOut( __METHOD__ );
return false;
}
}
* Mark a foreign connection as being available for reuse under a different
* DB name or prefix. This mechanism is reference-counted, and must be called
* the same number of times as getConnection() to work.
+ *
+ * @param DatabaseBase $conn
*/
public function reuseConnection( $conn ) {
$serverIndex = $conn->getLBInfo('serverIndex');
*/
function reallyOpenConnection( $server, $dbNameOverride = false ) {
if( !is_array( $server ) ) {
- throw new MWException( 'You must update your load-balancing configuration. See DefaultSettings.php entry for $wgDBservers.' );
+ throw new MWException( 'You must update your load-balancing configuration. ' .
+ 'See DefaultSettings.php entry for $wgDBservers.' );
}
- extract( $server );
+ $host = $server['host'];
+ $dbname = $server['dbname'];
+
if ( $dbNameOverride !== false ) {
- $dbname = $dbNameOverride;
+ $server['dbname'] = $dbname = $dbNameOverride;
}
- # Get class for this database type
- $class = 'Database' . ucfirst( $type );
-
# Create object
wfDebug( "Connecting to $host $dbname...\n" );
- $db = new $class( $host, $user, $password, $dbname, 1, $flags );
+ $db = DatabaseBase::newFromType( $server['type'], $server );
if ( $db->isOpen() ) {
- wfDebug( "Connected\n" );
+ wfDebug( "Connected to $host $dbname.\n" );
} else {
- wfDebug( "Failed\n" );
+ wfDebug( "Connection failed to $host $dbname.\n" );
}
$db->setLBInfo( $server );
if ( isset( $server['fakeSlaveLag'] ) ) {
// No last connection, probably due to all servers being too busy
wfLogDBError( "LB failure with no last connection\n" );
$conn = new Database;
- if ( $this->mFailFunction ) {
- $conn->failFunction( $this->mFailFunction );
- $conn->reportConnectionError( $this->mLastError );
- } else {
- // If all servers were busy, mLastError will contain something sensible
- throw new DBConnectionError( $conn, $this->mLastError );
- }
+ // If all servers were busy, mLastError will contain something sensible
+ throw new DBConnectionError( $conn, $this->mLastError );
} else {
- if ( $this->mFailFunction ) {
- $conn->failFunction( $this->mFailFunction );
- } else {
- $conn->failFunction( false );
- }
$server = $conn->getProperty( 'mServer' );
wfLogDBError( "Connection error: {$this->mLastError} ({$server})\n" );
$conn->reportConnectionError( "{$this->mLastError} ({$server})" );
}
}
+ /**
+ * Sets the server info structure for the given index. Entry at index $i is created if it doesn't exist
+ */
+ function setServerInfo( $i, $serverInfo ) {
+ $this->mServers[$i] = $serverInfo;
+ }
+
/**
* Get the current master position for chronology control purposes
* @return mixed
);
}
+ /**
+ * Deprecated function, typo in function name
+ */
+ function closeConnecton( $conn ) {
+ $this->closeConnection( $conn );
+ }
+
/**
* Close a connection
* Using this function makes sure the LoadBalancer knows the connection is closed.
* If you use $conn->close() directly, the load balancer won't update its state.
+ * @param $conn
+ * @return void
*/
- function closeConnecton( $conn ) {
+ function closeConnection( $conn ) {
$done = false;
foreach ( $this->mConns as $i1 => $conns2 ) {
foreach ( $conns2 as $i2 => $conns3 ) {
function commitMasterChanges() {
// Always 0, but who knows.. :)
$masterIndex = $this->getWriterIndex();
- foreach ( $this->mConns as $type => $conns2 ) {
+ foreach ( $this->mConns as $conns2 ) {
if ( empty( $conns2[$masterIndex] ) ) {
continue;
}