private $mLoadMonitorClass;
/** @var LoadMonitor */
private $mLoadMonitor;
+ /** @var BagOStuff */
+ private $srvCache;
/** @var bool|DatabaseBase Database connection that caused a problem */
private $mErrorConnection;
/** @var integer Warn when this many connection are held */
const CONN_HELD_WARN_THRESHOLD = 10;
/** @var integer Default 'max lag' when unspecified */
- const MAX_LAG = 30;
+ const MAX_LAG = 10;
+ /** @var integer Max time to wait for a slave to catch up (e.g. ChronologyProtector) */
+ const POS_WAIT_TIMEOUT = 10;
/**
* @param array $params Array with keys:
throw new MWException( __CLASS__ . ': missing servers parameter' );
}
$this->mServers = $params['servers'];
- $this->mWaitTimeout = 10;
+ $this->mWaitTimeout = self::POS_WAIT_TIMEOUT;
$this->mReadIndex = -1;
$this->mWriteIndex = -1;
}
}
}
+
+ $this->srvCache = ObjectCache::getLocalServerInstance();
}
/**
return false;
}
- # wfDebugLog( 'connect', var_export( $loads, true ) );
-
# Return a random representative of the remainder
return ArrayUtils::pickRandom( $loads );
}
$nonErrorLoads = $this->mGroupLoads[$group];
} else {
# No loads for this group, return false and the caller can use some other group
- wfDebug( __METHOD__ . ": no loads for group $group\n" );
+ wfDebugLog( 'connect', __METHOD__ . ": no loads for group $group\n" );
return false;
}
}
}
$serverName = $this->getServerName( $i );
- wfDebug( __METHOD__ . ": using server $serverName for group '$group'\n" );
+ wfDebugLog( 'connect', __METHOD__ .
+ ": using server $serverName for group '$group'\n" );
}
return $i;
protected function doWait( $index, $open = false, $timeout = null ) {
$close = false; // close the connection afterwards
- # Find a connection to wait on, creating one if needed and allowed
+ // Check if we already know that the DB has reached this point
+ $server = $this->getServerName( $index );
+ $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server );
+ /** @var DBMasterPos $knownReachedPos */
+ $knownReachedPos = $this->srvCache->get( $key );
+ if ( $knownReachedPos && $knownReachedPos->hasReached( $this->mWaitForPos ) ) {
+ wfDebugLog( 'replication', __METHOD__ .
+ ": slave $server known to be caught up (pos >= $knownReachedPos).\n" );
+ return true;
+ }
+
+ // Find a connection to wait on, creating one if needed and allowed
$conn = $this->getAnyOpenConnection( $index );
if ( !$conn ) {
if ( !$open ) {
- wfDebug( __METHOD__ . ": no connection open\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": no connection open for $server\n" );
return false;
} else {
$conn = $this->openConnection( $index, '' );
if ( !$conn ) {
- wfDebug( __METHOD__ . ": failed to open connection\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": failed to connect to $server\n" );
return false;
}
}
}
- wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": Waiting for slave $server to catch up...\n" );
$timeout = $timeout ?: $this->mWaitTimeout;
$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
if ( $result == -1 || is_null( $result ) ) {
- # Timed out waiting for slave, use master instead
- $server = $server = $this->getServerName( $index );
+ // Timed out waiting for slave, use master instead
$msg = __METHOD__ . ": Timed out waiting on $server pos {$this->mWaitForPos}";
- wfDebug( "$msg\n" );
+ wfDebugLog( 'replication', "$msg\n" );
wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
$ok = false;
} else {
- wfDebug( __METHOD__ . ": Done\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": Done\n" );
$ok = true;
+ // Remember that the DB reached this point
+ $this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
}
if ( $close ) {
$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
if ( $serverIndex === null || $refCount === null ) {
wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" );
-
/**
* This can happen in code like:
* foreach ( $dbs as $db ) {
* When a connection to the local DB is opened in this way, reuseConnection()
* should be ignored
*/
-
return;
}
$conn = $this->reallyOpenConnection( $server, false );
$serverName = $this->getServerName( $i );
if ( $conn->isOpen() ) {
- wfDebug( "Connected to database $i at $serverName\n" );
+ wfDebugLog( 'connect', "Connected to database $i at $serverName\n" );
$this->mConns['local'][$i][0] = $conn;
} else {
- wfDebug( "Failed to connect to database $i at $serverName\n" );
+ wfDebugLog( 'connect', "Failed to connect to database $i at $serverName\n" );
$this->mErrorConnection = $conn;
$conn = false;
}
for ( $i = 1; $i < $serverCount; $i++ ) {
$conn = $this->getAnyOpenConnection( $i );
if ( $conn ) {
- wfDebug( "Master pos fetched from slave\n" );
-
return $conn->getSlavePos();
}
}
} else {
- wfDebug( "Master pos fetched from master\n" );
-
return $masterConn->getMasterPos();
}
/**
* Commit transactions on all open connections
+ * @param string $fname Caller name
*/
- public function commitAll() {
+ public function commitAll( $fname = __METHOD__ ) {
foreach ( $this->mConns as $conns2 ) {
foreach ( $conns2 as $conns3 ) {
/** @var DatabaseBase[] $conns3 */
foreach ( $conns3 as $conn ) {
if ( $conn->trxLevel() ) {
- $conn->commit( __METHOD__, 'flush' );
+ $conn->commit( $fname, 'flush' );
}
}
}
}
/**
- * Issue COMMIT only on master, only if queries were done on connection
+ * Issue COMMIT only on master, only if queries were done on connection
+ * @param string $fname Caller name
*/
- public function commitMasterChanges() {
+ public function commitMasterChanges( $fname = __METHOD__ ) {
$masterIndex = $this->getWriterIndex();
foreach ( $this->mConns as $conns2 ) {
if ( empty( $conns2[$masterIndex] ) ) {
/** @var DatabaseBase $conn */
foreach ( $conns2[$masterIndex] as $conn ) {
if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
- $conn->commit( __METHOD__, 'flush' );
+ $conn->commit( $fname, 'flush' );
}
}
}
/**
* Issue ROLLBACK only on master, only if queries were done on connection
+ * @param string $fname Caller name
+ * @throws DBExpectedError
* @since 1.23
*/
- public function rollbackMasterChanges() {
+ public function rollbackMasterChanges( $fname = __METHOD__ ) {
$failedServers = array();
$masterIndex = $this->getWriterIndex();
foreach ( $conns2[$masterIndex] as $conn ) {
if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
try {
- $conn->rollback( __METHOD__, 'flush' );
+ $conn->rollback( $fname, 'flush' );
} catch ( DBError $e ) {
MWExceptionHandler::logException( $e );
$failedServers[] = $conn->getServer();
if ( !$this->laggedSlaveMode && $this->getServerCount() > 1 ) {
try {
// See if laggedSlaveMode gets set
- $this->getConnection( DB_SLAVE, false, $wiki );
+ $conn = $this->getConnection( DB_SLAVE, false, $wiki );
+ $this->reuseConnection( $conn );
} catch ( DBConnectionError $e ) {
// Avoid expensive re-connect attempts and failures
$this->slavesDownMode = true;