objectcache: add mcrouter support to WANObjectCache
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 12 Aug 2016 02:27:50 +0000 (19:27 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Thu, 18 Aug 2016 22:42:11 +0000 (22:42 +0000)
* Update documentation about relay methods.
* Change interim key set() to add() to avoid broadcasting it.
* Remove the behavior of doing purges synchronously in the
  local DC first before relay. In both the event relayer and
  mcrouter case, they will be asynchronous. It was hardly
  even possible to use such behavior since loads come from
  slave DBs, which do not see changes right after COMMIT.

Bug: T97562
Change-Id: I7759c82ae6e1b72fc227882a99c9a712a46374f6

includes/libs/objectcache/EmptyBagOStuff.php
includes/libs/objectcache/WANObjectCache.php
tests/phpunit/includes/libs/objectcache/WANObjectCacheTest.php

index 408212a..3f66c06 100644 (file)
@@ -31,6 +31,10 @@ class EmptyBagOStuff extends BagOStuff {
                return false;
        }
 
+       public function add( $key, $value, $exp = 0 ) {
+               return true;
+       }
+
        public function set( $key, $value, $exp = 0, $flags = 0 ) {
                return true;
        }
index 4fd40e2..e3d50c6 100644 (file)
@@ -43,16 +43,22 @@ use Psr\Log\NullLogger;
  *
  * The simplest purge method is delete().
  *
- * Instances of this class must be configured to point to a valid
- * PubSub endpoint, and there must be listeners on the cache servers
- * that subscribe to the endpoint and update the caches.
+ * There are two supported ways to handle broadcasted operations:
+ *   - a) Configure the 'purge' EventRelayer to point to a valid PubSub endpoint
+ *        that has subscribed listeners on the cache servers applying the cache updates.
+ *   - b) Ignore the 'purge' EventRelayer configuration (default is NullEventRelayer)
+ *        and set up mcrouter as the underlying cache backend. Using OperationSelectorRoute,
+ *        configure 'set' and 'delete' operations to go to all DCs via AllAsyncRoute and
+ *        configure other operations to go to the local DC via PoolRoute (for reference,
+ *        see https://github.com/facebook/mcrouter/wiki/List-of-Route-Handles).
  *
- * Broadcasted operations like delete() and touchCheckKey() are done
- * synchronously in the local datacenter, but are relayed asynchronously.
- * This means that callers in other datacenters will see older values
- * for however many milliseconds the datacenters are apart. As with
- * any cache, this should not be relied on for cases where reads are
- * used to determine writes to source (e.g. non-cache) data stores.
+ * Broadcasted operations like delete() and touchCheckKey() are done asynchronously
+ * in all datacenters this way, though the local one should likely be near immediate.
+ *
+ * This means that callers in all datacenters may see older values for however many
+ * milliseconds the the purge took to reach that datacenter. As with any cache, this
+ * should not be relied on for cases where reads are used to determine writes to source
+ * (e.g. non-cache) data stores.
  *
  * All values are wrapped in metadata arrays. Keys use a "WANCache:" prefix
  * to avoid collisions with keys that are not wrapped as metadata arrays. The
@@ -60,6 +66,7 @@ use Psr\Log\NullLogger;
  *   - a) "WANCache:v" : used for regular value keys
  *   - b) "WANCache:i" : used for temporarily storing values of tombstoned keys
  *   - c) "WANCache:t" : used for storing timestamp "check" keys
+ *   - d) "WANCache:m" : used for temporary mutex keys to avoid cache stampedes
  *
  * @ingroup Cache
  * @since 1.26
@@ -129,6 +136,7 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
        const VALUE_KEY_PREFIX = 'WANCache:v:';
        const INTERIM_KEY_PREFIX = 'WANCache:i:';
        const TIME_KEY_PREFIX = 'WANCache:t:';
+       const MUTEX_KEY_PREFIX = 'WANCache:m:';
 
        const PURGE_VAL_PREFIX = 'PURGED:';
 
@@ -456,8 +464,8 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
         *
         * When using potentially long-running ACID transactions, a good pattern is
         * to use a pre-commit hook to issue the delete. This means that immediately
-        * after commit, callers will see the tombstone in cache in the local datacenter
-        * and in the others upon relay. It also avoids the following race condition:
+        * after commit, callers will see the tombstone in cache upon purge relay.
+        * It also avoids the following race condition:
         *   - a) T1 begins, changes a row, and calls delete()
         *   - b) The HOLDOFF_TTL passes, expiring the delete() tombstone
         *   - c) T2 starts, reads the row and calls set() due to a cache miss
@@ -495,18 +503,11 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
                $key = self::VALUE_KEY_PREFIX . $key;
 
                if ( $ttl <= 0 ) {
-                       // Update the local datacenter immediately
-                       $ok = $this->cache->delete( $key );
                        // Publish the purge to all datacenters
-                       $ok = $this->relayDelete( $key ) && $ok;
+                       $ok = $this->relayDelete( $key );
                } else {
-                       // Update the local datacenter immediately
-                       $ok = $this->cache->set( $key,
-                               $this->makePurgeValue( microtime( true ), self::HOLDOFF_NONE ),
-                               $ttl
-                       );
                        // Publish the purge to all datacenters
-                       $ok = $this->relayPurge( $key, $ttl, self::HOLDOFF_NONE ) && $ok;
+                       $ok = $this->relayPurge( $key, $ttl, self::HOLDOFF_NONE );
                }
 
                return $ok;
@@ -559,8 +560,9 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
         * keys, the relevant "check" keys must be supplied for this to work.
         *
         * The "check" key essentially represents a last-modified field.
-        * When touched, keys using it via get(), getMulti(), or getWithSetCallback()
-        * will be invalidated. It is treated as being HOLDOFF_TTL seconds in the future
+        * When touched, the field will be updated on all cache servers.
+        * Keys using it via get(), getMulti(), or getWithSetCallback() will
+        * be invalidated. It is treated as being HOLDOFF_TTL seconds in the future
         * by those methods to avoid race conditions where dependent keys get updated
         * with stale values (e.g. from a DB slave).
         *
@@ -569,7 +571,8 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
         * When a few important keys get a large number of hits, a high cache
         * time is usually desired as well as "lockTSE" logic. The resetCheckKey()
         * method is less appropriate in such cases since the "time since expiry"
-        * cannot be inferred.
+        * cannot be inferred, causing any get() after the reset to treat the key
+        * as being "hot", resulting in more stale value usage.
         *
         * Note that "check" keys won't collide with other regular keys.
         *
@@ -582,14 +585,8 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
         * @return bool True if the item was purged or not found, false on failure
         */
        final public function touchCheckKey( $key, $holdoff = self::HOLDOFF_TTL ) {
-               $key = self::TIME_KEY_PREFIX . $key;
-               // Update the local datacenter immediately
-               $ok = $this->cache->set( $key,
-                       $this->makePurgeValue( microtime( true ), $holdoff ),
-                       self::CHECK_KEY_TTL
-               );
                // Publish the purge to all datacenters
-               return $this->relayPurge( $key, self::CHECK_KEY_TTL, $holdoff ) && $ok;
+               return $this->relayPurge( self::TIME_KEY_PREFIX . $key, self::CHECK_KEY_TTL, $holdoff );
        }
 
        /**
@@ -597,11 +594,14 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
         *
         * This is similar to touchCheckKey() in that keys using it via get(), getMulti(),
         * or getWithSetCallback() will be invalidated. The differences are:
-        *   - a) The timestamp will be deleted from all caches and lazily
+        *   - a) The "check" key will be deleted from all caches and lazily
         *        re-initialized when accessed (rather than set everywhere)
         *   - b) Thus, dependent keys will be known to be invalid, but not
         *        for how long (they are treated as "just" purged), which
         *        effects any lockTSE logic in getWithSetCallback()
+        *   - c) Since "check" keys are initialized only on the server the key hashes
+        *        to, any temporary ejection of that server will cause the value to be
+        *        seen as purged as a new server will initialize the "check" key.
         *
         * The advantage is that this does not place high TTL keys on every cache
         * server, making it better for code that will cache many different keys
@@ -620,11 +620,8 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
         * @return bool True if the item was purged or not found, false on failure
         */
        final public function resetCheckKey( $key ) {
-               $key = self::TIME_KEY_PREFIX . $key;
-               // Update the local datacenter immediately
-               $ok = $this->cache->delete( $key );
                // Publish the purge to all datacenters
-               return $this->relayDelete( $key ) && $ok;
+               return $this->relayDelete( self::TIME_KEY_PREFIX . $key );
        }
 
        /**
@@ -902,7 +899,7 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
                $lockAcquired = false;
                if ( $useMutex ) {
                        // Acquire a datacenter-local non-blocking lock
-                       if ( $this->cache->lock( $key, 0, self::LOCK_TTL ) ) {
+                       if ( $this->cache->add( self::MUTEX_KEY_PREFIX . $key, 1, self::LOCK_TTL ) ) {
                                // Lock acquired; this thread should update the key
                                $lockAcquired = true;
                        } elseif ( $value !== false && $this->isValid( $value, $versioned, $asOf, $minTime ) ) {
@@ -939,11 +936,15 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
                if ( ( $isTombstone && $lockTSE > 0 ) && $value !== false && $ttl >= 0 ) {
                        $tempTTL = max( 1, (int)$lockTSE ); // set() expects seconds
                        $wrapped = $this->wrap( $value, $tempTTL, $asOf );
-                       $this->cache->set( self::INTERIM_KEY_PREFIX . $key, $wrapped, $tempTTL );
-               }
-
-               if ( $lockAcquired ) {
-                       $this->cache->unlock( $key );
+                       // Avoid using set() to avoid pointless mcrouter broadcasting
+                       $this->cache->merge(
+                               self::INTERIM_KEY_PREFIX . $key,
+                               function () use ( $wrapped ) {
+                                       return $wrapped;
+                               },
+                               $tempTTL,
+                               1
+                       );
                }
 
                if ( $value !== false && $ttl >= 0 ) {
@@ -952,6 +953,11 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
                        $this->set( $key, $value, $ttl, $setOpts );
                }
 
+               if ( $lockAcquired ) {
+                       // Avoid using delete() to avoid pointless mcrouter broadcasting
+                       $this->cache->changeTTL( self::MUTEX_KEY_PREFIX . $key, 1 );
+               }
+
                return $value;
        }
 
@@ -1039,17 +1045,25 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
         * @return bool Success
         */
        protected function relayPurge( $key, $ttl, $holdoff ) {
-               $event = $this->cache->modifySimpleRelayEvent( [
-                       'cmd' => 'set',
-                       'key' => $key,
-                       'val' => 'PURGED:$UNIXTIME$:' . (int)$holdoff,
-                       'ttl' => max( $ttl, 1 ),
-                       'sbt' => true, // substitute $UNIXTIME$ with actual microtime
-               ] );
-
-               $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
-               if ( !$ok ) {
-                       $this->lastRelayError = self::ERR_RELAY;
+               if ( $this->purgeRelayer instanceof EventRelayerNull ) {
+                       // This handles the mcrouter and the single-DC case
+                       $ok = $this->cache->set( $key,
+                               $this->makePurgeValue( microtime( true ), self::HOLDOFF_NONE ),
+                               $ttl
+                       );
+               } else {
+                       $event = $this->cache->modifySimpleRelayEvent( [
+                               'cmd' => 'set',
+                               'key' => $key,
+                               'val' => 'PURGED:$UNIXTIME$:' . (int)$holdoff,
+                               'ttl' => max( $ttl, 1 ),
+                               'sbt' => true, // substitute $UNIXTIME$ with actual microtime
+                       ] );
+
+                       $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
+                       if ( !$ok ) {
+                               $this->lastRelayError = self::ERR_RELAY;
+                       }
                }
 
                return $ok;
@@ -1062,14 +1076,19 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
         * @return bool Success
         */
        protected function relayDelete( $key ) {
-               $event = $this->cache->modifySimpleRelayEvent( [
-                       'cmd' => 'delete',
-                       'key' => $key,
-               ] );
-
-               $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
-               if ( !$ok ) {
-                       $this->lastRelayError = self::ERR_RELAY;
+               if ( $this->purgeRelayer instanceof EventRelayerNull ) {
+                       // This handles the mcrouter and the single-DC case
+                       $ok = $this->cache->delete( $key );
+               } else {
+                       $event = $this->cache->modifySimpleRelayEvent( [
+                               'cmd' => 'delete',
+                               'key' => $key,
+                       ] );
+
+                       $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
+                       if ( !$ok ) {
+                               $this->lastRelayError = self::ERR_RELAY;
+                       }
                }
 
                return $ok;
index 6a3cd15..35005f5 100644 (file)
@@ -206,8 +206,10 @@ class WANObjectCacheTest extends MediaWikiTestCase {
                $value = wfRandomString();
 
                $calls = 0;
-               $func = function() use ( &$calls, $value ) {
+               $func = function() use ( &$calls, $value, $cache, $key ) {
                        ++$calls;
+                       // Immediately kill any mutex rather than waiting a second
+                       $cache->delete( $cache::MUTEX_KEY_PREFIX . $key );
                        return $value;
                };
 
@@ -216,7 +218,7 @@ class WANObjectCacheTest extends MediaWikiTestCase {
                $this->assertEquals( 1, $calls, 'Value was populated' );
 
                // Acquire a lock to verify that getWithSetCallback uses lockTSE properly
-               $this->internalCache->lock( $key, 0 );
+               $this->internalCache->add( $cache::MUTEX_KEY_PREFIX . $key, 1, 0 );
 
                $checkKeys = [ wfRandomString() ]; // new check keys => force misses
                $ret = $cache->getWithSetCallback( $key, 30, $func,
@@ -246,9 +248,11 @@ class WANObjectCacheTest extends MediaWikiTestCase {
                $value = wfRandomString();
 
                $calls = 0;
-               $func = function( $oldValue, &$ttl, &$setOpts ) use ( &$calls, $value ) {
+               $func = function( $oldValue, &$ttl, &$setOpts ) use ( &$calls, $value, $cache, $key ) {
                        ++$calls;
                        $setOpts['since'] = microtime( true ) - 10;
+                       // Immediately kill any mutex rather than waiting a second
+                       $cache->delete( $cache::MUTEX_KEY_PREFIX . $key );
                        return $value;
                };
 
@@ -261,7 +265,7 @@ class WANObjectCacheTest extends MediaWikiTestCase {
                $this->assertEquals( 1, $calls, 'Value was generated' );
 
                // Acquire a lock to verify that getWithSetCallback uses lockTSE properly
-               $this->internalCache->lock( $key, 0 );
+               $this->internalCache->add( $cache::MUTEX_KEY_PREFIX . $key, 1, 0 );
                $ret = $cache->getWithSetCallback( $key, 30, $func, [ 'lockTSE' => 5 ] );
                $this->assertEquals( $value, $ret );
                $this->assertEquals( 1, $calls, 'Callback was not used' );
@@ -278,8 +282,10 @@ class WANObjectCacheTest extends MediaWikiTestCase {
                $busyValue = wfRandomString();
 
                $calls = 0;
-               $func = function() use ( &$calls, $value ) {
+               $func = function() use ( &$calls, $value, $cache, $key ) {
                        ++$calls;
+                       // Immediately kill any mutex rather than waiting a second
+                       $cache->delete( $cache::MUTEX_KEY_PREFIX . $key );
                        return $value;
                };
 
@@ -288,7 +294,7 @@ class WANObjectCacheTest extends MediaWikiTestCase {
                $this->assertEquals( 1, $calls, 'Value was populated' );
 
                // Acquire a lock to verify that getWithSetCallback uses busyValue properly
-               $this->internalCache->lock( $key, 0 );
+               $this->internalCache->add( $cache::MUTEX_KEY_PREFIX . $key, 1, 0 );
 
                $checkKeys = [ wfRandomString() ]; // new check keys => force misses
                $ret = $cache->getWithSetCallback( $key, 30, $func,
@@ -307,13 +313,13 @@ class WANObjectCacheTest extends MediaWikiTestCase {
                $this->assertEquals( $busyValue, $ret, 'Callback was not used; used busy value' );
                $this->assertEquals( 2, $calls, 'Callback was not used; used busy value' );
 
-               $this->internalCache->unlock( $key );
+               $this->internalCache->delete( $cache::MUTEX_KEY_PREFIX . $key );
                $ret = $cache->getWithSetCallback( $key, 30, $func,
                        [ 'lockTSE' => 30, 'busyValue' => $busyValue, 'checkKeys' => $checkKeys ] );
                $this->assertEquals( $value, $ret, 'Callback was used; saved interim' );
                $this->assertEquals( 3, $calls, 'Callback was used; saved interim' );
 
-               $this->internalCache->lock( $key, 0 );
+               $this->internalCache->add( $cache::MUTEX_KEY_PREFIX . $key, 1, 0 );
                $ret = $cache->getWithSetCallback( $key, 30, $func,
                        [ 'busyValue' => $busyValue, 'checkKeys' => $checkKeys ] );
                $this->assertEquals( $value, $ret, 'Callback was not used; used interim' );
@@ -694,4 +700,26 @@ class WANObjectCacheTest extends MediaWikiTestCase {
                $this->cache->set( $key, $value, 30, $opts );
                $this->assertEquals( false, $this->cache->get( $key ), "Pending value not written." );
        }
+
+       public function testMcRouterSupport() {
+               $localBag = $this->getMock( 'EmptyBagOStuff', [ 'set', 'delete' ] );
+               $localBag->expects( $this->never() )->method( 'set' );
+               $localBag->expects( $this->never() )->method( 'delete' );
+               $wanCache = new WANObjectCache( [
+                       'cache' => $localBag,
+                       'pool' => 'testcache-hash',
+                       'relayer' => new EventRelayerNull( [] )
+               ] );
+               $valFunc = function () {
+                       return 1;
+               };
+
+               // None of these should use broadcasting commands (e.g. SET, DELETE)
+               $wanCache->get( 'x' );
+               $wanCache->get( 'x', $ctl, [ 'check1' ] );
+               $wanCache->getMulti( [ 'x', 'y' ] );
+               $wanCache->getMulti( [ 'x', 'y' ], $ctls, [ 'check2' ] );
+               $wanCache->getWithSetCallback( 'p', 30, $valFunc );
+               $wanCache->getCheckKeyTime( 'zzz' );
+       }
 }