protected $key;
/** @var string Hash of client parameters */
protected $clientId;
- /** @var float|null Minimum UNIX timestamp of 1+ expected startup positions */
- protected $waitForPosTime;
+ /** @var int|null Expected minimum index of the last write to the position store */
+ protected $waitForPosIndex;
/** @var int Max seconds to wait on positions to appear */
- protected $waitForPosTimeout = self::POS_WAIT_TIMEOUT;
+ protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
/** @var bool Whether to no-op all method calls */
protected $enabled = true;
/** @var bool Whether to check and wait on positions */
/** @var int Seconds to store positions */
const POSITION_TTL = 60;
/** @var int Max time to wait for positions to appear */
- const POS_WAIT_TIMEOUT = 5;
+ const POS_STORE_WAIT_TIMEOUT = 5;
/**
* @param BagOStuff $store
- * @param array $client Map of (ip: <IP>, agent: <user-agent>)
- * @param float $posTime UNIX timestamp
+ * @param array[] $client Map of (ip: <IP>, agent: <user-agent>)
+ * @param int|null $posIndex Write counter index [optional]
* @since 1.27
*/
- public function __construct( BagOStuff $store, array $client, $posTime = null ) {
+ public function __construct( BagOStuff $store, array $client, $posIndex = null ) {
$this->store = $store;
$this->clientId = md5( $client['ip'] . "\n" . $client['agent'] );
- $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v1' );
- $this->waitForPosTime = $posTime;
+ $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
+ $this->waitForPosIndex = $posIndex;
$this->logger = new NullLogger();
}
$this->startupPositions[$masterName] instanceof DBMasterPos
) {
$pos = $this->startupPositions[$masterName];
- $this->logger->info( __METHOD__ . ": LB for '$masterName' set to pos $pos\n" );
+ $this->logger->debug( __METHOD__ . ": LB for '$masterName' set to pos $pos\n" );
$lb->waitFor( $pos );
}
}
$masterName = $lb->getServerName( $lb->getWriterIndex() );
if ( $lb->getServerCount() > 1 ) {
$pos = $lb->getMasterPos();
- $this->logger->info( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
- $this->shutdownPositions[$masterName] = $pos;
+ if ( $pos ) {
+ $this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
+ $this->shutdownPositions[$masterName] = $pos;
+ }
} else {
- $this->logger->info( __METHOD__ . ": DB '$masterName' touched\n" );
+ $this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" );
}
$this->shutdownTouchDBs[$masterName] = 1;
}
*
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
* @param string $mode One of (sync, async); whether to wait on remote datacenters
+ * @param int|null &$cpIndex DB position key write counter; incremented on update
* @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
*/
- public function shutdown( callable $workCallback = null, $mode = 'sync' ) {
+ public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
if ( !$this->enabled ) {
return [];
}
return []; // nothing to save
}
- $this->logger->info( __METHOD__ . ": saving master pos for " .
+ $this->logger->debug( __METHOD__ . ": saving master pos for " .
implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
);
}
$ok = $store->set(
$this->key,
- self::mergePositions( $store->get( $this->key ), $this->shutdownPositions ),
+ $this->mergePositions(
+ $store->get( $this->key ),
+ $this->shutdownPositions,
+ $cpIndex
+ ),
self::POSITION_TTL,
( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
);
$store->unlock( $this->key );
} else {
$ok = false;
+ $cpIndex = null; // nothing saved
}
if ( !$ok ) {
$this->initialized = true;
if ( $this->wait ) {
- // If there is an expectation to see master positions with a certain min
- // timestamp, then block until they appear, or until a timeout is reached.
- if ( $this->waitForPosTime > 0.0 ) {
+ // If there is an expectation to see master positions from a certain write
+ // index or higher, then block until it appears, or until a timeout is reached.
+ // Since the write index restarts each time the key is created, it is possible that
+ // a lagged store has a matching key write index. However, in that case, it should
+ // already be expired and thus treated as non-existing, maintaining correctness.
+ if ( $this->waitForPosIndex > 0 ) {
$data = null;
$loop = new WaitConditionLoop(
function () use ( &$data ) {
$data = $this->store->get( $this->key );
+ if ( !is_array( $data ) ) {
+ return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
+ } elseif ( !isset( $data['writeIndex'] ) ) {
+ return WaitConditionLoop::CONDITION_REACHED; // b/c
+ }
- return ( self::minPosTime( $data ) >= $this->waitForPosTime )
+ return ( $data['writeIndex'] >= $this->waitForPosIndex )
? WaitConditionLoop::CONDITION_REACHED
: WaitConditionLoop::CONDITION_CONTINUE;
},
- $this->waitForPosTimeout
+ $this->waitForPosStoreTimeout
);
$result = $loop->invoke();
$waitedMs = $loop->getLastWaitTime() * 1e3;
if ( $result == $loop::CONDITION_REACHED ) {
- $msg = "expected and found pos time {$this->waitForPosTime} ({$waitedMs}ms)";
+ $msg = "expected and found pos index {$this->waitForPosIndex} ({$waitedMs}ms)";
$this->logger->debug( $msg );
} else {
- $msg = "expected but missed pos time {$this->waitForPosTime} ({$waitedMs}ms)";
+ $msg = "expected but missed pos index {$this->waitForPosIndex} ({$waitedMs}ms)";
$this->logger->info( $msg );
}
} else {
}
$this->startupPositions = $data ? $data['positions'] : [];
- $this->logger->info( __METHOD__ . ": key is {$this->key} (read)\n" );
+ $this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" );
} else {
$this->startupPositions = [];
- $this->logger->info( __METHOD__ . ": key is {$this->key} (unread)\n" );
+ $this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );
}
}
- /**
- * @param array|bool $data
- * @return float|null
- */
- private static function minPosTime( $data ) {
- if ( !isset( $data['positions'] ) ) {
- return null;
- }
-
- $min = null;
- foreach ( $data['positions'] as $pos ) {
- if ( $pos instanceof DBMasterPos ) {
- $min = $min ? min( $pos->asOfTime(), $min ) : $pos->asOfTime();
- }
- }
-
- return $min;
- }
-
/**
* @param array|bool $curValue
* @param DBMasterPos[] $shutdownPositions
+ * @param int|null &$cpIndex
* @return array
*/
- private static function mergePositions( $curValue, array $shutdownPositions ) {
+ protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
/** @var DBMasterPos[] $curPositions */
- if ( $curValue === false ) {
- $curPositions = $shutdownPositions;
- } else {
- $curPositions = $curValue['positions'];
- // Use the newest positions for each DB master
- foreach ( $shutdownPositions as $db => $pos ) {
- if (
- !isset( $curPositions[$db] ) ||
- !( $curPositions[$db] instanceof DBMasterPos ) ||
- $pos->asOfTime() > $curPositions[$db]->asOfTime()
- ) {
- $curPositions[$db] = $pos;
- }
+ $curPositions = isset( $curValue['positions'] ) ? $curValue['positions'] : [];
+ // Use the newest positions for each DB master
+ foreach ( $shutdownPositions as $db => $pos ) {
+ if (
+ !isset( $curPositions[$db] ) ||
+ !( $curPositions[$db] instanceof DBMasterPos ) ||
+ $pos->asOfTime() > $curPositions[$db]->asOfTime()
+ ) {
+ $curPositions[$db] = $pos;
}
}
- return [ 'positions' => $curPositions ];
+ $cpIndex = isset( $curValue['writeIndex'] ) ? $curValue['writeIndex'] : 0;
+
+ return [
+ 'positions' => $curPositions,
+ 'writeIndex' => ++$cpIndex
+ ];
}
}