private $loadMonitorConfig;
/** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
private $tableAliases = [];
+ /** @var string[] Map of (index alias => index) */
+ private $indexAliases = [];
/** @var ILoadMonitor */
private $loadMonitor;
$host = $this->getServerName( $i );
if ( $lag === false && !is_infinite( $maxServerLag ) ) {
$this->replLogger->error(
- "Server {host} is not replicating?", [ 'host' => $host ] );
+ __METHOD__ .
+ ": server {host} is not replicating?", [ 'host' => $host ] );
unset( $loads[$i] );
} elseif ( $lag > $maxServerLag ) {
$this->replLogger->info(
- "Server {host} has {lag} seconds of lag (>= {maxlag})",
+ __METHOD__ .
+ ": server {host} has {lag} seconds of lag (>= {maxlag})",
[ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
);
unset( $loads[$i] );
}
if ( $i === false && count( $currentLoads ) != 0 ) {
// All replica DBs lagged. Switch to read-only mode
- $this->replLogger->error( "All replica DBs lagged. Switch to read-only mode" );
+ $this->replLogger->error(
+ __METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
$i = ArrayUtils::pickRandom( $currentLoads );
$laggedReplicaMode = true;
}
// If all servers were down, quit now
if ( !count( $currentLoads ) ) {
- $this->connLogger->error( "All servers down" );
+ $this->connLogger->error( __METHOD__ . ": all servers down" );
}
return [ $i, $laggedReplicaMode ];
if ( $this->loads[$i] > 0 ) {
$start = microtime( true );
$ok = $this->doWait( $i, true, $timeout ) && $ok;
- $timeout -= ( microtime( true ) - $start );
+ $timeout -= intval( microtime( true ) - $start );
if ( $timeout <= 0 ) {
break; // timeout reached
}
$this->replLogger->info(
__METHOD__ .
- ': Waiting for replica DB {dbserver} to catch up...',
+ ': waiting for replica DB {dbserver} to catch up...',
[ 'dbserver' => $server ]
);
);
$ok = false;
} else {
- $this->replLogger->info( __METHOD__ . ": Done" );
+ $this->replLogger->debug( __METHOD__ . ": done waiting" );
$ok = true;
// Remember that the DB reached this point
$this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
$domain = false; // local connection requested
}
+ if ( ( $flags & self::CONN_TRX_AUTO ) === self::CONN_TRX_AUTO ) {
+ // Assuming all servers are of the same type (or similar), which is overwhelmingly
+ // the case, use the master server information to get the attributes. The information
+ // for $i cannot be used since it might be DB_REPLICA, which might require connection
+ // attempts in order to be resolved into a real server index.
+ $attributes = $this->getServerAttributes( $this->getWriterIndex() );
+ if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
+ // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking
+ // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions
+ // to reduce lock contention. None of these apply for sqlite and using separate
+ // connections just causes self-deadlocks.
+ $flags &= ~self::CONN_TRX_AUTO;
+ $this->connLogger->info( __METHOD__ . ': ignoring CONN_TRX_AUTO to avoid deadlocks.' );
+ }
+ }
+
$groups = ( $groups === false || $groups === [] )
? [ false ] // check one "group": the generic pool
: (array)$groups;
return $conn;
}
- public function reuseConnection( $conn ) {
+ public function reuseConnection( IDatabase $conn ) {
$serverIndex = $conn->getLBInfo( 'serverIndex' );
$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
if ( $serverIndex === null || $refCount === null ) {
} elseif ( $conn instanceof DBConnRef ) {
// DBConnRef already handles calling reuseConnection() and only passes the live
// Database instance to this method. Any caller passing in a DBConnRef is broken.
- $this->connLogger->error( __METHOD__ . ": got DBConnRef instance.\n" .
+ $this->connLogger->error(
+ __METHOD__ . ": got DBConnRef instance.\n" .
( new RuntimeException() )->getTraceAsString() );
return;
$conn = $this->reallyOpenConnection( $server, $this->localDomain );
$host = $this->getServerName( $i );
if ( $conn->isOpen() ) {
- $this->connLogger->debug( "Connected to database $i at '$host'." );
+ $this->connLogger->debug(
+ __METHOD__ . ": connected to database $i at '$host'." );
$this->conns[$connKey][$i][0] = $conn;
} else {
- $this->connLogger->warning( "Failed to connect to database $i at '$host'." );
+ $this->connLogger->warning(
+ __METHOD__ . ": failed to connect to database $i at '$host'." );
$this->errorConnection = $conn;
$conn = false;
}
return $conn;
}
+ public function getServerAttributes( $i ) {
+ return Database::attributesFromType(
+ $this->getServerType( $i ),
+ isset( $this->servers[$i]['driver'] ) ? $this->servers[$i]['driver'] : null
+ );
+ }
+
/**
* Test if the specified index represents an open connection
*
$this->getLazyConnectionRef( self::DB_MASTER, [], $db->getDomainID() )
);
$db->setTableAliases( $this->tableAliases );
+ $db->setIndexAliases( $this->indexAliases );
if ( $server['serverIndex'] === $this->getWriterIndex() ) {
if ( $this->trxRoundId !== false ) {
if ( $conn instanceof IDatabase ) {
$context['db_server'] = $conn->getServer();
$this->connLogger->warning(
- "Connection error: {last_error} ({db_server})",
+ __METHOD__ . ": connection error: {last_error} ({db_server})",
$context
);
} else {
// No last connection, probably due to all servers being too busy
$this->connLogger->error(
- "LB failure with no last connection. Connection error: {last_error}",
+ __METHOD__ .
+ ": LB failure with no last connection. Connection error: {last_error}",
$context
);
public function closeAll() {
$this->forEachOpenConnection( function ( IDatabase $conn ) {
$host = $conn->getServer();
- $this->connLogger->debug( "Closing connection to database '$host'." );
+ $this->connLogger->debug(
+ __METHOD__ . ": closing connection to database '$host'." );
$conn->close();
} );
foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
if ( $conn === $trackedConn ) {
$host = $this->getServerName( $i );
- $this->connLogger->debug( "Closing connection to database $i at '$host'." );
+ $this->connLogger->debug(
+ __METHOD__ . ": closing connection to database $i at '$host'." );
unset( $this->conns[$type][$serverIndex][$i] );
--$this->connsOpened;
break 2;
$e = null; // first exception
$this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
$conn->setTrxEndCallbackSuppression( false );
- if ( $conn->writesOrCallbacksPending() ) {
- // This happens if onTransactionIdle() callbacks leave callbacks on *another* DB
- // (which finished its callbacks already). Warn and recover in this case. Let the
- // callbacks run in the final commitMasterChanges() in LBFactory::shutdown().
- $this->queryLogger->info( __METHOD__ . ": found writes/callbacks pending." );
+ // Callbacks run in AUTO-COMMIT mode, so make sure no transactions are pending...
+ if ( $conn->writesPending() ) {
+ // This happens if onTransactionIdle() callbacks write to *other* handles
+ // (which already finished their callbacks). Let any callbacks run in the final
+ // commitMasterChanges() in LBFactory::shutdown(), when the transaction is gone.
+ $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
return;
} elseif ( $conn->trxLevel() ) {
// This happens for single-DB setups where DB_REPLICA uses the master DB,
$this->trxRoundId = false;
$this->forEachOpenMasterConnection(
function ( IDatabase $conn ) use ( $fname, $restore ) {
- if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
- $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
- }
+ $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
if ( $restore ) {
$this->undoTransactionRoundFlags( $conn );
}
if ( $pos instanceof DBMasterPos ) {
$result = $conn->masterPosWait( $pos, $timeout );
if ( $result == -1 || is_null( $result ) ) {
- $msg = __METHOD__ . ': Timed out waiting on {host} pos {pos}';
+ $msg = __METHOD__ . ': timed out waiting on {host} pos {pos}';
$this->replLogger->warning( $msg, [
'host' => $conn->getServer(),
'pos' => $pos,
] );
$ok = false;
} else {
- $this->replLogger->info( __METHOD__ . ': Done' );
+ $this->replLogger->debug( __METHOD__ . ': done waiting' );
$ok = true;
}
} else {
$this->tableAliases = $aliases;
}
+ public function setIndexAliases( array $aliases ) {
+ $this->indexAliases = $aliases;
+ }
+
public function setDomainPrefix( $prefix ) {
// Find connections to explicit foreign domains still marked as in-use...
$domainsInUse = [];