objectcache: add WANObjectCacheReaper for assuring purges
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 3 Sep 2016 04:43:16 +0000 (21:43 -0700)
committerTim Starling <tstarling@wikimedia.org>
Fri, 27 Jan 2017 02:21:06 +0000 (02:21 +0000)
* This fixes keys based on some sort of change log.
  Updates are wrapped in a mutex and keep track of the
  last known good position.
* Make WANObjectReapUpdate class that cleans up title
  related keys using the recentchanges table. This triggers
  as a deferred updates on RC view.

Change-Id: I7f14b9ca2533032147e62b1a3cc004a23da86579

autoload.php
includes/DefaultSettings.php
includes/cache/LinkCache.php
includes/deferred/WANCacheReapUpdate.php [new file with mode: 0644]
includes/filerepo/file/LocalFile.php
includes/libs/objectcache/WANObjectCache.php
includes/libs/objectcache/WANObjectCacheReaper.php [new file with mode: 0644]
includes/page/WikiPage.php
includes/specialpage/ChangesListSpecialPage.php
includes/user/User.php
tests/phpunit/includes/libs/objectcache/WANObjectCacheTest.php

index e7c97ad..329cdb3 100644 (file)
@@ -1541,7 +1541,9 @@ $wgAutoloadLocalClasses = [
        'ViewCLI' => __DIR__ . '/maintenance/view.php',
        'VirtualRESTService' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTService.php',
        'VirtualRESTServiceClient' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTServiceClient.php',
+       'WANCacheReapUpdate' => __DIR__ . '/includes/deferred/WANCacheReapUpdate.php',
        'WANObjectCache' => __DIR__ . '/includes/libs/objectcache/WANObjectCache.php',
+       'WANObjectCacheReaper' => __DIR__ . '/includes/libs/objectcache/WANObjectCacheReaper.php',
        'WantedCategoriesPage' => __DIR__ . '/includes/specials/SpecialWantedcategories.php',
        'WantedFilesPage' => __DIR__ . '/includes/specials/SpecialWantedfiles.php',
        'WantedPagesPage' => __DIR__ . '/includes/specials/SpecialWantedpages.php',
index 086b615..58ddb69 100644 (file)
@@ -2342,6 +2342,19 @@ $wgWANObjectCaches = [
        */
 ];
 
+/**
+ * Verify and enforce WAN cache purges using reliable DB sources as streams.
+ *
+ * These secondary cache purges are de-duplicated via simple cache mutexes.
+ * This improves consistency when cache purges are lost, which becomes more likely
+ * as more cache servers are added or if there are multiple datacenters. Only keys
+ * related to important mutable content will be checked.
+ *
+ * @var bool
+ * @since 1.29
+ */
+$wgEnableWANCacheReaper = false;
+
 /**
  * Main object stash type. This should be a fast storage system for storing
  * lightweight data like hit counters and user activity. Sites with multiple
index 23cc26d..b720dec 100644 (file)
@@ -278,6 +278,20 @@ class LinkCache {
                return $id;
        }
 
+       /**
+        * @param WANObjectCache $cache
+        * @param TitleValue $t
+        * @return string[]
+        * @since 1.28
+        */
+       public function getMutableCacheKeys( WANObjectCache $cache, TitleValue $t ) {
+               if ( $this->isCacheable( $t ) ) {
+                       return [ $cache->makeKey( 'page', $t->getNamespace(), sha1( $t->getDBkey() ) ) ];
+               }
+
+               return [];
+       }
+
        private function isCacheable( LinkTarget $title ) {
                return ( $title->inNamespace( NS_TEMPLATE ) || $title->inNamespace( NS_FILE ) );
        }
diff --git a/includes/deferred/WANCacheReapUpdate.php b/includes/deferred/WANCacheReapUpdate.php
new file mode 100644 (file)
index 0000000..33ddc59
--- /dev/null
@@ -0,0 +1,126 @@
+<?php
+
+use Psr\Log\LoggerInterface;
+
+/**
+ * Class for fixing stale WANObjectCache keys using a purge event source
+ *
+ * This is useful for expiring keys that missed fire-and-forget purges. This uses the
+ * recentchanges table as a reliable stream to make certain keys reach consistency
+ * as soon as the underlying replica database catches up. These means that critical
+ * keys will not escape getting purged simply due to brief hiccups in the network,
+ * which are more prone to happen accross datacenters.
+ *
+ * ----
+ * "I was trying to cheat death. I was only trying to surmount for a little while the
+ * darkness that all my life I surely knew was going to come rolling in on me some day
+ * and obliterate me. I was only to stay alive a little brief while longer, after I was
+ * already gone. To stay in the light, to be with the living, a little while past my time."
+ *   -- Notes for "Blues of a Lifetime", by [[Cornell Woolrich]]
+ *
+ * @since 1.28
+ */
+class WANCacheReapUpdate implements DeferrableUpdate {
+       /** @var IDatabase */
+       private $db;
+       /** @var LoggerInterface */
+       private $logger;
+
+       /**
+        * @param IDatabase $db
+        * @param LoggerInterface $logger
+        */
+       public function __construct( IDatabase $db, LoggerInterface $logger ) {
+               $this->db = $db;
+               $this->logger = $logger;
+       }
+
+       function doUpdate() {
+               $reaper = new WANObjectCacheReaper(
+                       ObjectCache::getMainWANInstance(),
+                       ObjectCache::getLocalClusterInstance(),
+                       [ $this, 'getTitleChangeEvents' ],
+                       [ $this, 'getEventAffectedKeys' ],
+                       [
+                               'channel' => 'table:recentchanges:' . $this->db->getWikiID(),
+                               'logger' => $this->logger
+                       ]
+               );
+
+               $reaper->invoke( 100 );
+       }
+
+       /**
+        * @see WANObjectCacheRepear
+        *
+        * @param int $start
+        * @param int $id
+        * @param int $end
+        * @param int $limit
+        * @return TitleValue[]
+        */
+       public function getTitleChangeEvents( $start, $id, $end, $limit ) {
+               $db = $this->db;
+               $encStart = $db->addQuotes( $db->timestamp( $start ) );
+               $encEnd = $db->addQuotes( $db->timestamp( $end ) );
+               $id = (int)$id; // cast NULL => 0 since rc_id is an integer
+
+               $res = $db->select(
+                       'recentchanges',
+                       [ 'rc_namespace', 'rc_title', 'rc_timestamp', 'rc_id' ],
+                       [
+                               $db->makeList( [
+                                       "rc_timestamp > $encStart",
+                                       "rc_timestamp = $encStart AND rc_id > " . $db->addQuotes( $id )
+                               ], LIST_OR ),
+                               "rc_timestamp < $encEnd"
+                       ],
+                       __METHOD__,
+                       [ 'ORDER BY' => 'rc_timestamp ASC, rc_id ASC', 'LIMIT' => $limit ]
+               );
+
+               $events = [];
+               foreach ( $res as $row ) {
+                       $events[] = [
+                               'id' => (int)$row->rc_id,
+                               'pos' => (int)wfTimestamp( TS_UNIX, $row->rc_timestamp ),
+                               'item' => new TitleValue( (int)$row->rc_namespace, $row->rc_title )
+                       ];
+               }
+
+               return $events;
+       }
+
+       /**
+        * Gets a list of important cache keys associated with a title
+        *
+        * @see WANObjectCacheRepear
+        * @param WANObjectCache $cache
+        * @param TitleValue $t
+        * @returns string[]
+        */
+       public function getEventAffectedKeys( WANObjectCache $cache, TitleValue $t ) {
+               /** @var WikiPage[]|LocalFile[]|User[] $entities */
+               $entities = [];
+
+               $entities[] = WikiPage::factory( Title::newFromTitleValue( $t ) );
+               if ( $t->inNamespace( NS_FILE ) ) {
+                       $entities[] = wfLocalFile( $t->getText() );
+               }
+               if ( $t->inNamespace( NS_USER ) ) {
+                       $entities[] = User::newFromName( $t->getText(), false );
+               }
+
+               $keys = [];
+               foreach ( $entities as $entity ) {
+                       if ( $entity ) {
+                               $keys = array_merge( $keys, $entity->getMutableCacheKeys( $cache ) );
+                       }
+               }
+               if ( $keys ) {
+                       $this->logger->debug( __CLASS__ . ': got key(s) ' . implode( ', ', $keys ) );
+               }
+
+               return $keys;
+       }
+}
index 16fe72d..be0751f 100644 (file)
@@ -240,6 +240,15 @@ class LocalFile extends File {
                return $this->repo->getSharedCacheKey( 'file', sha1( $this->getName() ) );
        }
 
+       /**
+        * @param WANObjectCache $cache
+        * @return string[]
+        * @since 1.28
+        */
+       public function getMutableCacheKeys( WANObjectCache $cache ) {
+               return [ $this->getCacheKey() ];
+       }
+
        /**
         * Try to load file metadata from memcached, falling back to the database
         */
index 171c291..75c79a9 100644 (file)
@@ -1127,6 +1127,65 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
                return $values;
        }
 
+       /**
+        * Locally set a key to expire soon if it is stale based on $purgeTimestamp
+        *
+        * This sets stale keys' time-to-live at HOLDOFF_TTL seconds, which both avoids
+        * broadcasting in mcrouter setups and also avoids races with new tombstones.
+        *
+        * @param string $key Cache key
+        * @param int $purgeTimestamp UNIX timestamp of purge
+        * @param bool &$isStale Whether the key is stale
+        * @return bool Success
+        * @since 1.28
+        */
+       public function reap( $key, $purgeTimestamp, &$isStale = false ) {
+               $minAsOf = $purgeTimestamp + self::HOLDOFF_TTL;
+               $wrapped = $this->cache->get( self::VALUE_KEY_PREFIX . $key );
+               if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
+                       $isStale = true;
+                       $this->logger->warning( "Reaping stale value key '$key'." );
+                       $ttlReap = self::HOLDOFF_TTL; // avoids races with tombstone creation
+                       $ok = $this->cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
+                       if ( !$ok ) {
+                               $this->logger->error( "Could not complete reap of key '$key'." );
+                       }
+
+                       return $ok;
+               }
+
+               $isStale = false;
+
+               return true;
+       }
+
+       /**
+        * Locally set a "check" key to expire soon if it is stale based on $purgeTimestamp
+        *
+        * @param string $key Cache key
+        * @param int $purgeTimestamp UNIX timestamp of purge
+        * @param bool &$isStale Whether the key is stale
+        * @return bool Success
+        * @since 1.28
+        */
+       public function reapCheckKey( $key, $purgeTimestamp, &$isStale = false ) {
+               $purge = $this->parsePurgeValue( $this->cache->get( self::TIME_KEY_PREFIX . $key ) );
+               if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
+                       $isStale = true;
+                       $this->logger->warning( "Reaping stale check key '$key'." );
+                       $ok = $this->cache->changeTTL( self::TIME_KEY_PREFIX . $key, 1 );
+                       if ( !$ok ) {
+                               $this->logger->error( "Could not complete reap of check key '$key'." );
+                       }
+
+                       return $ok;
+               }
+
+               $isStale = false;
+
+               return false;
+       }
+
        /**
         * @see BagOStuff::makeKey()
         * @param string ... Key component
diff --git a/includes/libs/objectcache/WANObjectCacheReaper.php b/includes/libs/objectcache/WANObjectCacheReaper.php
new file mode 100644 (file)
index 0000000..62e4536
--- /dev/null
@@ -0,0 +1,204 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ * @author Aaron Schulz
+ */
+
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+
+/**
+ * Class for scanning through chronological, log-structured data or change logs
+ * and locally purging cache keys related to entities that appear in this data.
+ *
+ * This is useful for repairing cache when purges are missed by using a reliable
+ * stream, such as Kafka or a replicated MySQL table. Purge loss between datacenters
+ * is expected to be more common than within them.
+ *
+ * @since 1.28
+ */
+class WANObjectCacheReaper implements LoggerAwareInterface {
+       /** @var WANObjectCache */
+       protected $cache;
+       /** @var BagOStuff */
+       protected $store;
+       /** @var callable */
+       protected $logChunkCallback;
+       /** @var callable */
+       protected $keyListCallback;
+       /** @var LoggerInterface */
+       protected $logger;
+
+       /** @var string */
+       protected $channel;
+       /** @var integer */
+       protected $initialStartWindow;
+
+       /**
+        * @param WANObjectCache $cache Cache to reap bad keys from
+        * @param BagOStuff $store Cache to store positions use for locking
+        * @param callable $logCallback Callback taking arguments:
+        *          - The starting position as a UNIX timestamp
+        *          - The starting unique ID used for breaking timestamp collisions or null
+        *          - The ending position as a UNIX timestamp
+        *          - The maximum number of results to return
+        *        It returns a list of maps of (key: cache key, pos: UNIX timestamp, id: unique ID)
+        *        for each key affected, with the corrosponding event timestamp/ID information.
+        *        The events should be in ascending order, by (timestamp,id).
+        * @param callable $keyCallback Callback taking arguments:
+        *          - The WANObjectCache instance
+        *          - An object from the event log
+        *        It should return a list of WAN cache keys.
+        *        The callback must fully duck-type test the object, since can be any model class.
+        * @param array $params Additional options:
+        *          - channel: the name of the update event stream.
+        *            Default: WANObjectCache::DEFAULT_PURGE_CHANNEL.
+        *          - initialStartWindow: seconds back in time to start if the position is lost.
+        *            Default: 1 hour.
+        *          - logger: an SPL monolog instance [optional]
+        */
+       public function __construct(
+               WANObjectCache $cache,
+               BagOStuff $store,
+               callable $logCallback,
+               callable $keyCallback,
+               array $params
+       ) {
+               $this->cache = $cache;
+               $this->store = $store;
+
+               $this->logChunkCallback = $logCallback;
+               $this->keyListCallback = $keyCallback;
+               if ( isset( $params['channel'] ) ) {
+                       $this->channel = $params['channel'];
+               } else {
+                       throw new UnexpectedValueException( "No channel specified." );
+               }
+
+               $this->initialStartWindow = isset( $params['initialStartWindow'] )
+                       ? $params['initialStartWindow']
+                       : 3600;
+               $this->logger = isset( $params['logger'] )
+                       ? $params['logger']
+                       : new NullLogger();
+       }
+
+       public function setLogger( LoggerInterface $logger ) {
+               $this->logger = $logger;
+       }
+
+       /**
+        * Check and reap stale keys based on a chunk of events
+        *
+        * @param int $n Number of events
+        * @return int Number of keys checked
+        */
+       final public function invoke( $n = 100 ) {
+               $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
+               $scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
+               if ( !$scopeLock ) {
+                       return 0;
+               }
+
+               $now = time();
+               $status = $this->store->get( $posKey );
+               if ( !$status ) {
+                       $status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
+               }
+
+               // Get events for entities who's keys tombstones/hold-off should have expired by now
+               $events = call_user_func_array(
+                       $this->logChunkCallback,
+                       [ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ]
+               );
+
+               $event = null;
+               $keyEvents = [];
+               foreach ( $events as $event ) {
+                       $keys = call_user_func_array(
+                               $this->keyListCallback,
+                               [ $this->cache, $event['item'] ]
+                       );
+                       foreach ( $keys as $key ) {
+                               unset( $keyEvents[$key] ); // use only the latest per key
+                               $keyEvents[$key] = [
+                                       'pos' => $event['pos'],
+                                       'id' => $event['id']
+                               ];
+                       }
+               }
+
+               $purgeCount = 0;
+               $lastOkEvent = null;
+               foreach ( $keyEvents as $key => $keyEvent ) {
+                       if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
+                               break;
+                       }
+                       ++$purgeCount;
+                       $lastOkEvent = $event;
+               }
+
+               if ( $lastOkEvent ) {
+                       $ok = $this->store->merge(
+                               $posKey,
+                               function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
+                                       if ( !$curValue ) {
+                                               // Use new position
+                                       } else {
+                                               $curCoord = [ $curValue['pos'], $curValue['id'] ];
+                                               $newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
+                                               if ( $newCoord < $curCoord ) {
+                                                       // Keep prior position instead of rolling it back
+                                                       return $curValue;
+                                               }
+                                       }
+
+                                       return [
+                                               'pos' => $lastOkEvent['pos'],
+                                               'id' => $lastOkEvent['id'],
+                                               'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
+                                       ];
+                               },
+                               IExpiringStore::TTL_INDEFINITE
+                       );
+
+                       $pos = $lastOkEvent['pos'];
+                       $id = $lastOkEvent['id'];
+                       if ( $ok ) {
+                               $this->logger->info( "Updated cache reap position ($pos, $id)." );
+                       } else {
+                               $this->logger->error( "Could not update cache reap position ($pos, $id)." );
+                       }
+               }
+
+               ScopedCallback::consume( $scopeLock );
+
+               return $purgeCount;
+       }
+
+       /**
+        * @return array|bool Returns (pos, id) map or false if not set
+        */
+       public function getState() {
+               $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
+
+               return $this->store->get( $posKey );
+       }
+}
index ab95eea..1c1412a 100644 (file)
@@ -3652,4 +3652,15 @@ class WikiPage implements Page, IDBAccessObject {
        public function getSourceURL() {
                return $this->getTitle()->getCanonicalURL();
        }
+
+       /*
+        * @param WANObjectCache $cache
+        * @return string[]
+        * @since 1.28
+        */
+       public function getMutableCacheKeys( WANObjectCache $cache ) {
+               $linkCache = MediaWikiServices::getInstance()->getLinkCache();
+
+               return $linkCache->getMutableCacheKeys( $cache, $this->getTitle()->getTitleValue() );
+       }
 }
index 00efeae..00439a1 100644 (file)
@@ -20,6 +20,7 @@
  * @file
  * @ingroup SpecialPage
  */
+use MediaWiki\Logger\LoggerFactory;
 
 /**
  * Special page which uses a ChangesList to show query results.
@@ -77,6 +78,14 @@ abstract class ChangesListSpecialPage extends SpecialPage {
                $this->webOutput( $rows, $opts );
 
                $rows->free();
+
+               if ( $this->getConfig()->get( 'EnableWANCacheReaper' ) ) {
+                       // Clean up any bad page entries for titles showing up in RC
+                       DeferredUpdates::addUpdate( new WANCacheReapUpdate(
+                               $this->getDB(),
+                               LoggerFactory::getInstance( 'objectcache' )
+                       ) );
+               }
        }
 
        /**
index fed64c2..6763ec1 100644 (file)
@@ -468,6 +468,17 @@ class User implements IDBAccessObject {
                return $cache->makeGlobalKey( 'user', 'id', wfWikiID(), $this->mId );
        }
 
+       /**
+        * @param WANObjectCache $cache
+        * @return string[]
+        * @since 1.28
+        */
+       public function getMutableCacheKeys( WANObjectCache $cache ) {
+               $id = $this->getId();
+
+               return $id ? [ $this->getCacheKey( $cache ) ] : [];
+       }
+
        /**
         * Load user data from shared cache, given mId has already been set.
         *
index aa46c96..d7ed4bd 100644 (file)
@@ -872,6 +872,62 @@ class WANObjectCacheTest extends PHPUnit_Framework_TestCase  {
                $this->assertGreaterThan( -5.1, $curTTL, "Correct CTL" );
        }
 
+       /**
+        * @covers WANObjectCache::reap()
+        * @covers WANObjectCache::reapCheckKey()
+        */
+       public function testReap() {
+               $vKey1 = wfRandomString();
+               $vKey2 = wfRandomString();
+               $tKey1 = wfRandomString();
+               $tKey2 = wfRandomString();
+               $value = 'moo';
+
+               $knownPurge = time() - 60;
+               $goodTime = microtime( true ) - 5;
+               $badTime = microtime( true ) - 300;
+
+               $this->internalCache->set(
+                       WANObjectCache::VALUE_KEY_PREFIX . $vKey1,
+                       [
+                               WANObjectCache::FLD_VERSION => WANObjectCache::VERSION,
+                               WANObjectCache::FLD_VALUE => $value,
+                               WANObjectCache::FLD_TTL => 3600,
+                               WANObjectCache::FLD_TIME => $goodTime
+                       ]
+               );
+               $this->internalCache->set(
+                       WANObjectCache::VALUE_KEY_PREFIX . $vKey2,
+                       [
+                               WANObjectCache::FLD_VERSION => WANObjectCache::VERSION,
+                               WANObjectCache::FLD_VALUE => $value,
+                               WANObjectCache::FLD_TTL => 3600,
+                               WANObjectCache::FLD_TIME => $badTime
+                       ]
+               );
+               $this->internalCache->set(
+                       WANObjectCache::TIME_KEY_PREFIX . $tKey1,
+                       WANObjectCache::PURGE_VAL_PREFIX . $goodTime
+               );
+               $this->internalCache->set(
+                       WANObjectCache::TIME_KEY_PREFIX . $tKey2,
+                       WANObjectCache::PURGE_VAL_PREFIX . $badTime
+               );
+
+               $this->assertEquals( $value, $this->cache->get( $vKey1 ) );
+               $this->assertEquals( $value, $this->cache->get( $vKey2 ) );
+               $this->cache->reap( $vKey1, $knownPurge, $bad1 );
+               $this->cache->reap( $vKey2, $knownPurge, $bad2 );
+
+               $this->assertFalse( $bad1 );
+               $this->assertTrue( $bad2 );
+
+               $this->cache->reapCheckKey( $tKey1, $knownPurge, $tBad1 );
+               $this->cache->reapCheckKey( $tKey2, $knownPurge, $tBad2 );
+               $this->assertFalse( $tBad1 );
+               $this->assertTrue( $tBad2 );
+       }
+
        /**
         * @covers WANObjectCache::set()
         */
@@ -926,6 +982,8 @@ class WANObjectCacheTest extends PHPUnit_Framework_TestCase  {
                $wanCache->getMulti( [ 'x', 'y' ], $ctls, [ 'check2' ] );
                $wanCache->getWithSetCallback( 'p', 30, $valFunc );
                $wanCache->getCheckKeyTime( 'zzz' );
+               $wanCache->reap( 'x', time() - 300 );
+               $wanCache->reap( 'zzz', time() - 300 );
        }
 
        /**