From 9a9944e225276ad8ca8d63007b8d2dcef5362fa2 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Sat, 20 Jul 2019 14:59:45 -0700 Subject: [PATCH] objectcache: add "sessionConsistencyWindow" option to ReplicatedBagOStuff Change-Id: I25af780f063879eda2de1b9a1168e37115f823ed --- .../libs/objectcache/ReplicatedBagOStuff.php | 101 +++++++++++++++--- 1 file changed, 87 insertions(+), 14 deletions(-) diff --git a/includes/libs/objectcache/ReplicatedBagOStuff.php b/includes/libs/objectcache/ReplicatedBagOStuff.php index 0b5ac46f6a..ff87829086 100644 --- a/includes/libs/objectcache/ReplicatedBagOStuff.php +++ b/includes/libs/objectcache/ReplicatedBagOStuff.php @@ -33,16 +33,26 @@ use Wikimedia\ObjectFactory; */ class ReplicatedBagOStuff extends BagOStuff { /** @var BagOStuff */ - protected $writeStore; + private $writeStore; /** @var BagOStuff */ - protected $readStore; + private $readStore; + + /** @var int Seconds to read from the master source for a key after writing to it */ + private $consistencyWindow; + /** @var float[] Map of (key => UNIX timestamp) */ + private $lastKeyWrites = []; + + /** @var int Max expected delay (in seconds) for writes to reach replicas */ + const MAX_WRITE_DELAY = 5; /** * Constructor. Parameters are: - * - writeFactory : ObjectFactory::getObjectFromSpec array yeilding BagOStuff. - * This object will be used for writes (e.g. the master DB). - * - readFactory : ObjectFactory::getObjectFromSpec array yeilding BagOStuff. - * This object will be used for reads (e.g. a replica DB). + * - writeFactory: ObjectFactory::getObjectFromSpec array yeilding BagOStuff. + * This object will be used for writes (e.g. the master DB). + * - readFactory: ObjectFactory::getObjectFromSpec array yeilding BagOStuff. + * This object will be used for reads (e.g. a replica DB). + * - sessionConsistencyWindow: Seconds to read from the master source for a key + * after writing to it. [Default: ReplicatedBagOStuff::MAX_WRITE_DELAY] * * @param array $params * @throws InvalidArgumentException @@ -53,19 +63,18 @@ class ReplicatedBagOStuff extends BagOStuff { if ( !isset( $params['writeFactory'] ) ) { throw new InvalidArgumentException( __METHOD__ . ': the "writeFactory" parameter is required' ); - } - if ( !isset( $params['readFactory'] ) ) { + } elseif ( !isset( $params['readFactory'] ) ) { throw new InvalidArgumentException( __METHOD__ . ': the "readFactory" parameter is required' ); } - $opts = [ 'reportDupes' => false ]; // redundant + $this->consistencyWindow = $params['sessionConsistencyWindow'] ?? self::MAX_WRITE_DELAY; $this->writeStore = ( $params['writeFactory'] instanceof BagOStuff ) ? $params['writeFactory'] - : ObjectFactory::getObjectFromSpec( $opts + $params['writeFactory'] ); + : ObjectFactory::getObjectFromSpec( $params['writeFactory'] ); $this->readStore = ( $params['readFactory'] instanceof BagOStuff ) ? $params['readFactory'] - : ObjectFactory::getObjectFromSpec( $opts + $params['readFactory'] ); + : ObjectFactory::getObjectFromSpec( $params['readFactory'] ); $this->attrMap = $this->mergeFlagMaps( [ $this->readStore, $this->writeStore ] ); } @@ -76,28 +85,41 @@ class ReplicatedBagOStuff extends BagOStuff { } public function get( $key, $flags = 0 ) { - return $this->fieldHasFlags( $flags, self::READ_LATEST ) + return ( + $this->hadRecentSessionWrite( [ $key ] ) || + $this->fieldHasFlags( $flags, self::READ_LATEST ) + ) ? $this->writeStore->get( $key, $flags ) : $this->readStore->get( $key, $flags ); } public function set( $key, $value, $exptime = 0, $flags = 0 ) { + $this->remarkRecentSessionWrite( [ $key ] ); + return $this->writeStore->set( $key, $value, $exptime, $flags ); } public function delete( $key, $flags = 0 ) { + $this->remarkRecentSessionWrite( [ $key ] ); + return $this->writeStore->delete( $key, $flags ); } public function add( $key, $value, $exptime = 0, $flags = 0 ) { + $this->remarkRecentSessionWrite( [ $key ] ); + return $this->writeStore->add( $key, $value, $exptime, $flags ); } public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) { + $this->remarkRecentSessionWrite( [ $key ] ); + return $this->writeStore->merge( $key, $callback, $exptime, $attempts, $flags ); } public function changeTTL( $key, $exptime = 0, $flags = 0 ) { + $this->remarkRecentSessionWrite( [ $key ] ); + return $this->writeStore->changeTTL( $key, $exptime, $flags ); } @@ -118,37 +140,52 @@ class ReplicatedBagOStuff extends BagOStuff { } public function getMulti( array $keys, $flags = 0 ) { - return $this->fieldHasFlags( $flags, self::READ_LATEST ) + return ( + $this->hadRecentSessionWrite( $keys ) || + $this->fieldHasFlags( $flags, self::READ_LATEST ) + ) ? $this->writeStore->getMulti( $keys, $flags ) : $this->readStore->getMulti( $keys, $flags ); } public function setMulti( array $data, $exptime = 0, $flags = 0 ) { + $this->remarkRecentSessionWrite( array_keys( $data ) ); + return $this->writeStore->setMulti( $data, $exptime, $flags ); } public function deleteMulti( array $keys, $flags = 0 ) { + $this->remarkRecentSessionWrite( $keys ); + return $this->writeStore->deleteMulti( $keys, $flags ); } public function changeTTLMulti( array $keys, $exptime, $flags = 0 ) { + $this->remarkRecentSessionWrite( $keys ); + return $this->writeStore->changeTTLMulti( $keys, $exptime, $flags ); } public function incr( $key, $value = 1, $flags = 0 ) { + $this->remarkRecentSessionWrite( [ $key ] ); + return $this->writeStore->incr( $key, $value, $flags ); } public function decr( $key, $value = 1, $flags = 0 ) { + $this->remarkRecentSessionWrite( [ $key ] ); + return $this->writeStore->decr( $key, $value, $flags ); } public function incrWithInit( $key, $exptime, $value = 1, $init = null, $flags = 0 ) { + $this->remarkRecentSessionWrite( [ $key ] ); + return $this->writeStore->incrWithInit( $key, $exptime, $value, $init, $flags ); } public function getLastError() { - return ( $this->writeStore->getLastError() != self::ERR_NONE ) + return ( $this->writeStore->getLastError() !== self::ERR_NONE ) ? $this->writeStore->getLastError() : $this->readStore->getLastError(); } @@ -179,4 +216,40 @@ class ReplicatedBagOStuff extends BagOStuff { $this->writeStore->setMockTime( $time ); $this->readStore->setMockTime( $time ); } + + /** + * @param string[] $keys + * @return bool + */ + private function hadRecentSessionWrite( array $keys ) { + $now = $this->getCurrentTime(); + foreach ( $keys as $key ) { + $ts = $this->lastKeyWrites[$key] ?? 0; + if ( $ts && ( $now - $ts ) <= $this->consistencyWindow ) { + return true; + } + } + + return false; + } + + /** + * @param string[] $keys + */ + private function remarkRecentSessionWrite( array $keys ) { + $now = $this->getCurrentTime(); + foreach ( $keys as $key ) { + unset( $this->lastKeyWrites[$key] ); // move to the end + $this->lastKeyWrites[$key] = $now; + } + // Prune out the map if the first key is obsolete + if ( ( $now - reset( $this->lastKeyWrites ) ) > $this->consistencyWindow ) { + $this->lastKeyWrites = array_filter( + $this->lastKeyWrites, + function ( $timestamp ) use ( $now ) { + return ( ( $now - $timestamp ) <= $this->consistencyWindow ); + } + ); + } + } } -- 2.20.1