/** @var int Seconds to store positions */
const POSITION_TTL = 60;
+ /** @var int Seconds to store position write index cookies (safely less than POSITION_TTL) */
+ const POSITION_COOKIE_TTL = 60;
/** @var int Max time to wait for positions to appear */
const POS_STORE_WAIT_TIMEOUT = 5;
/**
* @param BagOStuff $store
- * @param array[] $client Map of (ip: <IP>, agent: <user-agent>)
+ * @param array[] $client Map of (ip: <IP>, agent: <user-agent> [, clientId: <hash>] )
* @param int|null $posIndex Write counter index [optional]
* @since 1.27
*/
public function __construct( BagOStuff $store, array $client, $posIndex = null ) {
$this->store = $store;
- $this->clientId = md5( $client['ip'] . "\n" . $client['agent'] );
+ $this->clientId = isset( $client['clientId'] )
+ ? $client['clientId']
+ : md5( $client['ip'] . "\n" . $client['agent'] );
$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
$this->waitForPosIndex = $posIndex;
$this->logger = new NullLogger();
$this->logger = $logger;
}
+ /**
+ * @return string Client ID hash
+ * @since 1.32
+ */
+ public function getClientId() {
+ return $this->clientId;
+ }
+
/**
* @param bool $enabled Whether to no-op all method calls
* @since 1.27
implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
);
- // CP-protected writes should overwhemingly go to the master datacenter, so get DC-local
- // lock to merge the values. Use a DC-local get() and a synchronous all-DC set(). This
- // makes it possible for the BagOStuff class to write in parallel to all DCs with one RTT.
+ // CP-protected writes should overwhelmingly go to the master datacenter, so use a
+ // DC-local lock to merge the values. Use a DC-local get() and a synchronous all-DC
+ // set(). This makes it possible for the BagOStuff class to write in parallel to all
+ // DCs with one RTT. The use of WRITE_SYNC avoids needing READ_LATEST for the get().
if ( $store->lock( $this->key, 3 ) ) {
if ( $workCallback ) {
- // Let the store run the work before blocking on a replication sync barrier. By the
- // time it's done with the work, the barrier should be fast if replication caught up.
+ // Let the store run the work before blocking on a replication sync barrier.
+ // If replication caught up while the work finished, the barrier will be fast.
$store->addBusyCallback( $workCallback );
}
$ok = $store->set(
$store->unlock( $this->key );
} else {
$ok = false;
- $cpIndex = null; // nothing saved
}
if ( !$ok ) {
+ $cpIndex = null; // nothing saved
$bouncedPositions = $this->shutdownPositions;
// Raced out too many times or stash is down
$this->logger->warning( __METHOD__ . ": failed to save master pos for " .
// already be expired and thus treated as non-existing, maintaining correctness.
if ( $this->waitForPosIndex > 0 ) {
$data = null;
+ $indexReached = null; // highest index reached in the position store
$loop = new WaitConditionLoop(
- function () use ( &$data ) {
+ function () use ( &$data, &$indexReached ) {
$data = $this->store->get( $this->key );
if ( !is_array( $data ) ) {
return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
} elseif ( !isset( $data['writeIndex'] ) ) {
return WaitConditionLoop::CONDITION_REACHED; // b/c
}
+ $indexReached = max( $data['writeIndex'], $indexReached );
return ( $data['writeIndex'] >= $this->waitForPosIndex )
? WaitConditionLoop::CONDITION_REACHED
$waitedMs = $loop->getLastWaitTime() * 1e3;
if ( $result == $loop::CONDITION_REACHED ) {
- $msg = "expected and found pos index {$this->waitForPosIndex} ({$waitedMs}ms)";
- $this->logger->debug( $msg );
+ $this->logger->debug(
+ __METHOD__ . ": expected and found position index.",
+ [
+ 'cpPosIndex' => $this->waitForPosIndex,
+ 'waitTimeMs' => $waitedMs
+ ]
+ );
} else {
- $msg = "expected but missed pos index {$this->waitForPosIndex} ({$waitedMs}ms)";
- $this->logger->info( $msg );
+ $this->logger->warning(
+ __METHOD__ . ": expected but failed to find position index.",
+ [
+ 'cpPosIndex' => $this->waitForPosIndex,
+ 'indexReached' => $indexReached,
+ 'waitTimeMs' => $waitedMs
+ ]
+ );
}
} else {
$data = $this->store->get( $this->key );
*/
protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
/** @var DBMasterPos[] $curPositions */
- $curPositions = isset( $curValue['positions'] ) ? $curValue['positions'] : [];
+ $curPositions = $curValue['positions'] ?? [];
// Use the newest positions for each DB master
foreach ( $shutdownPositions as $db => $pos ) {
if (
}
}
- $cpIndex = isset( $curValue['writeIndex'] ) ? $curValue['writeIndex'] : 0;
+ $cpIndex = $curValue['writeIndex'] ?? 0;
return [
'positions' => $curPositions,