objectcache: add "sessionConsistencyWindow" option to ReplicatedBagOStuff
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 20 Jul 2019 21:59:45 +0000 (14:59 -0700)
committerKrinkle <krinklemail@gmail.com>
Sun, 1 Sep 2019 21:55:25 +0000 (21:55 +0000)
Change-Id: I25af780f063879eda2de1b9a1168e37115f823ed

includes/libs/objectcache/ReplicatedBagOStuff.php

index 0b5ac46..ff87829 100644 (file)
@@ -33,16 +33,26 @@ use Wikimedia\ObjectFactory;
  */
 class ReplicatedBagOStuff extends BagOStuff {
        /** @var BagOStuff */
  */
 class ReplicatedBagOStuff extends BagOStuff {
        /** @var BagOStuff */
-       protected $writeStore;
+       private $writeStore;
        /** @var BagOStuff */
        /** @var BagOStuff */
-       protected $readStore;
+       private $readStore;
+
+       /** @var int Seconds to read from the master source for a key after writing to it */
+       private $consistencyWindow;
+       /** @var float[] Map of (key => UNIX timestamp) */
+       private $lastKeyWrites = [];
+
+       /** @var int Max expected delay (in seconds) for writes to reach replicas */
+       const MAX_WRITE_DELAY = 5;
 
        /**
         * Constructor. Parameters are:
 
        /**
         * Constructor. Parameters are:
-        *   - writeFactory : ObjectFactory::getObjectFromSpec array yeilding BagOStuff.
-        *                    This object will be used for writes (e.g. the master DB).
-        *   - readFactory  : ObjectFactory::getObjectFromSpec array yeilding BagOStuff.
-        *                    This object will be used for reads (e.g. a replica DB).
+        *   - writeFactory: ObjectFactory::getObjectFromSpec array yeilding BagOStuff.
+        *      This object will be used for writes (e.g. the master DB).
+        *   - readFactory: ObjectFactory::getObjectFromSpec array yeilding BagOStuff.
+        *      This object will be used for reads (e.g. a replica DB).
+        *   - sessionConsistencyWindow: Seconds to read from the master source for a key
+        *      after writing to it. [Default: ReplicatedBagOStuff::MAX_WRITE_DELAY]
         *
         * @param array $params
         * @throws InvalidArgumentException
         *
         * @param array $params
         * @throws InvalidArgumentException
@@ -53,19 +63,18 @@ class ReplicatedBagOStuff extends BagOStuff {
                if ( !isset( $params['writeFactory'] ) ) {
                        throw new InvalidArgumentException(
                                __METHOD__ . ': the "writeFactory" parameter is required' );
                if ( !isset( $params['writeFactory'] ) ) {
                        throw new InvalidArgumentException(
                                __METHOD__ . ': the "writeFactory" parameter is required' );
-               }
-               if ( !isset( $params['readFactory'] ) ) {
+               } elseif ( !isset( $params['readFactory'] ) ) {
                        throw new InvalidArgumentException(
                                __METHOD__ . ': the "readFactory" parameter is required' );
                }
 
                        throw new InvalidArgumentException(
                                __METHOD__ . ': the "readFactory" parameter is required' );
                }
 
-               $opts = [ 'reportDupes' => false ]; // redundant
+               $this->consistencyWindow = $params['sessionConsistencyWindow'] ?? self::MAX_WRITE_DELAY;
                $this->writeStore = ( $params['writeFactory'] instanceof BagOStuff )
                        ? $params['writeFactory']
                $this->writeStore = ( $params['writeFactory'] instanceof BagOStuff )
                        ? $params['writeFactory']
-                       : ObjectFactory::getObjectFromSpec( $opts + $params['writeFactory'] );
+                       : ObjectFactory::getObjectFromSpec( $params['writeFactory'] );
                $this->readStore = ( $params['readFactory'] instanceof BagOStuff )
                        ? $params['readFactory']
                $this->readStore = ( $params['readFactory'] instanceof BagOStuff )
                        ? $params['readFactory']
-                       : ObjectFactory::getObjectFromSpec( $opts + $params['readFactory'] );
+                       : ObjectFactory::getObjectFromSpec( $params['readFactory'] );
                $this->attrMap = $this->mergeFlagMaps( [ $this->readStore, $this->writeStore ] );
        }
 
                $this->attrMap = $this->mergeFlagMaps( [ $this->readStore, $this->writeStore ] );
        }
 
@@ -76,28 +85,41 @@ class ReplicatedBagOStuff extends BagOStuff {
        }
 
        public function get( $key, $flags = 0 ) {
        }
 
        public function get( $key, $flags = 0 ) {
-               return $this->fieldHasFlags( $flags, self::READ_LATEST )
+               return (
+                       $this->hadRecentSessionWrite( [ $key ] ) ||
+                       $this->fieldHasFlags( $flags, self::READ_LATEST )
+               )
                        ? $this->writeStore->get( $key, $flags )
                        : $this->readStore->get( $key, $flags );
        }
 
        public function set( $key, $value, $exptime = 0, $flags = 0 ) {
                        ? $this->writeStore->get( $key, $flags )
                        : $this->readStore->get( $key, $flags );
        }
 
        public function set( $key, $value, $exptime = 0, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( [ $key ] );
+
                return $this->writeStore->set( $key, $value, $exptime, $flags );
        }
 
        public function delete( $key, $flags = 0 ) {
                return $this->writeStore->set( $key, $value, $exptime, $flags );
        }
 
        public function delete( $key, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( [ $key ] );
+
                return $this->writeStore->delete( $key, $flags );
        }
 
        public function add( $key, $value, $exptime = 0, $flags = 0 ) {
                return $this->writeStore->delete( $key, $flags );
        }
 
        public function add( $key, $value, $exptime = 0, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( [ $key ] );
+
                return $this->writeStore->add( $key, $value, $exptime, $flags );
        }
 
        public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
                return $this->writeStore->add( $key, $value, $exptime, $flags );
        }
 
        public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( [ $key ] );
+
                return $this->writeStore->merge( $key, $callback, $exptime, $attempts, $flags );
        }
 
        public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
                return $this->writeStore->merge( $key, $callback, $exptime, $attempts, $flags );
        }
 
        public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( [ $key ] );
+
                return $this->writeStore->changeTTL( $key, $exptime, $flags );
        }
 
                return $this->writeStore->changeTTL( $key, $exptime, $flags );
        }
 
@@ -118,37 +140,52 @@ class ReplicatedBagOStuff extends BagOStuff {
        }
 
        public function getMulti( array $keys, $flags = 0 ) {
        }
 
        public function getMulti( array $keys, $flags = 0 ) {
-               return $this->fieldHasFlags( $flags, self::READ_LATEST )
+               return (
+                       $this->hadRecentSessionWrite( $keys ) ||
+                       $this->fieldHasFlags( $flags, self::READ_LATEST )
+               )
                        ? $this->writeStore->getMulti( $keys, $flags )
                        : $this->readStore->getMulti( $keys, $flags );
        }
 
        public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
                        ? $this->writeStore->getMulti( $keys, $flags )
                        : $this->readStore->getMulti( $keys, $flags );
        }
 
        public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( array_keys( $data ) );
+
                return $this->writeStore->setMulti( $data, $exptime, $flags );
        }
 
        public function deleteMulti( array $keys, $flags = 0 ) {
                return $this->writeStore->setMulti( $data, $exptime, $flags );
        }
 
        public function deleteMulti( array $keys, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( $keys );
+
                return $this->writeStore->deleteMulti( $keys, $flags );
        }
 
        public function changeTTLMulti( array $keys, $exptime, $flags = 0 ) {
                return $this->writeStore->deleteMulti( $keys, $flags );
        }
 
        public function changeTTLMulti( array $keys, $exptime, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( $keys );
+
                return $this->writeStore->changeTTLMulti( $keys, $exptime, $flags );
        }
 
        public function incr( $key, $value = 1, $flags = 0 ) {
                return $this->writeStore->changeTTLMulti( $keys, $exptime, $flags );
        }
 
        public function incr( $key, $value = 1, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( [ $key ] );
+
                return $this->writeStore->incr( $key, $value, $flags );
        }
 
        public function decr( $key, $value = 1, $flags = 0 ) {
                return $this->writeStore->incr( $key, $value, $flags );
        }
 
        public function decr( $key, $value = 1, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( [ $key ] );
+
                return $this->writeStore->decr( $key, $value, $flags );
        }
 
        public function incrWithInit( $key, $exptime, $value = 1, $init = null, $flags = 0 ) {
                return $this->writeStore->decr( $key, $value, $flags );
        }
 
        public function incrWithInit( $key, $exptime, $value = 1, $init = null, $flags = 0 ) {
+               $this->remarkRecentSessionWrite( [ $key ] );
+
                return $this->writeStore->incrWithInit( $key, $exptime, $value, $init, $flags );
        }
 
        public function getLastError() {
                return $this->writeStore->incrWithInit( $key, $exptime, $value, $init, $flags );
        }
 
        public function getLastError() {
-               return ( $this->writeStore->getLastError() != self::ERR_NONE )
+               return ( $this->writeStore->getLastError() !== self::ERR_NONE )
                        ? $this->writeStore->getLastError()
                        : $this->readStore->getLastError();
        }
                        ? $this->writeStore->getLastError()
                        : $this->readStore->getLastError();
        }
@@ -179,4 +216,40 @@ class ReplicatedBagOStuff extends BagOStuff {
                $this->writeStore->setMockTime( $time );
                $this->readStore->setMockTime( $time );
        }
                $this->writeStore->setMockTime( $time );
                $this->readStore->setMockTime( $time );
        }
+
+       /**
+        * @param string[] $keys
+        * @return bool
+        */
+       private function hadRecentSessionWrite( array $keys ) {
+               $now = $this->getCurrentTime();
+               foreach ( $keys as $key ) {
+                       $ts = $this->lastKeyWrites[$key] ?? 0;
+                       if ( $ts && ( $now - $ts ) <= $this->consistencyWindow ) {
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+
+       /**
+        * @param string[] $keys
+        */
+       private function remarkRecentSessionWrite( array $keys ) {
+               $now = $this->getCurrentTime();
+               foreach ( $keys as $key ) {
+                       unset( $this->lastKeyWrites[$key] ); // move to the end
+                       $this->lastKeyWrites[$key] = $now;
+               }
+               // Prune out the map if the first key is obsolete
+               if ( ( $now - reset( $this->lastKeyWrites ) ) > $this->consistencyWindow ) {
+                       $this->lastKeyWrites = array_filter(
+                               $this->lastKeyWrites,
+                               function ( $timestamp ) use ( $now ) {
+                                       return ( ( $now - $timestamp ) <= $this->consistencyWindow );
+                               }
+                       );
+               }
+       }
 }
 }