Make WANObjectCache sets account for slave lag
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 1 Oct 2015 02:40:09 +0000 (19:40 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Mon, 5 Oct 2015 23:45:13 +0000 (16:45 -0700)
* This gets lag information that is useful when
  the calling code is about to run queries that
  will have their results cached.
* This is now used in place of trxTimestamp() for
  WANObjectCache set() and getWithSetCallback().
* The WAN cache will use a low TTL if the lag is
  too high to avoid caching stale data for weeks.
* Bumped MAX_COMMIT_DELAY as nothing enforces it.

Bug: T113204
Change-Id: I2a95b4088cff42d6e980351555f81a4b13519e24

12 files changed:
includes/User.php
includes/actions/InfoAction.php
includes/changetags/ChangeTags.php
includes/db/DBConnRef.php
includes/db/Database.php
includes/db/DatabaseMysqlBase.php
includes/db/IDatabase.php
includes/filerepo/LocalRepo.php
includes/filerepo/file/LocalFile.php
includes/interwiki/Interwiki.php
includes/libs/objectcache/WANObjectCache.php
tests/phpunit/includes/utils/BatchRowUpdateTest.php

index e523b68..20b75bf 100644 (file)
@@ -458,7 +458,7 @@ class User implements IDBAccessObject {
                $data['mVersion'] = self::VERSION;
                $key = wfMemcKey( 'user', 'id', $this->mId );
 
-               $opts = array( 'since' => wfGetDB( DB_SLAVE )->trxTimestamp() );
+               $opts = DatabaseBase::getCacheSetOptions( wfGetDB( DB_SLAVE ) );
                ObjectCache::getMainWANInstance()->set( $key, $data, 3600, $opts );
        }
 
index 9e3fe40..6560083 100644 (file)
@@ -674,7 +674,11 @@ class InfoAction extends FormlessAction {
                                $title = $page->getTitle();
                                $id = $title->getArticleID();
 
+                               $dbr = wfGetDB( DB_SLAVE );
                                $dbrWatchlist = wfGetDB( DB_SLAVE, 'watchlist' );
+
+                               $setOpts += DatabaseBase::getCacheSetOptions( $dbr, $dbrWatchlist );
+
                                $result = array();
 
                                // Number of page watchers
@@ -709,7 +713,6 @@ class InfoAction extends FormlessAction {
                                        $result['visitingWatchers'] = $visitingWatchers;
                                }
 
-                               $dbr = wfGetDB( DB_SLAVE );
                                // Total number of edits
                                $edits = (int)$dbr->selectField(
                                        'revision',
@@ -808,8 +811,6 @@ class InfoAction extends FormlessAction {
                                        $fname
                                );
 
-                               $setOpts = array( 'since' => $dbr->trxTimestamp() );
-
                                return $result;
                        },
                        86400 * 7
index 12f738f..e1b9b27 100644 (file)
@@ -1090,7 +1090,9 @@ class ChangeTags {
        public static function listExtensionActivatedTags() {
                return ObjectCache::getMainWANInstance()->getWithSetCallback(
                        wfMemcKey( 'active-tags' ),
-                       function() {
+                       function ( $oldValue, &$ttl, array &$setOpts ) {
+                               $setOpts += DatabaseBase::getCacheSetOptions( wfGetDB( DB_SLAVE ) );
+
                                // Ask extensions which tags they consider active
                                $extensionActive = array();
                                Hooks::run( 'ChangeTagsListActive', array( &$extensionActive ) );
@@ -1130,10 +1132,12 @@ class ChangeTags {
 
                return ObjectCache::getMainWANInstance()->getWithSetCallback(
                        wfMemcKey( 'valid-tags-db' ),
-                       function() use ( $fname ) {
+                       function ( $oldValue, &$ttl, array &$setOpts ) use ( $fname ) {
                                $dbr = wfGetDB( DB_SLAVE );
-                               $tags = $dbr->selectFieldValues(
-                                       'valid_tag', 'vt_tag', array(), $fname );
+
+                               $setOpts += DatabaseBase::getCacheSetOptions( $dbr );
+
+                               $tags = $dbr->selectFieldValues( 'valid_tag', 'vt_tag', array(), $fname );
 
                                return array_filter( array_unique( $tags ) );
                        },
@@ -1155,7 +1159,9 @@ class ChangeTags {
        public static function listExtensionDefinedTags() {
                return ObjectCache::getMainWANInstance()->getWithSetCallback(
                        wfMemcKey( 'valid-tags-hook' ),
-                       function() {
+                       function ( $oldValue, &$ttl, array &$setOpts ) {
+                               $setOpts += DatabaseBase::getCacheSetOptions( wfGetDB( DB_SLAVE ) );
+
                                $tags = array();
                                Hooks::run( 'ListDefinedTags', array( &$tags ) );
                                return array_filter( array_unique( $tags ) );
@@ -1212,10 +1218,11 @@ class ChangeTags {
                $fname = __METHOD__;
                $cachedStats = ObjectCache::getMainWANInstance()->getWithSetCallback(
                        wfMemcKey( 'change-tag-statistics' ),
-                       function() use ( $fname ) {
-                               $out = array();
-
+                       function ( $oldValue, &$ttl, array &$setOpts ) use ( $fname ) {
                                $dbr = wfGetDB( DB_SLAVE, 'vslow' );
+
+                               $setOpts += DatabaseBase::getCacheSetOptions( $dbr );
+
                                $res = $dbr->select(
                                        'change_tag',
                                        array( 'ct_tag', 'hitcount' => 'count(*)' ),
@@ -1224,6 +1231,7 @@ class ChangeTags {
                                        array( 'GROUP BY' => 'ct_tag', 'ORDER BY' => 'hitcount DESC' )
                                );
 
+                               $out = array();
                                foreach ( $res as $row ) {
                                        $out[$row->ct_tag] = $row->hitcount;
                                }
index ffada49..4195719 100644 (file)
@@ -457,6 +457,10 @@ class DBConnRef implements IDatabase {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
+       public function getSessionLagStatus() {
+               return $this->__call( __FUNCTION__, func_get_args() );
+       }
+
        public function maxListLen() {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
index 2bd436e..ddd6d31 100644 (file)
@@ -45,6 +45,9 @@ abstract class DatabaseBase implements IDatabase {
 
        protected $mServer, $mUser, $mPassword, $mDBname;
 
+       /** @var BagOStuff APC cache */
+       protected $srvCache;
+
        /** @var resource Database connection */
        protected $mConn = null;
        protected $mOpened = false;
@@ -96,6 +99,9 @@ abstract class DatabaseBase implements IDatabase {
         */
        private $mTrxTimestamp = null;
 
+       /** @var float Lag estimate at the time of BEGIN */
+       private $mTrxSlaveLag = null;
+
        /**
         * Remembers the function name given for starting the most recent transaction via begin().
         * Used to provide additional context for error reporting.
@@ -601,6 +607,9 @@ abstract class DatabaseBase implements IDatabase {
        function __construct( array $params ) {
                global $wgDBprefix, $wgDBmwschema, $wgCommandLineMode;
 
+               $this->mTrxAtomicLevels = new SplStack;
+               $this->srvCache = ObjectCache::newAccelerator( 'hash' );
+
                $server = $params['host'];
                $user = $params['user'];
                $password = $params['password'];
@@ -3466,6 +3475,11 @@ abstract class DatabaseBase implements IDatabase {
                $this->mTrxPreCommitCallbacks = array();
                $this->mTrxShortId = wfRandomString( 12 );
                $this->mTrxWriteDuration = 0.0;
+               // First SELECT after BEGIN will establish the snapshot in REPEATABLE-READ.
+               // Get an estimate of the slave lag before then, treating estimate staleness
+               // as lag itself just to be safe
+               $status = $this->getApproximateLagStatus();
+               $this->mTrxSlaveLag = $status['lag'] + ( microtime( true ) - $status['since'] );
        }
 
        /**
@@ -3742,6 +3756,80 @@ abstract class DatabaseBase implements IDatabase {
                return true;
        }
 
+       /**
+        * Get the slave lag when the current transaction started
+        * or a general lag estimate if not transaction is active
+        *
+        * This is useful when transactions might use snapshot isolation
+        * (e.g. REPEATABLE-READ in innodb), so the "real" lag of that data
+        * is this lag plus transaction duration. If they don't, it is still
+        * safe to be pessimistic. In AUTO-COMMIT mode, this still gives an
+        * indication of the staleness of subsequent reads.
+        *
+        * @return array ('lag': seconds, 'since': UNIX timestamp of BEGIN)
+        * @since 1.27
+        */
+       public function getSessionLagStatus() {
+               return $this->getTransactionLagStatus() ?: $this->getApproximateLagStatus();
+       }
+
+       /**
+        * Get the slave lag when the current transaction started
+        *
+        * This is useful when transactions might use snapshot isolation
+        * (e.g. REPEATABLE-READ in innodb), so the "real" lag of that data
+        * is this lag plus transaction duration. If they don't, it is still
+        * safe to be pessimistic. This returns null if there is no transaction.
+        *
+        * @return array|null ('lag': seconds, 'since': UNIX timestamp of BEGIN)
+        * @since 1.27
+        */
+       public function getTransactionLagStatus() {
+               return $this->mTrxLevel
+                       ? array( 'lag' => $this->mTrxSlaveLag, 'since' => $this->trxTimestamp() )
+                       : null;
+       }
+
+       /**
+        * Get a slave lag estimate for this server
+        *
+        * @return array ('lag': seconds, 'since': UNIX timestamp of estimate)
+        * @since 1.27
+        */
+       public function getApproximateLagStatus() {
+               return array(
+                       'lag'   => $this->getLBInfo( 'slave' ) ? $this->getLag() : 0,
+                       'since' => microtime( true )
+               );
+       }
+
+       /**
+        * Merge the result of getSessionLagStatus() for several DBs
+        * using the most pessimistic values to estimate the lag of
+        * any data derived from them in combination
+        *
+        * This is information is useful for caching modules
+        *
+        * @see WANObjectCache::set()
+        * @see WANObjectCache::getWithSetCallback()
+        *
+        * @param IDatabase $db1
+        * @param IDatabase ...
+        * @return array ('lag': highest lag, 'since': lowest estimate UNIX timestamp)
+        * @since 1.27
+        */
+       public static function getCacheSetOptions( IDatabase $db1 ) {
+               $res = array( 'lag' => 0, 'since' => INF );
+               foreach ( func_get_args() as $db ) {
+                       /** @var IDatabase $db */
+                       $status = $db->getSessionLagStatus();
+                       $res['lag'] = max( $res['lag'], $status['lag'] );
+                       $res['since'] = min( $res['since'], $status['since'] );
+               }
+
+               return $res;
+       }
+
        /**
         * Get slave lag. Currently supported only by MySQL.
         *
index d2ccbf4..097665d 100644 (file)
@@ -35,9 +35,6 @@ abstract class DatabaseMysqlBase extends DatabaseBase {
        /** @var string Method to detect slave lag */
        protected $lagDetectionMethod;
 
-       /** @var BagOStuff APC cache */
-       protected $srvCache;
-
        /** @var string|null */
        private $serverVersion = null;
 
@@ -55,8 +52,6 @@ abstract class DatabaseMysqlBase extends DatabaseBase {
                $this->lagDetectionMethod = isset( $params['lagDetectionMethod'] )
                        ? $params['lagDetectionMethod']
                        : 'Seconds_Behind_Master';
-
-               $this->srvCache = ObjectCache::newAccelerator( 'hash' );
        }
 
        /**
@@ -684,6 +679,24 @@ abstract class DatabaseMysqlBase extends DatabaseBase {
                return false;
        }
 
+       public function getApproximateLagStatus() {
+               if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
+                       // Disable caching since this is fast enough and we don't wan't
+                       // to be *too* pessimistic by having both the cache TTL and the
+                       // pt-heartbeat interval count as lag in getSessionLagStatus()
+                       return parent::getApproximateLagStatus();
+               }
+
+               $key = wfGlobalCacheKey( 'mysql-lag', $this->getServer() );
+               $approxLag = $this->srvCache->get( $key );
+               if ( !$approxLag ) {
+                       $approxLag = parent::getApproximateLagStatus();
+                       $this->srvCache->set( $key, $approxLag, 1 );
+               }
+
+               return $approxLag;
+       }
+
        /**
         * Wait for the slave to catch up to a given master position.
         * @todo Return values for this and base class are rubbish
index cb0b25f..51c4bfe 100644 (file)
@@ -1365,6 +1365,21 @@ interface IDatabase {
         */
        public function getLag();
 
+       /**
+        * Get the slave lag when the current transaction started
+        * or a general lag estimate if not transaction is active
+        *
+        * This is useful when transactions might use snapshot isolation
+        * (e.g. REPEATABLE-READ in innodb), so the "real" lag of that data
+        * is this lag plus transaction duration. If they don't, it is still
+        * safe to be pessimistic. In AUTO-COMMIT mode, this still gives an
+        * indication of the staleness of subsequent reads.
+        *
+        * @return array ('lag': seconds, 'since': UNIX timestamp of BEGIN)
+        * @since 1.27
+        */
+       public function getSessionLagStatus();
+
        /**
         * Return the maximum number of items allowed in a list, or 0 for unlimited.
         *
index c7ca4c2..389f081 100644 (file)
@@ -205,7 +205,7 @@ class LocalRepo extends FileRepo {
                        function ( $oldValue, &$ttl, array &$setOpts ) use ( $that, $title ) {
                                $dbr = $that->getSlaveDB(); // possibly remote DB
 
-                               $setOpts = array( 'since' => $dbr->trxTimestamp() );
+                               $setOpts += DatabaseBase::getCacheSetOptions( $dbr );
 
                                if ( $title instanceof Title ) {
                                        $row = $dbr->selectRow(
index 1bccf81..6208f2f 100644 (file)
@@ -309,7 +309,7 @@ class LocalFile extends File {
 
                // Cache presence for 1 week and negatives for 1 day
                $ttl = $this->fileExists ? 86400 * 7 : 86400;
-               $opts = array( 'since' => wfGetDB( DB_SLAVE )->trxTimestamp() );
+               $opts = DatabaseBase::getCacheSetOptions( $this->repo->getSlaveDB() );
                ObjectCache::getMainWANInstance()->set( $key, $cacheVal, $ttl, $opts );
        }
 
index 2bfe756..89aeaae 100644 (file)
@@ -221,6 +221,8 @@ class Interwiki {
                        function ( $oldValue, &$ttl, array &$setOpts ) use ( $prefix ) {
                                $dbr = wfGetDB( DB_SLAVE );
 
+                               $setOpts += DatabaseBase::getCacheSetOptions( $dbr );
+
                                $row = $dbr->selectRow(
                                        'interwiki',
                                        Interwiki::selectFields(),
@@ -228,8 +230,6 @@ class Interwiki {
                                        __METHOD__
                                );
 
-                               $setOpts = array( 'since' => $dbr->trxTimestamp() );
-
                                return $row ? (array)$row : '!NONEXISTENT';
                        },
                        $wgInterwikiExpiry
index 435d69b..b1d3ec2 100644 (file)
@@ -69,13 +69,13 @@ class WANObjectCache {
        protected $lastRelayError = self::ERR_NONE;
 
        /** Max time expected to pass between delete() and DB commit finishing */
-       const MAX_COMMIT_DELAY = 1;
-       /** Max expected replication lag for a reasonable storage setup */
-       const MAX_REPLICA_LAG = 7;
+       const MAX_COMMIT_DELAY = 3;
+       /** Max replication lag before applying TTL_LAGGED to set() */
+       const MAX_REPLICA_LAG = 5;
        /** Max time since snapshot transaction start to avoid no-op of set() */
-       const MAX_SNAPSHOT_LAG = 6;
+       const MAX_SNAPSHOT_LAG = 5;
        /** Seconds to tombstone keys on delete() */
-       const HOLDOFF_TTL = 14; // MAX_COMMIT_DELAY + MAX_REPLICA_LAG + MAX_SNAPSHOT_LAG
+       const HOLDOFF_TTL = 14; // MAX_COMMIT_DELAY + MAX_REPLICA_LAG + MAX_SNAPSHOT_LAG + 1
 
        /** Seconds to keep dependency purge keys around */
        const CHECK_KEY_TTL = 31536000; // 1 year
@@ -92,6 +92,8 @@ class WANObjectCache {
        const TTL_UNCACHEABLE = -1;
        /** Idiom for getWithSetCallback() callbacks to 'lockTSE' logic */
        const TSE_NONE = -1;
+       /** Max TTL to store keys when a data sourced is lagged */
+       const TTL_LAGGED = 30;
 
        /** Cache format version number */
        const VERSION = 1;
@@ -270,18 +272,21 @@ class WANObjectCache {
         * Example usage:
         * @code
         *     $dbr = wfGetDB( DB_SLAVE );
+        *     $setOpts = DatabaseBase::getCacheSetOptions( $dbr );
         *     // Fetch the row from the DB
         *     $row = $dbr->selectRow( ... );
         *     $key = wfMemcKey( 'building', $buildingId );
-        *     // Give the age of the transaction snapshot the data came from
-        *     $opts = array( 'since' => $dbr->trxTimestamp() );
-        *     $cache->set( $key, $row, 86400, $opts );
+        *     $cache->set( $key, $row, 86400, $setOpts );
         * @endcode
         *
         * @param string $key Cache key
         * @param mixed $value
         * @param integer $ttl Seconds to live [0=forever]
         * @param array $opts Options map:
+        *   - lag     : Seconds of slave lag. Typically, this is either the slave lag
+        *               before the data was read or, if applicable, the slave lag before
+        *               the snapshot-isolated transaction the data was read from started.
+        *               [Default: 0 seconds]
         *   - since   : UNIX timestamp of the data in $value. Typically, this is either
         *               the current time the data was read or (if applicable) the time when
         *               the snapshot-isolated transaction the data was read from started.
@@ -296,6 +301,12 @@ class WANObjectCache {
        final public function set( $key, $value, $ttl = 0, array $opts = array() ) {
                $lockTSE = isset( $opts['lockTSE'] ) ? $opts['lockTSE'] : self::TSE_NONE;
                $age = isset( $opts['since'] ) ? max( 0, microtime( true ) - $opts['since'] ) : 0;
+               $lag = isset( $opts['lag'] ) ? $opts['lag'] : 0;
+
+               if ( $lag > self::MAX_REPLICA_LAG ) {
+                       // Too much lag detected; lower TTL so it converges faster
+                       $ttl = $ttl ? min( $ttl, self::TTL_LAGGED ) : self::TTL_LAGGED;
+               }
 
                if ( $age > self::MAX_SNAPSHOT_LAG ) {
                        if ( $lockTSE >= 0 ) {
@@ -515,15 +526,12 @@ class WANObjectCache {
         *         // Key to store the cached value under
         *         wfMemcKey( 'cat-attributes', $catId ),
         *         // Function that derives the new key value
-        *         function( $oldValue, &$ttl, array &$setOpts ) {
-        *             // Fetch row from the DB
+        *         function ( $oldValue, &$ttl, array &$setOpts ) {
         *             $dbr = wfGetDB( DB_SLAVE );
-        *             $row = $dbr->selectRow( ... );
-        *
-        *             // Set age of the transaction snapshot the data came from
-        *             $setOpts = array( 'since' => $dbr->trxTimestamp() );
+        *             // Account for any snapshot/slave lag
+        *             $setOpts += DatabaseBase::getCacheSetOptions( $dbr );
         *
-        *             return $row;
+        *             return $dbr->selectRow( ... );
         *        },
         *        // Time-to-live (seconds)
         *        60
@@ -536,15 +544,12 @@ class WANObjectCache {
         *         // Key to store the cached value under
         *         wfMemcKey( 'site-cat-config' ),
         *         // Function that derives the new key value
-        *         function( $oldValue, &$ttl, array &$setOpts ) {
-        *             // Fetch row from the DB
+        *         function ( $oldValue, &$ttl, array &$setOpts ) {
         *             $dbr = wfGetDB( DB_SLAVE );
-        *             $config = CatConfig::newFromRow( $dbr->selectRow( ... ) );
+        *             // Account for any snapshot/slave lag
+        *             $setOpts += DatabaseBase::getCacheSetOptions( $dbr );
         *
-        *             // Set age of the transaction snapshot the data came from
-        *             $setOpts = array( 'since' => $dbr->trxTimestamp() );
-        *
-        *             return $config;
+        *             return CatConfig::newFromRow( $dbr->selectRow( ... ) );
         *        },
         *        // Time-to-live (seconds)
         *        86400,
@@ -561,15 +566,13 @@ class WANObjectCache {
         *         // Key to store the cached value under
         *         wfMemcKey( 'cat-state', $cat->getId() ),
         *         // Function that derives the new key value
-        *         function( $oldValue, &$ttl, array &$setOpts ) {
+        *         function ( $oldValue, &$ttl, array &$setOpts ) {
         *             // Determine new value from the DB
         *             $dbr = wfGetDB( DB_SLAVE );
-        *             $state = CatState::newFromResults( $dbr->select( ... ) );
-        *
-        *             // Set age of the transaction snapshot the data came from
-        *             $setOpts = array( 'since' => $dbr->trxTimestamp() );
+        *             // Account for any snapshot/slave lag
+        *             $setOpts += DatabaseBase::getCacheSetOptions( $dbr );
         *
-        *             return $state;
+        *             return CatState::newFromResults( $dbr->select( ... ) );
         *        },
         *        // Time-to-live (seconds)
         *        900,
@@ -589,20 +592,18 @@ class WANObjectCache {
         *         // Key to store the cached value under
         *         wfMemcKey( 'cat-last-actions', 100 ),
         *         // Function that derives the new key value
-        *         function( $oldValue, &$ttl, array &$setOpts ) {
+        *         function ( $oldValue, &$ttl, array &$setOpts ) {
         *             $dbr = wfGetDB( DB_SLAVE );
+        *             // Account for any snapshot/slave lag
+        *             $setOpts += DatabaseBase::getCacheSetOptions( $dbr );
+        *
         *             // Start off with the last cached list
         *             $list = $oldValue ?: array();
         *             // Fetch the last 100 relevant rows in descending order;
         *             // only fetch rows newer than $list[0] to reduce scanning
         *             $rows = iterator_to_array( $dbr->select( ... ) );
         *             // Merge them and get the new "last 100" rows
-        *             $list = array_slice( array_merge( $new, $list ), 0, 100 );
-        *
-        *             // Set age of the transaction snapshot the data came from
-        *             $setOpts = array( 'since' => $dbr->trxTimestamp() );
-        *
-        *             return $list;
+        *             return array_slice( array_merge( $new, $list ), 0, 100 );
         *        },
         *        // Time-to-live (seconds)
         *        10,
index 082ac82..d224af8 100644 (file)
@@ -238,12 +238,16 @@ class BatchRowUpdateTest extends MediaWikiTestCase {
        protected function mockDb() {
                // Cant mock from DatabaseType or DatabaseBase, they dont
                // have the full gamut of methods
+               // FIXME: the constructor normally sets mAtomicLevels and mSrvCache
                $databaseMysql = $this->getMockBuilder( 'DatabaseMysql' )
                        ->disableOriginalConstructor()
                        ->getMock();
                $databaseMysql->expects( $this->any() )
                        ->method( 'isOpen' )
                        ->will( $this->returnValue( true ) );
+               $databaseMysql->expects( $this->any() )
+                       ->method( 'getApproximateLagStatus' )
+                       ->will( $this->returnValue( array( 'lag' => 0, 'since' => 0 ) ) );
                return $databaseMysql;
        }
 }