* @file
* @ingroup Database
*/
+namespace Wikimedia\Rdbms;
+
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Wikimedia\ScopedCallback;
-use Wikimedia\Rdbms\TransactionProfiler;
-use Wikimedia\Rdbms\ILoadMonitor;
-use Wikimedia\Rdbms\DatabaseDomain;
-use Wikimedia\Rdbms\ILoadBalancer;
-use Wikimedia\Rdbms\DBMasterPos;
+use Database;
+use BagOStuff;
+use EmptyBagOStuff;
+use WANObjectCache;
+use ArrayUtils;
+use DBError;
+use DBAccessError;
+use DBExpectedError;
+use DBUnexpectedError;
+use DBTransactionError;
+use DBTransactionSizeError;
+use DBConnectionError;
+use InvalidArgumentException;
+use RuntimeException;
+use Exception;
/**
* Database connection, tracking, load balancing, and transaction manager for a cluster
class LoadBalancer implements ILoadBalancer {
/** @var array[] Map of (server index => server config array) */
private $mServers;
- /** @var IDatabase[][][] Map of local/foreignUsed/foreignFree => server index => IDatabase array */
+ /** @var \Database[][][] Map of local/foreignUsed/foreignFree => server index => IDatabase array */
private $mConns;
/** @var float[] Map of (server index => weight) */
private $mLoads;
/** @var ILoadMonitor */
private $loadMonitor;
+ /** @var ChronologyProtector|null */
+ private $chronProt;
/** @var BagOStuff */
private $srvCache;
/** @var BagOStuff */
/** @var LoggerInterface */
protected $perfLogger;
- /** @var bool|IDatabase Database connection that caused a problem */
- private $mErrorConnection;
+ /** @var \Database Database connection that caused a problem */
+ private $errorConnection;
/** @var integer The generic (not query grouped) replica DB index (of $mServers) */
private $mReadIndex;
/** @var bool|DBMasterPos False if not set */
/** @var boolean */
private $disabled = false;
+ /** @var boolean */
+ private $chronProtInitialized = false;
/** @var integer Warn when this many connection are held */
const CONN_HELD_WARN_THRESHOLD = 10;
];
$this->mLoads = [];
$this->mWaitForPos = false;
- $this->mErrorConnection = false;
$this->mAllowLagged = false;
if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
: ( gethostname() ?: 'unknown' );
$this->cliMode = isset( $params['cliMode'] ) ? $params['cliMode'] : PHP_SAPI === 'cli';
$this->agent = isset( $params['agent'] ) ? $params['agent'] : '';
+
+ if ( isset( $params['chronologyProtector'] ) ) {
+ $this->chronProt = $params['chronologyProtector'];
+ }
}
/**
private function getLoadMonitor() {
if ( !isset( $this->loadMonitor ) ) {
$compat = [
- 'LoadMonitor' => Wikimedia\Rdbms\LoadMonitor::class,
- 'LoadMonitorNull' => Wikimedia\Rdbms\LoadMonitorNull::class,
- 'LoadMonitorMySQL' => Wikimedia\Rdbms\LoadMonitorMySQL::class,
+ 'LoadMonitor' => LoadMonitor::class,
+ 'LoadMonitorNull' => LoadMonitorNull::class,
+ 'LoadMonitorMySQL' => LoadMonitorMySQL::class,
];
$class = $this->loadMonitorConfig['class'];
return $i;
}
- /**
- * @param DBMasterPos|false $pos
- */
public function waitFor( $pos ) {
+ $oldPos = $this->mWaitForPos;
$this->mWaitForPos = $pos;
- $i = $this->mReadIndex;
+ // If a generic reader connection was already established, then wait now
+ $i = $this->mReadIndex;
if ( $i > 0 ) {
if ( !$this->doWait( $i ) ) {
$this->laggedReplicaMode = true;
}
}
+
+ // Restore the older position if it was higher
+ $this->setWaitForPositionIfHigher( $oldPos );
}
public function waitForOne( $pos, $timeout = null ) {
+ $oldPos = $this->mWaitForPos;
$this->mWaitForPos = $pos;
$i = $this->mReadIndex;
$ok = true; // no applicable loads
}
+ // Restore the older position if it was higher
+ $this->setWaitForPositionIfHigher( $oldPos );
+
return $ok;
}
public function waitForAll( $pos, $timeout = null ) {
+ $oldPos = $this->mWaitForPos;
$this->mWaitForPos = $pos;
$serverCount = count( $this->mServers );
}
}
+ // Restore the older position if it was higher
+ $this->setWaitForPositionIfHigher( $oldPos );
+
return $ok;
}
+ /**
+ * @param DBMasterPos|bool $pos
+ */
+ private function setWaitForPositionIfHigher( $pos ) {
+ if ( !$pos ) {
+ return;
+ }
+
+ if ( !$this->mWaitForPos || $pos->hasReached( $this->mWaitForPos ) ) {
+ $this->mWaitForPos = $pos;
+ }
+ }
+
/**
* @param int $i
* @return IDatabase|bool
// Check if we already know that the DB has reached this point
$server = $this->getServerName( $index );
- $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server );
+ $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
/** @var DBMasterPos $knownReachedPos */
$knownReachedPos = $this->srvCache->get( $key );
if (
$domain = false; // local connection requested
}
+ if ( !$this->chronProtInitialized && $this->chronProt ) {
+ $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
+ // Load CP positions before connecting so that doWait() triggers later if needed
+ $this->chronProtInitialized = true;
+ $this->chronProt->initLB( $this );
+ }
+
if ( $domain !== false ) {
$conn = $this->openForeignConnection( $i, $domain );
} elseif ( isset( $this->mConns['local'][$i][0] ) ) {
$this->mConns['local'][$i][0] = $conn;
} else {
$this->connLogger->warning( "Failed to connect to database $i at '$serverName'." );
- $this->mErrorConnection = $conn;
+ $this->errorConnection = $conn;
$conn = false;
}
}
- if ( $conn && !$conn->isOpen() ) {
+ if ( $conn instanceof IDatabase && !$conn->isOpen() ) {
// Connection was made but later unrecoverably lost for some reason.
// Do not return a handle that will just throw exceptions on use,
// but let the calling code (e.g. getReaderIndex) try another server.
// See DatabaseMyslBase::ping() for how this can happen.
- $this->mErrorConnection = $conn;
+ $this->errorConnection = $conn;
$conn = false;
}
* it has been freed first with reuseConnection().
*
* On error, returns false, and the connection which caused the
- * error will be available via $this->mErrorConnection.
+ * error will be available via $this->errorConnection.
*
* @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
*
if ( strlen( $dbName ) && !$conn->selectDB( $dbName ) ) {
$this->mLastError = "Error selecting database '$dbName' on server " .
$conn->getServer() . " from client host {$this->host}";
- $this->mErrorConnection = $conn;
+ $this->errorConnection = $conn;
$conn = false;
} else {
$conn->tablePrefix( $prefix );
$conn = $this->reallyOpenConnection( $server, $dbName );
if ( !$conn->isOpen() ) {
$this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
- $this->mErrorConnection = $conn;
+ $this->errorConnection = $conn;
$conn = false;
} else {
$conn->tablePrefix( $prefix );
}
// Increment reference count
- if ( $conn ) {
+ if ( $conn instanceof IDatabase ) {
$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
}
* @throws DBConnectionError
*/
private function reportConnectionError() {
- $conn = $this->mErrorConnection; // the connection which caused the error
+ $conn = $this->errorConnection; // the connection which caused the error
$context = [
'method' => __METHOD__,
'last_error' => $this->mLastError,
];
- if ( !is_object( $conn ) ) {
- // No last connection, probably due to all servers being too busy
- $this->connLogger->error(
- "LB failure with no last connection. Connection error: {last_error}",
- $context
- );
-
- // If all servers were busy, mLastError will contain something sensible
- throw new DBConnectionError( null, $this->mLastError );
- } else {
+ if ( $conn instanceof IDatabase ) {
$context['db_server'] = $conn->getServer();
$this->connLogger->warning(
"Connection error: {last_error} ({db_server})",
// throws DBConnectionError
$conn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );
+ } else {
+ // No last connection, probably due to all servers being too busy
+ $this->connLogger->error(
+ "LB failure with no last connection. Connection error: {last_error}",
+ $context
+ );
+
+ // If all servers were busy, mLastError will contain something sensible
+ throw new DBConnectionError( null, $this->mLastError );
}
}
/**
* @param string $domain Domain ID, or false for the current domain
- * @param IDatabase|null DB master connectionl used to avoid loops [optional]
+ * @param IDatabase|null $conn DB master connectionl used to avoid loops [optional]
* @return bool
*/
private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
$this->disable();
}
}
+
+class_alias( LoadBalancer::class, 'LoadBalancer' );