X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Flibs%2Frdbms%2FChronologyProtector.php;h=e11528659d4a9334d3fed6466b4b9b7514e05e38;hb=455f10a8aa01fa4030e5663a5e4dc01ebda95a4e;hp=8121654e3309df8d49fab5c240e132055dbc8005;hpb=5fa4cdf860c79b32ab6ef034c6d9420c2727f695;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/libs/rdbms/ChronologyProtector.php b/includes/libs/rdbms/ChronologyProtector.php index 8121654e33..e11528659d 100644 --- a/includes/libs/rdbms/ChronologyProtector.php +++ b/includes/libs/rdbms/ChronologyProtector.php @@ -43,10 +43,10 @@ class ChronologyProtector implements LoggerAwareInterface { protected $key; /** @var string Hash of client parameters */ protected $clientId; - /** @var float|null Minimum UNIX timestamp of 1+ expected startup positions */ - protected $waitForPosTime; + /** @var int|null Expected minimum index of the last write to the position store */ + protected $waitForPosIndex; /** @var int Max seconds to wait on positions to appear */ - protected $waitForPosTimeout = self::POS_WAIT_TIMEOUT; + protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT; /** @var bool Whether to no-op all method calls */ protected $enabled = true; /** @var bool Whether to check and wait on positions */ @@ -64,19 +64,19 @@ class ChronologyProtector implements LoggerAwareInterface { /** @var int Seconds to store positions */ const POSITION_TTL = 60; /** @var int Max time to wait for positions to appear */ - const POS_WAIT_TIMEOUT = 5; + const POS_STORE_WAIT_TIMEOUT = 5; /** * @param BagOStuff $store - * @param array $client Map of (ip: , agent: ) - * @param float $posTime UNIX timestamp + * @param array[] $client Map of (ip: , agent: ) + * @param int|null $posIndex Write counter index [optional] * @since 1.27 */ - public function __construct( BagOStuff $store, array $client, $posTime = null ) { + public function __construct( BagOStuff $store, array $client, $posIndex = null ) { $this->store = $store; $this->clientId = md5( $client['ip'] . "\n" . $client['agent'] ); - $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v1' ); - $this->waitForPosTime = $posTime; + $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' ); + $this->waitForPosIndex = $posIndex; $this->logger = new NullLogger(); } @@ -124,7 +124,7 @@ class ChronologyProtector implements LoggerAwareInterface { $this->startupPositions[$masterName] instanceof DBMasterPos ) { $pos = $this->startupPositions[$masterName]; - $this->logger->info( __METHOD__ . ": LB for '$masterName' set to pos $pos\n" ); + $this->logger->debug( __METHOD__ . ": LB for '$masterName' set to pos $pos\n" ); $lb->waitFor( $pos ); } } @@ -147,10 +147,12 @@ class ChronologyProtector implements LoggerAwareInterface { $masterName = $lb->getServerName( $lb->getWriterIndex() ); if ( $lb->getServerCount() > 1 ) { $pos = $lb->getMasterPos(); - $this->logger->info( __METHOD__ . ": LB for '$masterName' has pos $pos\n" ); - $this->shutdownPositions[$masterName] = $pos; + if ( $pos ) { + $this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" ); + $this->shutdownPositions[$masterName] = $pos; + } } else { - $this->logger->info( __METHOD__ . ": DB '$masterName' touched\n" ); + $this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" ); } $this->shutdownTouchDBs[$masterName] = 1; } @@ -161,9 +163,10 @@ class ChronologyProtector implements LoggerAwareInterface { * * @param callable|null $workCallback Work to do instead of waiting on syncing positions * @param string $mode One of (sync, async); whether to wait on remote datacenters + * @param int|null &$cpIndex DB position key write counter; incremented on update * @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure */ - public function shutdown( callable $workCallback = null, $mode = 'sync' ) { + public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) { if ( !$this->enabled ) { return []; } @@ -183,7 +186,7 @@ class ChronologyProtector implements LoggerAwareInterface { return []; // nothing to save } - $this->logger->info( __METHOD__ . ": saving master pos for " . + $this->logger->debug( __METHOD__ . ": saving master pos for " . implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n" ); @@ -198,13 +201,18 @@ class ChronologyProtector implements LoggerAwareInterface { } $ok = $store->set( $this->key, - self::mergePositions( $store->get( $this->key ), $this->shutdownPositions ), + $this->mergePositions( + $store->get( $this->key ), + $this->shutdownPositions, + $cpIndex + ), self::POSITION_TTL, ( $mode === 'sync' ) ? $store::WRITE_SYNC : 0 ); $store->unlock( $this->key ); } else { $ok = false; + $cpIndex = null; // nothing saved } if ( !$ok ) { @@ -254,28 +262,36 @@ class ChronologyProtector implements LoggerAwareInterface { $this->initialized = true; if ( $this->wait ) { - // If there is an expectation to see master positions with a certain min - // timestamp, then block until they appear, or until a timeout is reached. - if ( $this->waitForPosTime > 0.0 ) { + // If there is an expectation to see master positions from a certain write + // index or higher, then block until it appears, or until a timeout is reached. + // Since the write index restarts each time the key is created, it is possible that + // a lagged store has a matching key write index. However, in that case, it should + // already be expired and thus treated as non-existing, maintaining correctness. + if ( $this->waitForPosIndex > 0 ) { $data = null; $loop = new WaitConditionLoop( function () use ( &$data ) { $data = $this->store->get( $this->key ); + if ( !is_array( $data ) ) { + return WaitConditionLoop::CONDITION_CONTINUE; // not found yet + } elseif ( !isset( $data['writeIndex'] ) ) { + return WaitConditionLoop::CONDITION_REACHED; // b/c + } - return ( self::minPosTime( $data ) >= $this->waitForPosTime ) + return ( $data['writeIndex'] >= $this->waitForPosIndex ) ? WaitConditionLoop::CONDITION_REACHED : WaitConditionLoop::CONDITION_CONTINUE; }, - $this->waitForPosTimeout + $this->waitForPosStoreTimeout ); $result = $loop->invoke(); $waitedMs = $loop->getLastWaitTime() * 1e3; if ( $result == $loop::CONDITION_REACHED ) { - $msg = "expected and found pos time {$this->waitForPosTime} ({$waitedMs}ms)"; + $msg = "expected and found pos index {$this->waitForPosIndex} ({$waitedMs}ms)"; $this->logger->debug( $msg ); } else { - $msg = "expected but missed pos time {$this->waitForPosTime} ({$waitedMs}ms)"; + $msg = "expected but missed pos index {$this->waitForPosIndex} ({$waitedMs}ms)"; $this->logger->info( $msg ); } } else { @@ -283,55 +299,38 @@ class ChronologyProtector implements LoggerAwareInterface { } $this->startupPositions = $data ? $data['positions'] : []; - $this->logger->info( __METHOD__ . ": key is {$this->key} (read)\n" ); + $this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" ); } else { $this->startupPositions = []; - $this->logger->info( __METHOD__ . ": key is {$this->key} (unread)\n" ); + $this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" ); } } - /** - * @param array|bool $data - * @return float|null - */ - private static function minPosTime( $data ) { - if ( !isset( $data['positions'] ) ) { - return null; - } - - $min = null; - foreach ( $data['positions'] as $pos ) { - if ( $pos instanceof DBMasterPos ) { - $min = $min ? min( $pos->asOfTime(), $min ) : $pos->asOfTime(); - } - } - - return $min; - } - /** * @param array|bool $curValue * @param DBMasterPos[] $shutdownPositions + * @param int|null &$cpIndex * @return array */ - private static function mergePositions( $curValue, array $shutdownPositions ) { + protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) { /** @var DBMasterPos[] $curPositions */ - if ( $curValue === false ) { - $curPositions = $shutdownPositions; - } else { - $curPositions = $curValue['positions']; - // Use the newest positions for each DB master - foreach ( $shutdownPositions as $db => $pos ) { - if ( - !isset( $curPositions[$db] ) || - !( $curPositions[$db] instanceof DBMasterPos ) || - $pos->asOfTime() > $curPositions[$db]->asOfTime() - ) { - $curPositions[$db] = $pos; - } + $curPositions = isset( $curValue['positions'] ) ? $curValue['positions'] : []; + // Use the newest positions for each DB master + foreach ( $shutdownPositions as $db => $pos ) { + if ( + !isset( $curPositions[$db] ) || + !( $curPositions[$db] instanceof DBMasterPos ) || + $pos->asOfTime() > $curPositions[$db]->asOfTime() + ) { + $curPositions[$db] = $pos; } } - return [ 'positions' => $curPositions ]; + $cpIndex = isset( $curValue['writeIndex'] ) ? $curValue['writeIndex'] : 0; + + return [ + 'positions' => $curPositions, + 'writeIndex' => ++$cpIndex + ]; } }