private $errorConnection;
/** @var int The generic (not query grouped) replica DB index */
private $genericReadIndex = -1;
- /** @var bool|DBMasterPos False if not set */
+ /** @var int[] The group replica DB indexes keyed by group */
+ private $readIndexByGroup = [];
+ /** @var bool|DBMasterPos Replication sync position or false if not set */
private $waitForPos;
/** @var bool Whether the generic reader fell back to a lagged replica DB */
private $laggedReplicaMode = false;
private $lastError = 'Unknown error';
/** @var string|bool Reason the LB is read-only or false if not */
private $readOnlyReason = false;
- /** @var int Total connections opened */
- private $connsOpened = 0;
+ /** @var int Total number of new connections ever made with this instance */
+ private $connectionCounter = 0;
/** @var bool */
private $disabled = false;
/** @var bool Whether any connection has been attempted yet */
const ROUND_ERROR = 'error';
public function __construct( array $params ) {
- if ( !isset( $params['servers'] ) ) {
- throw new InvalidArgumentException( __CLASS__ . ': missing "servers" parameter' );
+ if ( !isset( $params['servers'] ) || !count( $params['servers'] ) ) {
+ throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
}
- $this->servers = $params['servers'];
- foreach ( $this->servers as $i => $server ) {
+
+ $listKey = -1;
+ $this->servers = [];
+ $this->genericLoads = [];
+ foreach ( $params['servers'] as $i => $server ) {
+ if ( ++$listKey !== $i ) {
+ throw new UnexpectedValueException( 'List expected for "servers" parameter' );
+ }
if ( $i == 0 ) {
- $this->servers[$i]['master'] = true;
+ $server['master'] = true;
} else {
- $this->servers[$i]['replica'] = true;
+ $server['replica'] = true;
+ }
+ $this->servers[$i] = $server;
+
+ $this->genericLoads[$i] = $server['load'];
+ if ( isset( $server['groupLoads'] ) ) {
+ foreach ( $server['groupLoads'] as $group => $ratio ) {
+ $this->groupLoads[$group][$i] = $ratio;
+ }
}
}
$this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
- $this->conns = [
- // Connection were transaction rounds may be applied
- self::KEY_LOCAL => [],
- self::KEY_FOREIGN_INUSE => [],
- self::KEY_FOREIGN_FREE => [],
- // Auto-committing counterpart connections that ignore transaction rounds
- self::KEY_LOCAL_NOROUND => [],
- self::KEY_FOREIGN_INUSE_NOROUND => [],
- self::KEY_FOREIGN_FREE_NOROUND => []
- ];
- $this->genericLoads = [];
+ $this->conns = self::newTrackedConnectionsArray();
$this->waitForPos = false;
$this->allowLagged = false;
$this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
$this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
- foreach ( $params['servers'] as $i => $server ) {
- $this->genericLoads[$i] = $server['load'];
- if ( isset( $server['groupLoads'] ) ) {
- foreach ( $server['groupLoads'] as $group => $ratio ) {
- if ( !isset( $this->groupLoads[$group] ) ) {
- $this->groupLoads[$group] = [];
- }
- $this->groupLoads[$group][$i] = $ratio;
- }
- }
- }
-
$this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
$this->profiler = $params['profiler'] ?? null;
$this->ownerId = $params['ownerId'] ?? null;
}
+ private static function newTrackedConnectionsArray() {
+ return [
+ // Connection were transaction rounds may be applied
+ self::KEY_LOCAL => [],
+ self::KEY_FOREIGN_INUSE => [],
+ self::KEY_FOREIGN_FREE => [],
+ // Auto-committing counterpart connections that ignore transaction rounds
+ self::KEY_LOCAL_NOROUND => [],
+ self::KEY_FOREIGN_INUSE_NOROUND => [],
+ self::KEY_FOREIGN_FREE_NOROUND => []
+ ];
+ }
+
public function getLocalDomainID() {
return $this->localDomain->getId();
}
}
/**
+ * @param int $flags
+ * @return bool
+ */
+ private function sanitizeConnectionFlags( $flags ) {
+ if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
+ // Assuming all servers are of the same type (or similar), which is overwhelmingly
+ // the case, use the master server information to get the attributes. The information
+ // for $i cannot be used since it might be DB_REPLICA, which might require connection
+ // attempts in order to be resolved into a real server index.
+ $attributes = $this->getServerAttributes( $this->getWriterIndex() );
+ if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
+ // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking
+ // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions
+ // to reduce lock contention. None of these apply for sqlite and using separate
+ // connections just causes self-deadlocks.
+ $flags &= ~self::CONN_TRX_AUTOCOMMIT;
+ $this->connLogger->info( __METHOD__ .
+ ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
+ }
+ }
+
+ return $flags;
+ }
+
+ /**
+ * @param IDatabase $conn
+ * @param int $flags
+ * @throws DBUnexpectedError
+ */
+ private function enforceConnectionFlags( IDatabase $conn, $flags ) {
+ if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ) {
+ if ( $conn->trxLevel() ) { // sanity
+ throw new DBUnexpectedError(
+ $conn,
+ 'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
+ );
+ }
+
+ $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
+ }
+ }
+
+ /**
* Get a LoadMonitor instance
*
* @return ILoadMonitor
# Unset excessively lagged servers
foreach ( $lags as $i => $lag ) {
- if ( $i != 0 ) {
+ if ( $i !== $this->getWriterIndex() ) {
# How much lag this server nominally is allowed to have
$maxServerLag = $this->servers[$i]['max lag'] ?? $this->maxLag; // default
# Constrain that futher by $maxLag argument
* @param int $i
* @param array $groups
* @param string|bool $domain
- * @return int
+ * @return int The index of a specific server (replica DBs are checked for connectivity)
*/
private function getConnectionIndex( $i, $groups, $domain ) {
// Check one "group" per default: the generic pool
? $defaultGroups
: (array)$groups;
- if ( $i == self::DB_MASTER ) {
+ if ( $i === self::DB_MASTER ) {
$i = $this->getWriterIndex();
- } elseif ( $i == self::DB_REPLICA ) {
+ } elseif ( $i === self::DB_REPLICA ) {
# Try to find an available server in any the query groups (in order)
foreach ( $groups as $group ) {
$groupIndex = $this->getReaderIndex( $group, $domain );
}
# Operation-based index
- if ( $i == self::DB_REPLICA ) {
+ if ( $i === self::DB_REPLICA ) {
$this->lastError = 'Unknown error'; // reset error string
# Try the general server pool if $groups are unavailable.
$i = ( $groups === [ false ] )
$this->lastError = 'No working replica DB server: ' . $this->lastError;
// Throw an exception
$this->reportConnectionError();
- return null; // not reached
+ return null; // unreachable due to exception
}
}
}
public function getReaderIndex( $group = false, $domain = false ) {
- if ( count( $this->servers ) == 1 ) {
+ if ( $this->getServerCount() == 1 ) {
// Skip the load balancing if there's only one server
return $this->getWriterIndex();
- } elseif ( $group === false && $this->genericReadIndex >= 0 ) {
- // A generic reader index was already selected and "waitForPos" was handled
- return $this->genericReadIndex;
+ }
+
+ $index = $this->getExistingReaderIndex( $group );
+ if ( $index >= 0 ) {
+ // A reader index was already selected and "waitForPos" was handled
+ return $index;
}
if ( $group !== false ) {
$this->getLoadMonitor()->scaleLoads( $loads, $domain );
// Pick a server to use, accounting for weights, load, lag, and "waitForPos"
+ $this->lazyLoadReplicationPositions(); // optimizes server candidate selection
list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
if ( $i === false ) {
// DB connection unsuccessful
// If data seen by queries is expected to reflect the transactions committed as of
// or after a given replication position then wait for the DB to apply those changes
- if ( $this->waitForPos && $i != $this->getWriterIndex() && !$this->doWait( $i ) ) {
+ if ( $this->waitForPos && $i !== $this->getWriterIndex() && !$this->doWait( $i ) ) {
// Data will be outdated compared to what was expected
$laggedReplicaMode = true;
}
- if ( $this->genericReadIndex < 0 && $this->genericLoads[$i] > 0 && $group === false ) {
- // Cache the generic (ungrouped) reader index for future DB_REPLICA handles
- $this->genericReadIndex = $i;
- // Record if the generic reader index is in "lagged replica DB" mode
- $this->laggedReplicaMode = ( $laggedReplicaMode || $this->laggedReplicaMode );
+ // Cache the reader index for future DB_REPLICA handles
+ $this->setExistingReaderIndex( $group, $i );
+ // Record whether the generic reader index is in "lagged replica DB" mode
+ if ( $group === false && $laggedReplicaMode ) {
+ $this->laggedReplicaMode = true;
}
$serverName = $this->getServerName( $i );
return $i;
}
+ /**
+ * Get the server index chosen by the load balancer for use with the given query group
+ *
+ * @param string|bool $group Query group; use false for the generic group
+ * @return int Server index or -1 if none was chosen
+ */
+ protected function getExistingReaderIndex( $group ) {
+ if ( $group === false ) {
+ $index = $this->genericReadIndex;
+ } else {
+ $index = $this->readIndexByGroup[$group] ?? -1;
+ }
+
+ return $index;
+ }
+
+ /**
+ * Set the server index chosen by the load balancer for use with the given query group
+ *
+ * @param string|bool $group Query group; use false for the generic group
+ * @param int $index The index of a specific server
+ */
+ private function setExistingReaderIndex( $group, $index ) {
+ if ( $index < 0 ) {
+ throw new UnexpectedValueException( "Cannot set a negative read server index" );
+ }
+
+ if ( $group === false ) {
+ $this->genericReadIndex = $index;
+ } else {
+ $this->readIndexByGroup[$group] = $index;
+ }
+ }
+
/**
* @param array $loads List of server weights
* @param string|bool $domain
} else {
$i = false;
if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
+ $this->replLogger->debug( __METHOD__ . ": replication positions detected" );
// "chronologyCallback" sets "waitForPos" for session consistency.
// This triggers doWait() after connect, so it's especially good to
// avoid lagged servers so as to avoid excessive delay in that method.
// Any server with less lag than it's 'max lag' param is preferable
$i = $this->getRandomNonLagged( $currentLoads, $domain );
}
- if ( $i === false && count( $currentLoads ) != 0 ) {
+ if ( $i === false && count( $currentLoads ) ) {
// All replica DBs lagged. Switch to read-only mode
$this->replLogger->error(
__METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
$serverName = $this->getServerName( $i );
$this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
- $conn = $this->openConnection( $i, $domain );
+ $conn = $this->getConnection( $i, [], $domain, self::CONN_SILENCE_ERRORS );
if ( !$conn ) {
$this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
unset( $currentLoads[$i] ); // avoid this server next iteration
$oldPos = $this->waitForPos;
try {
$this->waitForPos = $pos;
- $serverCount = count( $this->servers );
+ $serverCount = $this->getServerCount();
$ok = true;
for ( $i = 1; $i < $serverCount; $i++ ) {
}
/**
- * Wait for a given replica DB to catch up to the master pos stored in $this
+ * Wait for a given replica DB to catch up to the master pos stored in "waitForPos"
* @param int $index Server index
* @param bool $open Check the server even if a new connection has to be made
- * @param int|null $timeout Max seconds to wait; default is "waitTimeout" given to __construct()
+ * @param int|null $timeout Max seconds to wait; default is "waitTimeout"
* @return bool
*/
protected function doWait( $index, $open = false, $timeout = null ) {
);
return false;
- } else {
- $conn = $this->openConnection( $index, self::DOMAIN_ANY );
- if ( !$conn ) {
- $this->replLogger->warning(
- __METHOD__ . ': failed to connect to {dbserver}',
- [ 'dbserver' => $server ]
- );
+ }
+ // Open a temporary new connection in order to wait for replication
+ $conn = $this->getConnection( $index, [], self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
+ if ( !$conn ) {
+ $this->replLogger->warning(
+ __METHOD__ . ': failed to connect to {dbserver}',
+ [ 'dbserver' => $server ]
+ );
- return false;
- }
- // Avoid connection spam in waitForAll() when connections
- // are made just for the sake of doing this lag check.
- $close = true;
+ return false;
}
+ // Avoid connection spam in waitForAll() when connections
+ // are made just for the sake of doing this lag check.
+ $close = true;
}
$this->replLogger->info(
}
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
- if ( $i === null || $i === false ) {
+ if ( !is_int( $i ) ) {
throw new InvalidArgumentException( "Cannot connect without a server index" );
+ } elseif ( $groups && $i > 0 ) {
+ throw new InvalidArgumentException( "Got query groups with server index #$i" );
}
$domain = $this->resolveDomainID( $domain );
- $masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
-
- if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
- // Assuming all servers are of the same type (or similar), which is overwhelmingly
- // the case, use the master server information to get the attributes. The information
- // for $i cannot be used since it might be DB_REPLICA, which might require connection
- // attempts in order to be resolved into a real server index.
- $attributes = $this->getServerAttributes( $this->getWriterIndex() );
- if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
- // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking
- // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions
- // to reduce lock contention. None of these apply for sqlite and using separate
- // connections just causes self-deadlocks.
- $flags &= ~self::CONN_TRX_AUTOCOMMIT;
- $this->connLogger->info( __METHOD__ .
- ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
- }
- }
+ $flags = $this->sanitizeConnectionFlags( $flags );
+ $masterOnly = ( $i === self::DB_MASTER || $i === $this->getWriterIndex() );
// Number of connections made before getting the server index and handle
- $priorConnectionsMade = $this->connsOpened;
- // Decide which server to use (might trigger new connections)
+ $priorConnectionsMade = $this->connectionCounter;
+ // Choose a server if $i is DB_MASTER/DB_REPLICA (might trigger new connections)
$serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
// Get an open connection to that server (might trigger a new connection)
- $conn = $this->openConnection( $serverIndex, $domain, $flags );
- if ( !$conn ) {
- $this->reportConnectionError();
- return null; // unreachable due to exception
+ $conn = $this->localDomain->equals( $domain )
+ ? $this->getLocalConnection( $serverIndex, $flags )
+ : $this->getForeignConnection( $serverIndex, $domain, $flags );
+ // Throw an error or bail out if the connection attempt failed
+ if ( !( $conn instanceof IDatabase ) ) {
+ if ( ( $flags & self::CONN_SILENCE_ERRORS ) != self::CONN_SILENCE_ERRORS ) {
+ $this->reportConnectionError();
+ }
+
+ return false;
}
// Profile any new connections caused by this method
- if ( $this->connsOpened > $priorConnectionsMade ) {
+ if ( $this->connectionCounter > $priorConnectionsMade ) {
$host = $conn->getServer();
$dbname = $conn->getDBname();
$this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
}
- if ( $serverIndex == $this->getWriterIndex() ) {
+ if ( !$conn->isOpen() ) {
+ // Connection was made but later unrecoverably lost for some reason.
+ // Do not return a handle that will just throw exceptions on use,
+ // but let the calling code (e.g. getReaderIndex) try another server.
+ $this->errorConnection = $conn;
+ return false;
+ }
+
+ $this->enforceConnectionFlags( $conn, $flags );
+ if ( $serverIndex === $this->getWriterIndex() ) {
// If the load balancer is read-only, perhaps due to replication lag, then master
// DB handles will reflect that. Note that Database::assertIsWritableMaster() takes
// care of replica DB handles whereas getReadOnlyReason() would cause infinite loops.
: self::DB_REPLICA;
}
+ /**
+ * @param int $i
+ * @param bool $domain
+ * @param int $flags
+ * @return Database|bool Live database handle or false on failure
+ * @deprecated Since 1.34 Use getConnection() instead
+ */
public function openConnection( $i, $domain = false, $flags = 0 ) {
- $domain = $this->resolveDomainID( $domain );
-
- if ( !$this->connectionAttempted && $this->chronologyCallback ) {
- // Load any "waitFor" positions before connecting so that doWait() is triggered
- $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
- $this->connectionAttempted = true;
- ( $this->chronologyCallback )( $this );
- }
-
- $conn = $this->localDomain->equals( $domain )
- ? $this->openLocalConnection( $i, $flags )
- : $this->openForeignConnection( $i, $domain, $flags );
-
- if ( $conn instanceof IDatabase && !$conn->isOpen() ) {
- // Connection was made but later unrecoverably lost for some reason.
- // Do not return a handle that will just throw exceptions on use,
- // but let the calling code (e.g. getReaderIndex) try another server.
- $this->errorConnection = $conn;
- $conn = false;
- }
-
- if (
- $conn instanceof IDatabase &&
- ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT )
- ) {
- if ( $conn->trxLevel() ) { // sanity
- throw new DBUnexpectedError(
- $conn,
- 'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
- );
- }
-
- $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
- }
-
- return $conn;
+ return $this->getConnection( $i, [], $domain, $flags | self::CONN_SILENCE_ERRORS );
}
/**
* @param int $flags Class CONN_* constant bitfield
* @return Database
*/
- private function openLocalConnection( $i, $flags = 0 ) {
+ private function getLocalConnection( $i, $flags = 0 ) {
// Connection handles required to be in auto-commit mode use a separate connection
// pool since the main pool is effected by implicit and explicit transaction rounds
$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
if ( isset( $this->conns[$connKey][$i][0] ) ) {
$conn = $this->conns[$connKey][$i][0];
} else {
- if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
- throw new InvalidArgumentException( "No server with index '$i'" );
- }
// Open a new connection
- $server = $this->servers[$i];
+ $server = $this->getServerInfoStrict( $i );
$server['serverIndex'] = $i;
$server['autoCommitOnly'] = $autoCommit;
$conn = $this->reallyOpenConnection( $server, $this->localDomain );
* @return Database|bool Returns false on connection error
* @throws DBError When database selection fails
*/
- private function openForeignConnection( $i, $domain, $flags = 0 ) {
+ private function getForeignConnection( $i, $domain, $flags = 0 ) {
$domainInstance = DatabaseDomain::newFromId( $domain );
// Connection handles required to be in auto-commit mode use a separate connection
// pool since the main pool is effected by implicit and explicit transaction rounds
}
if ( !$conn ) {
- if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
- throw new InvalidArgumentException( "No server with index '$i'" );
- }
// Open a new connection
- $server = $this->servers[$i];
+ $server = $this->getServerInfoStrict( $i );
$server['serverIndex'] = $i;
$server['foreignPoolRefCount'] = 0;
$server['foreign'] = true;
$masterName = $this->getServerName( $this->getWriterIndex() );
$server['clusterMasterHost'] = $masterName;
- // Log when many connection are made on requests
- if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
- $this->perfLogger->warning( __METHOD__ . ": " .
- "{$this->connsOpened}+ connections made (master=$masterName)" );
- }
-
$server['srvCache'] = $this->srvCache;
// Set loggers and profilers
$server['connLogger'] = $this->connLogger;
// Create a live connection object
try {
$db = Database::factory( $server['type'], $server );
+ // Log when many connection are made on requests
+ ++$this->connectionCounter;
+ $currentConnCount = $this->getCurrentConnectionCount();
+ if ( $currentConnCount >= self::CONN_HELD_WARN_THRESHOLD ) {
+ $this->perfLogger->warning(
+ __METHOD__ . ": {connections}+ connections made (master={masterdb})",
+ [ 'connections' => $currentConnCount, 'masterdb' => $masterName ]
+ );
+ }
} catch ( DBConnectionError $e ) {
// FIXME: This is probably the ugliest thing I have ever done to
// PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
}
}
+ $this->lazyLoadReplicationPositions(); // session consistency
+
return $db;
}
+ /**
+ * Make sure that any "waitForPos" positions are loaded and available to doWait()
+ */
+ private function lazyLoadReplicationPositions() {
+ if ( !$this->connectionAttempted && $this->chronologyCallback ) {
+ $this->connectionAttempted = true;
+ ( $this->chronologyCallback )( $this ); // generally calls waitFor()
+ $this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
+ }
+ }
+
/**
* @throws DBConnectionError
*/
return 0;
}
+ /**
+ * Returns true if the specified index is a valid server index
+ *
+ * @param int $i
+ * @return bool
+ * @deprecated Since 1.34
+ */
public function haveIndex( $i ) {
return array_key_exists( $i, $this->servers );
}
+ /**
+ * Returns true if the specified index is valid and has non-zero load
+ *
+ * @param int $i
+ * @return bool
+ * @deprecated Since 1.34
+ */
public function isNonZeroLoad( $i ) {
return array_key_exists( $i, $this->servers ) && $this->genericLoads[$i] != 0;
}
return count( $this->servers );
}
+ public function hasReplicaServers() {
+ return ( $this->getServerCount() > 1 );
+ }
+
+ public function hasStreamingReplicaServers() {
+ foreach ( $this->servers as $i => $server ) {
+ if ( $i !== $this->getWriterIndex() && empty( $server['is static'] ) ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public function getServerName( $i ) {
- $name = $this->servers[$i]['hostName'] ?? $this->servers[$i]['host'] ?? '';
+ $name = $this->servers[$i]['hostName'] ?? ( $this->servers[$i]['host'] ?? '' );
return ( $name != '' ) ? $name : 'localhost';
}
# master (however unlikely that may be), then we can fetch the position from the replica DB.
$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
if ( !$masterConn ) {
- $serverCount = count( $this->servers );
+ $serverCount = $this->getServerCount();
for ( $i = 1; $i < $serverCount; $i++ ) {
$conn = $this->getAnyOpenConnection( $i );
if ( $conn ) {
$conn->close();
} );
- $this->conns = [
- self::KEY_LOCAL => [],
- self::KEY_FOREIGN_INUSE => [],
- self::KEY_FOREIGN_FREE => [],
- self::KEY_LOCAL_NOROUND => [],
- self::KEY_FOREIGN_INUSE_NOROUND => [],
- self::KEY_FOREIGN_FREE_NOROUND => []
- ];
- $this->connsOpened = 0;
+ $this->conns = self::newTrackedConnectionsArray();
}
public function closeConnection( IDatabase $conn ) {
$this->connLogger->debug(
__METHOD__ . ": closing connection to database $i at '$host'." );
unset( $this->conns[$type][$serverIndex][$i] );
- --$this->connsOpened;
break 2;
}
}
public function getLaggedReplicaMode( $domain = false ) {
if (
// Avoid recursion if there is only one DB
- $this->getServerCount() > 1 &&
+ $this->hasStreamingReplicaServers() &&
// Avoid recursion if the (non-zero load) master DB was picked for generic reads
$this->genericReadIndex !== $this->getWriterIndex() &&
// Stay in lagged replica mode during the load balancer instance lifetime
}
}
+ /**
+ * @return int
+ */
+ private function getCurrentConnectionCount() {
+ $count = 0;
+ foreach ( $this->conns as $connsByServer ) {
+ foreach ( $connsByServer as $serverConns ) {
+ $count += count( $serverConns );
+ }
+ }
+
+ return $count;
+ }
+
public function getMaxLag( $domain = false ) {
- $maxLag = -1;
$host = '';
+ $maxLag = -1;
$maxIndex = 0;
- if ( $this->getServerCount() <= 1 ) {
- return [ $host, $maxLag, $maxIndex ]; // no replication = no lag
- }
-
- $lagTimes = $this->getLagTimes( $domain );
- foreach ( $lagTimes as $i => $lag ) {
- if ( $this->genericLoads[$i] > 0 && $lag > $maxLag ) {
- $maxLag = $lag;
- $host = $this->servers[$i]['host'];
- $maxIndex = $i;
+ if ( $this->hasReplicaServers() ) {
+ $lagTimes = $this->getLagTimes( $domain );
+ foreach ( $lagTimes as $i => $lag ) {
+ if ( $this->genericLoads[$i] > 0 && $lag > $maxLag ) {
+ $maxLag = $lag;
+ $host = $this->getServerInfoStrict( $i, 'host' );
+ $maxIndex = $i;
+ }
}
}
}
public function getLagTimes( $domain = false ) {
- if ( $this->getServerCount() <= 1 ) {
+ if ( !$this->hasReplicaServers() ) {
return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
}
return $this->getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
}
+ /**
+ * Get the lag in seconds for a given connection, or zero if this load
+ * balancer does not have replication enabled.
+ *
+ * This should be used in preference to Database::getLag() in cases where
+ * replication may not be in use, since there is no way to determine if
+ * replication is in use at the connection level without running
+ * potentially restricted queries such as SHOW SLAVE STATUS. Using this
+ * function instead of Database::getLag() avoids a fatal error in this
+ * case on many installations.
+ *
+ * @param IDatabase $conn
+ * @return int|bool Returns false on error
+ * @deprecated Since 1.34 Use IDatabase::getLag() instead
+ */
public function safeGetLag( IDatabase $conn ) {
- if ( $this->getServerCount() <= 1 ) {
- return 0;
- } else {
- return $conn->getLag();
+ if ( $conn->getLBInfo( 'is static' ) ) {
+ return 0; // static dataset
+ } elseif ( $conn->getLBInfo( 'serverIndex' ) == $this->getWriterIndex() ) {
+ return 0; // this is the master
}
+
+ return $conn->getLag();
}
- public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
+ public function waitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
$timeout = max( 1, $timeout ?: $this->waitTimeout );
if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
if ( !$pos ) {
// Get the current master position, opening a connection if needed
- $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
+ $index = $this->getWriterIndex();
+ $masterConn = $this->getAnyOpenConnection( $index );
if ( $masterConn ) {
$pos = $masterConn->getMasterPos();
} else {
- $masterConn = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY );
+ $flags = self::CONN_SILENCE_ERRORS;
+ $masterConn = $this->getConnection( $index, [], self::DOMAIN_ANY, $flags );
if ( !$masterConn ) {
throw new DBReplicationWaitError(
null,
}
if ( $pos instanceof DBMasterPos ) {
+ $start = microtime( true );
$result = $conn->masterPosWait( $pos, $timeout );
+ $seconds = max( microtime( true ) - $start, 0 );
if ( $result == -1 || is_null( $result ) ) {
- $msg = __METHOD__ . ': timed out waiting on {host} pos {pos}';
+ $msg = __METHOD__ . ': timed out waiting on {host} pos {pos} [{seconds}s]';
$this->replLogger->warning( $msg, [
'host' => $conn->getServer(),
'pos' => $pos,
+ 'seconds' => round( $seconds, 6 ),
'trace' => ( new RuntimeException() )->getTraceAsString()
] );
$ok = false;
return $ok;
}
+ /**
+ * Wait for a replica DB to reach a specified master position
+ *
+ * This will connect to the master to get an accurate position if $pos is not given
+ *
+ * @param IDatabase $conn Replica DB
+ * @param DBMasterPos|bool $pos Master position; default: current position
+ * @param int $timeout Timeout in seconds [optional]
+ * @return bool Success
+ * @since 1.28
+ * @deprecated Since 1.34 Use waitForMasterPos() instead
+ */
+ public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
+ return $this->waitForMasterPos( $conn, $pos, $timeout );
+ }
+
public function setTransactionListener( $name, callable $callback = null ) {
if ( $callback ) {
$this->trxRecurringCallbacks[$name] = $callback;
}
}
+ /**
+ * @param int $i Server index
+ * @param string|null $field Server index field [optional]
+ * @return array|mixed
+ * @throws InvalidArgumentException
+ */
+ private function getServerInfoStrict( $i, $field = null ) {
+ if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
+ throw new InvalidArgumentException( "No server with index '$i'" );
+ }
+
+ if ( $field !== null ) {
+ if ( !array_key_exists( $field, $this->servers[$i] ) ) {
+ throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
+ }
+
+ return $this->servers[$i][$field];
+ }
+
+ return $this->servers[$i];
+ }
+
function __destruct() {
// Avoid connection leaks for sanity
$this->disable();