/** @var callable Deprecation logger */
private $deprecationLogger;
- /** @var DatabaseDomain Local Domain ID and default for selectDB() calls */
+ /** @var DatabaseDomain Local DB domain ID and default for selectDB() calls */
private $localDomain;
/** @var Database[][][] Map of (connection category => server index => IDatabase[]) */
/** @var array[] Map of (server index => server config array) */
private $servers;
/** @var float[] Map of (server index => weight) */
- private $loads;
+ private $genericLoads;
/** @var array[] Map of (group => server index => weight) */
private $groupLoads;
/** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
private $waitTimeout;
/** @var array The LoadMonitor configuration */
private $loadMonitorConfig;
- /** @var string Alternate ID string for the domain instead of DatabaseDomain::getId() */
+ /** @var string Alternate local DB domain instead of DatabaseDomain::getId() */
private $localDomainIdAlias;
/** @var int */
private $maxLag;
/** @var Database DB connection object that caused a problem */
private $errorConnection;
- /** @var int The generic (not query grouped) replica DB index (of $mServers) */
- private $readIndex;
+ /** @var int The generic (not query grouped) replica DB index */
+ private $genericReadIndex = -1;
+ /** @var int[] The group replica DB indexes keyed by group */
+ private $readIndexByGroup = [];
/** @var bool|DBMasterPos False if not set */
private $waitForPos;
/** @var bool Whether the generic reader fell back to a lagged replica DB */
/** @var bool Whether any connection has been attempted yet */
private $connectionAttempted = false;
+ /** @var int|null An integer ID of the managing LBFactory instance or null */
+ private $ownerId;
/** @var string|bool String if a requested DBO_TRX transaction round is active */
private $trxRoundId = false;
/** @var string Stage of the current transaction round in the transaction round life-cycle */
public function __construct( array $params ) {
if ( !isset( $params['servers'] ) ) {
- throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
+ throw new InvalidArgumentException( __CLASS__ . ': missing "servers" parameter' );
}
$this->servers = $params['servers'];
foreach ( $this->servers as $i => $server ) {
$this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
- $this->readIndex = -1;
$this->conns = [
// Connection were transaction rounds may be applied
self::KEY_LOCAL => [],
self::KEY_FOREIGN_INUSE_NOROUND => [],
self::KEY_FOREIGN_FREE_NOROUND => []
];
- $this->loads = [];
+ $this->genericLoads = [];
$this->waitForPos = false;
$this->allowLagged = false;
$this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
foreach ( $params['servers'] as $i => $server ) {
- $this->loads[$i] = $server['load'];
+ $this->genericLoads[$i] = $server['load'];
if ( isset( $server['groupLoads'] ) ) {
foreach ( $server['groupLoads'] as $group => $ratio ) {
if ( !isset( $this->groupLoads[$group] ) ) {
}
$this->defaultGroup = $params['defaultGroup'] ?? null;
+ $this->ownerId = $params['ownerId'] ?? null;
}
public function getLocalDomainID() {
}
public function resolveDomainID( $domain ) {
- return ( $domain !== false ) ? (string)$domain : $this->getLocalDomainID();
+ if ( $domain === $this->localDomainIdAlias || $domain === false ) {
+ // Local connection requested via some backwards-compatibility domain alias
+ return $this->getLocalDomainID();
+ }
+
+ return (string)$domain;
}
/**
return ArrayUtils::pickRandom( $loads );
}
+ /**
+ * @param int $i
+ * @param array $groups
+ * @param string|bool $domain
+ * @return int
+ */
+ private function getConnectionIndex( $i, $groups, $domain ) {
+ // Check one "group" per default: the generic pool
+ $defaultGroups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ];
+
+ $groups = ( $groups === false || $groups === [] )
+ ? $defaultGroups
+ : (array)$groups;
+
+ if ( $i == self::DB_MASTER ) {
+ $i = $this->getWriterIndex();
+ } elseif ( $i == self::DB_REPLICA ) {
+ # Try to find an available server in any the query groups (in order)
+ foreach ( $groups as $group ) {
+ $groupIndex = $this->getReaderIndex( $group, $domain );
+ if ( $groupIndex !== false ) {
+ $i = $groupIndex;
+ break;
+ }
+ }
+ }
+
+ # Operation-based index
+ if ( $i == self::DB_REPLICA ) {
+ $this->lastError = 'Unknown error'; // reset error string
+ # Try the general server pool if $groups are unavailable.
+ $i = ( $groups === [ false ] )
+ ? false // don't bother with this if that is what was tried above
+ : $this->getReaderIndex( false, $domain );
+ # Couldn't find a working server in getReaderIndex()?
+ if ( $i === false ) {
+ $this->lastError = 'No working replica DB server: ' . $this->lastError;
+ // Throw an exception
+ $this->reportConnectionError();
+ return null; // not reached
+ }
+ }
+
+ return $i;
+ }
+
public function getReaderIndex( $group = false, $domain = false ) {
if ( count( $this->servers ) == 1 ) {
// Skip the load balancing if there's only one server
return $this->getWriterIndex();
- } elseif ( $group === false && $this->readIndex >= 0 ) {
- // Shortcut if the generic reader index was already cached
- return $this->readIndex;
+ }
+
+ $index = $this->getExistingReaderIndex( $group );
+ if ( $index >= 0 ) {
+ // A reader index was already selected and "waitForPos" was handled
+ return $index;
}
if ( $group !== false ) {
}
} else {
// Use the generic load group
- $loads = $this->loads;
+ $loads = $this->genericLoads;
}
// Scale the configured load ratios according to each server's load and state
// Pick a server to use, accounting for weights, load, lag, and "waitForPos"
list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
if ( $i === false ) {
- // Replica DB connection unsuccessful
+ // DB connection unsuccessful
return false;
}
- if ( $this->waitForPos && $i != $this->getWriterIndex() ) {
- // Before any data queries are run, wait for the server to catch up to the
- // specified position. This is used to improve session consistency. Note that
- // when LoadBalancer::waitFor() sets "waitForPos", the waiting triggers here,
- // so update laggedReplicaMode as needed for consistency.
- if ( !$this->doWait( $i ) ) {
- $laggedReplicaMode = true;
- }
+ // If data seen by queries is expected to reflect the transactions committed as of
+ // or after a given replication position then wait for the DB to apply those changes
+ if ( $this->waitForPos && $i != $this->getWriterIndex() && !$this->doWait( $i ) ) {
+ // Data will be outdated compared to what was expected
+ $laggedReplicaMode = true;
}
- if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group === false ) {
- // Cache the generic reader index for future ungrouped DB_REPLICA handles
- $this->readIndex = $i;
- // Record if the generic reader index is in "lagged replica DB" mode
- if ( $laggedReplicaMode ) {
- $this->laggedReplicaMode = true;
- }
+ // Cache the reader index for future DB_REPLICA handles
+ $this->setExistingReaderIndex( $group, $i );
+ // Record whether the generic reader index is in "lagged replica DB" mode
+ if ( $group === false && $laggedReplicaMode ) {
+ $this->laggedReplicaMode = true;
}
$serverName = $this->getServerName( $i );
return $i;
}
+ /**
+ * Get the server index chosen by the load balancer for use with the given query group
+ *
+ * @param string|bool $group Query group; use false for the generic group
+ * @return int Server index or -1 if none was chosen
+ */
+ protected function getExistingReaderIndex( $group ) {
+ if ( $group === false ) {
+ $index = $this->genericReadIndex;
+ } else {
+ $index = $this->readIndexByGroup[$group] ?? -1;
+ }
+
+ return $index;
+ }
+
+ /**
+ * Set the server index chosen by the load balancer for use with the given query group
+ *
+ * @param string|bool $group Query group; use false for the generic group
+ * @param int $index The index of a specific server
+ */
+ private function setExistingReaderIndex( $group, $index ) {
+ if ( $index < 0 ) {
+ throw new UnexpectedValueException( "Cannot set a negative read server index" );
+ }
+
+ if ( $group === false ) {
+ $this->genericReadIndex = $index;
+ } else {
+ $this->readIndexByGroup[$group] = $index;
+ }
+ }
+
/**
* @param array $loads List of server weights
* @param string|bool $domain
*/
private function pickReaderIndex( array $loads, $domain = false ) {
if ( $loads === [] ) {
- throw new InvalidArgumentException( "Empty server array given to LoadBalancer" );
+ throw new InvalidArgumentException( "Server configuration array is empty" );
}
/** @var int|bool $i Index of selected server */
try {
$this->waitForPos = $pos;
// If a generic reader connection was already established, then wait now
- $i = $this->readIndex;
- if ( $i > 0 ) {
- if ( !$this->doWait( $i ) ) {
- $this->laggedReplicaMode = true;
- }
+ if ( $this->genericReadIndex > 0 && !$this->doWait( $this->genericReadIndex ) ) {
+ $this->laggedReplicaMode = true;
}
+ // Otherwise, wait until a connection is established in getReaderIndex()
} finally {
// Restore the older position if it was higher since this is used for lag-protection
$this->setWaitForPositionIfHigher( $oldPos );
try {
$this->waitForPos = $pos;
- $i = $this->readIndex;
+ $i = $this->genericReadIndex;
if ( $i <= 0 ) {
// Pick a generic replica DB if there isn't one yet
- $readLoads = $this->loads;
+ $readLoads = $this->genericLoads;
unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
$readLoads = array_filter( $readLoads ); // with non-zero load
$i = ArrayUtils::pickRandom( $readLoads );
$ok = true;
for ( $i = 1; $i < $serverCount; $i++ ) {
- if ( $this->loads[$i] > 0 ) {
+ if ( $this->genericLoads[$i] > 0 ) {
$start = microtime( true );
$ok = $this->doWait( $i, true, $timeout ) && $ok;
$timeout -= intval( microtime( true ) - $start );
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
if ( $i === null || $i === false ) {
- throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ .
- ' with invalid server index' );
+ throw new InvalidArgumentException( "Cannot connect without a server index" );
}
- if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
- $domain = false; // local connection requested
- }
+ $domain = $this->resolveDomainID( $domain );
+ $masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
// Assuming all servers are of the same type (or similar), which is overwhelmingly
}
}
- // Check one "group" per default: the generic pool
- $defaultGroups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ];
-
- $groups = ( $groups === false || $groups === [] )
- ? $defaultGroups
- : (array)$groups;
-
- $masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
- $oldConnsOpened = $this->connsOpened; // connections open now
-
- if ( $i == self::DB_MASTER ) {
- $i = $this->getWriterIndex();
- } elseif ( $i == self::DB_REPLICA ) {
- # Try to find an available server in any the query groups (in order)
- foreach ( $groups as $group ) {
- $groupIndex = $this->getReaderIndex( $group, $domain );
- if ( $groupIndex !== false ) {
- $i = $groupIndex;
- break;
- }
- }
- }
-
- # Operation-based index
- if ( $i == self::DB_REPLICA ) {
- $this->lastError = 'Unknown error'; // reset error string
- # Try the general server pool if $groups are unavailable.
- $i = ( $groups === [ false ] )
- ? false // don't bother with this if that is what was tried above
- : $this->getReaderIndex( false, $domain );
- # Couldn't find a working server in getReaderIndex()?
- if ( $i === false ) {
- $this->lastError = 'No working replica DB server: ' . $this->lastError;
- // Throw an exception
- $this->reportConnectionError();
- return null; // not reached
- }
- }
-
- # Now we have an explicit index into the servers array
- $conn = $this->openConnection( $i, $domain, $flags );
+ // Number of connections made before getting the server index and handle
+ $priorConnectionsMade = $this->connsOpened;
+ // Decide which server to use (might trigger new connections)
+ $serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
+ // Get an open connection to that server (might trigger a new connection)
+ $conn = $this->openConnection( $serverIndex, $domain, $flags );
if ( !$conn ) {
- // Throw an exception
$this->reportConnectionError();
- return null; // not reached
+ return null; // unreachable due to exception
}
- # Profile any new connections that happen
- if ( $this->connsOpened > $oldConnsOpened ) {
+ // Profile any new connections caused by this method
+ if ( $this->connsOpened > $priorConnectionsMade ) {
$host = $conn->getServer();
$dbname = $conn->getDBname();
$this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
}
- if ( $masterOnly ) {
- # Make master-requested DB handles inherit any read-only mode setting
+ if ( $serverIndex == $this->getWriterIndex() ) {
+ // If the load balancer is read-only, perhaps due to replication lag, then master
+ // DB handles will reflect that. Note that Database::assertIsWritableMaster() takes
+ // care of replica DB handles whereas getReadOnlyReason() would cause infinite loops.
$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
}
$domain = $conn->getDomainID();
if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
- throw new InvalidArgumentException( __METHOD__ .
- ": connection $serverIndex/$domain not found; it may have already been freed." );
+ throw new InvalidArgumentException(
+ "Connection $serverIndex/$domain not found; it may have already been freed" );
} elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
- throw new InvalidArgumentException( __METHOD__ .
- ": connection $serverIndex/$domain mismatched; it may have already been freed." );
+ throw new InvalidArgumentException(
+ "Connection $serverIndex/$domain mismatched; it may have already been freed" );
}
$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
}
}
- public function getConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
+ public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
$domain = $this->resolveDomainID( $domain );
+ $role = $this->getRoleFromIndex( $i );
- return new DBConnRef( $this, $this->getConnection( $db, $groups, $domain, $flags ) );
+ return new DBConnRef( $this, $this->getConnection( $i, $groups, $domain, $flags ), $role );
}
- public function getLazyConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
+ public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
$domain = $this->resolveDomainID( $domain );
+ $role = $this->getRoleFromIndex( $i );
- return new DBConnRef( $this, [ $db, $groups, $domain, $flags ] );
+ return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role );
}
- public function getMaintenanceConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
+ public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
$domain = $this->resolveDomainID( $domain );
+ $role = $this->getRoleFromIndex( $i );
return new MaintainableDBConnRef(
- $this, $this->getConnection( $db, $groups, $domain, $flags ) );
+ $this, $this->getConnection( $i, $groups, $domain, $flags ), $role );
+ }
+
+ /**
+ * @param int $i Server index or DB_MASTER/DB_REPLICA
+ * @return int One of DB_MASTER/DB_REPLICA
+ */
+ private function getRoleFromIndex( $i ) {
+ return ( $i === self::DB_MASTER || $i === $this->getWriterIndex() )
+ ? self::DB_MASTER
+ : self::DB_REPLICA;
}
public function openConnection( $i, $domain = false, $flags = 0 ) {
- if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
- $domain = false; // local connection requested
- }
+ $domain = $this->resolveDomainID( $domain );
if ( !$this->connectionAttempted && $this->chronologyCallback ) {
- $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
// Load any "waitFor" positions before connecting so that doWait() is triggered
+ $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
$this->connectionAttempted = true;
( $this->chronologyCallback )( $this );
}
- // Check if an auto-commit connection is being requested. If so, it will not reuse the
- // main set of DB connections but rather its own pool since:
- // a) those are usually set to implicitly use transaction rounds via DBO_TRX
- // b) those must support the use of explicit transaction rounds via beginMasterChanges()
- $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
-
- if ( $domain !== false ) {
- // Connection is to a foreign domain
- $conn = $this->openForeignConnection( $i, $domain, $flags );
- } else {
- // Connection is to the local domain
- $conn = $this->openLocalConnection( $i, $flags );
- }
+ $conn = $this->localDomain->equals( $domain )
+ ? $this->openLocalConnection( $i, $flags )
+ : $this->openForeignConnection( $i, $domain, $flags );
if ( $conn instanceof IDatabase && !$conn->isOpen() ) {
// Connection was made but later unrecoverably lost for some reason.
// Do not return a handle that will just throw exceptions on use,
// but let the calling code (e.g. getReaderIndex) try another server.
- // See DatabaseMyslBase::ping() for how this can happen.
$this->errorConnection = $conn;
$conn = false;
}
- if ( $autoCommit && $conn instanceof IDatabase ) {
+ if (
+ $conn instanceof IDatabase &&
+ ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT )
+ ) {
if ( $conn->trxLevel() ) { // sanity
throw new DBUnexpectedError(
$conn,
- __METHOD__ . ': CONN_TRX_AUTOCOMMIT handle has a transaction.'
+ 'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
);
}
* @return Database
*/
private function openLocalConnection( $i, $flags = 0 ) {
+ // Connection handles required to be in auto-commit mode use a separate connection
+ // pool since the main pool is effected by implicit and explicit transaction rounds
$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
$connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
$conn = $this->conns[$connKey][$i][0];
} else {
if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
- throw new InvalidArgumentException( "No server with index '$i'." );
+ throw new InvalidArgumentException( "No server with index '$i'" );
}
// Open a new connection
$server = $this->servers[$i];
) {
throw new UnexpectedValueException(
"Got connection to '{$conn->getDomainID()}', " .
- "but expected local domain ('{$this->localDomain}')." );
+ "but expected local domain ('{$this->localDomain}')" );
}
return $conn;
*/
private function openForeignConnection( $i, $domain, $flags = 0 ) {
$domainInstance = DatabaseDomain::newFromId( $domain );
+ // Connection handles required to be in auto-commit mode use a separate connection
+ // pool since the main pool is effected by implicit and explicit transaction rounds
$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
if ( $autoCommit ) {
}
unset( $this->conns[$connFreeKey][$i][$oldDomain] );
// Note that if $domain is an empty string, getDomainID() might not match it
- $this->conns[$connInUseKey][$i][$conn->getDomainId()] = $conn;
+ $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
$this->connLogger->debug( __METHOD__ .
": reusing free connection from $oldDomain for $domain" );
break;
if ( !$conn ) {
if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
- throw new InvalidArgumentException( "No server with index '$i'." );
+ throw new InvalidArgumentException( "No server with index '$i'" );
}
// Open a new connection
$server = $this->servers[$i];
// Final sanity check to make sure the right domain is selected
if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
throw new UnexpectedValueException(
- "Got connection to '{$conn->getDomainID()}', but expected '$domain'." );
+ "Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
}
// Increment reference count
$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
}
public function isNonZeroLoad( $i ) {
- return array_key_exists( $i, $this->servers ) && $this->loads[$i] != 0;
+ return array_key_exists( $i, $this->servers ) && $this->genericLoads[$i] != 0;
}
public function getServerCount() {
public function closeConnection( IDatabase $conn ) {
if ( $conn instanceof DBConnRef ) {
// Avoid calling close() but still leaving the handle in the pool
- throw new RuntimeException( __METHOD__ . ': got DBConnRef instance.' );
+ throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
}
$serverIndex = $conn->getLBInfo( 'serverIndex' );
$conn->close();
}
- public function commitAll( $fname = __METHOD__ ) {
- $this->commitMasterChanges( $fname );
+ public function commitAll( $fname = __METHOD__, $owner = null ) {
+ $this->commitMasterChanges( $fname, $owner );
$this->flushMasterSnapshots( $fname );
$this->flushReplicaSnapshots( $fname );
}
- public function finalizeMasterChanges() {
+ public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
$this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
return $total;
}
- public function approveMasterChanges( array $options ) {
+ public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
$this->assertTransactionRoundStage( self::ROUND_FINALIZED );
$limit = $options['maxWriteDuration'] ?? 0;
if ( $limit > 0 && $time > $limit ) {
throw new DBTransactionSizeError(
$conn,
- "Transaction spent $time second(s) in writes, exceeding the limit of $limit.",
+ "Transaction spent $time second(s) in writes, exceeding the limit of $limit",
[ $time, $limit ]
);
}
if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
throw new DBTransactionError(
$conn,
- "A connection to the {$conn->getDBname()} database was lost before commit."
+ "A connection to the {$conn->getDBname()} database was lost before commit"
);
}
} );
$this->trxRoundStage = self::ROUND_APPROVED;
}
- public function beginMasterChanges( $fname = __METHOD__ ) {
+ public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
if ( $this->trxRoundId !== false ) {
throw new DBTransactionError(
null,
- "$fname: Transaction round '{$this->trxRoundId}' already started."
+ "$fname: Transaction round '{$this->trxRoundId}' already started"
);
}
$this->assertTransactionRoundStage( self::ROUND_CURSORY );
$this->trxRoundStage = self::ROUND_CURSORY;
}
- public function commitMasterChanges( $fname = __METHOD__ ) {
+ public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
$this->assertTransactionRoundStage( self::ROUND_APPROVED );
$failures = [];
$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
- public function runMasterTransactionIdleCallbacks() {
+ public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
$type = IDatabase::TRIGGER_COMMIT;
} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
return $e;
}
- public function runMasterTransactionListenerCallbacks() {
+ public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
$type = IDatabase::TRIGGER_COMMIT;
} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
return $e;
}
- public function rollbackMasterChanges( $fname = __METHOD__ ) {
+ public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
+
$restore = ( $this->trxRoundId !== false );
$this->trxRoundId = false;
$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
/**
* @param string|string[] $stage
+ * @throws DBTransactionError
*/
private function assertTransactionRoundStage( $stage ) {
$stages = (array)$stage;
}
}
+ /**
+ * @param string $fname
+ * @param int|null $owner Owner ID of the caller
+ * @throws DBTransactionError
+ */
+ private function assertOwnership( $fname, $owner ) {
+ if ( $this->ownerId !== null && $owner !== $this->ownerId ) {
+ throw new DBTransactionError(
+ null,
+ "$fname: LoadBalancer is owned by LBFactory #{$this->ownerId} (got '$owner')."
+ );
+ }
+ }
+
/**
* Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round
*
}
public function getLaggedReplicaMode( $domain = false ) {
- // No-op if there is only one DB (also avoids recursion)
- if ( !$this->laggedReplicaMode && $this->getServerCount() > 1 ) {
+ if (
+ // Avoid recursion if there is only one DB
+ $this->getServerCount() > 1 &&
+ // Avoid recursion if the (non-zero load) master DB was picked for generic reads
+ $this->genericReadIndex !== $this->getWriterIndex() &&
+ // Stay in lagged replica mode during the load balancer instance lifetime
+ !$this->laggedReplicaMode
+ ) {
try {
- // See if laggedReplicaMode gets set
- $conn = $this->getConnection( self::DB_REPLICA, false, $domain );
- $this->reuseConnection( $conn );
+ // Calling this method will set "laggedReplicaMode" as needed
+ $this->getReaderIndex( false, $domain );
} catch ( DBConnectionError $e ) {
// Avoid expensive re-connect attempts and failures
$this->allReplicasDownMode = true;
$lagTimes = $this->getLagTimes( $domain );
foreach ( $lagTimes as $i => $lag ) {
- if ( $this->loads[$i] > 0 && $lag > $maxLag ) {
+ if ( $this->genericLoads[$i] > 0 && $lag > $maxLag ) {
$maxLag = $lag;
$host = $this->servers[$i]['host'];
$maxIndex = $i;
if ( $domainsInUse ) {
$domains = implode( ', ', $domainsInUse );
throw new DBUnexpectedError( null,
- "Foreign domain connections are still in use ($domains)." );
+ "Foreign domain connections are still in use ($domains)" );
}
$this->setLocalDomain( new DatabaseDomain(