*
* @file
* @ingroup Cache
- * @author Aaron Schulz
*/
use Psr\Log\LoggerAwareInterface;
*
* The simplest purge method is delete().
*
- * There are two supported ways to handle broadcasted operations:
+ * There are three supported ways to handle broadcasted operations:
* - a) Configure the 'purge' EventRelayer to point to a valid PubSub endpoint
- * that has subscribed listeners on the cache servers applying the cache updates.
+ * that has subscribed listeners on the cache servers applying the cache updates.
* - b) Ignore the 'purge' EventRelayer configuration (default is NullEventRelayer)
- * and set up mcrouter as the underlying cache backend, using one of the memcached
- * BagOStuff classes as 'cache'. Use OperationSelectorRoute in the mcrouter settings
- * to configure 'set' and 'delete' operations to go to all DCs via AllAsyncRoute and
- * configure other operations to go to the local DC via PoolRoute (for reference,
- * see https://github.com/facebook/mcrouter/wiki/List-of-Route-Handles).
+ * and set up mcrouter as the underlying cache backend, using one of the memcached
+ * BagOStuff classes as 'cache'. Use OperationSelectorRoute in the mcrouter settings
+ * to configure 'set' and 'delete' operations to go to all DCs via AllAsyncRoute and
+ * configure other operations to go to the local DC via PoolRoute (for reference,
+ * see https://github.com/facebook/mcrouter/wiki/List-of-Route-Handles).
+ * - c) Ignore the 'purge' EventRelayer configuration (default is NullEventRelayer)
+ * and set up dynomite as cache middleware between the web servers and either
+ * memcached or redis. This will also broadcast all key setting operations, not just purges,
+ * which can be useful for cache warming. Writes are eventually consistent via the
+ * Dynamo replication model (see https://github.com/Netflix/dynomite).
*
* Broadcasted operations like delete() and touchCheckKey() are done asynchronously
* in all datacenters this way, though the local one should likely be near immediate.
private $callbackDepth = 0;
/** @var mixed[] Temporary warm-up cache */
private $warmupCache = [];
+ /** @var integer Key fetched */
+ private $warmupKeyMisses = 0;
/** Max time expected to pass between delete() and DB commit finishing */
const MAX_COMMIT_DELAY = 3;
public static function newEmpty() {
return new self( [
'cache' => new EmptyBagOStuff(),
- 'pool' => 'empty',
- 'relayer' => new EventRelayerNull( [] )
+ 'pool' => 'empty'
] );
}
* @param array $checkKeys List of check keys to apply to all $keys. May also apply "check"
* keys to specific cache keys only by using cache keys as keys in the $checkKeys array.
* @param float[] &$asOfs Map of (key => UNIX timestamp of cached value; null on failure)
- * @return array Map of (key => value) for keys that exist
+ * @return array Map of (key => value) for keys that exist and are not tombstoned
*/
final public function getMulti(
array $keys, &$curTTLs = [], array $checkKeys = [], array &$asOfs = []
$checkKeysForAll = [];
$checkKeysByKey = [];
$checkKeysFlat = [];
- foreach ( $checkKeys as $i => $keys ) {
- $prefixed = self::prefixCacheKeys( (array)$keys, self::TIME_KEY_PREFIX );
+ foreach ( $checkKeys as $i => $checkKeyGroup ) {
+ $prefixed = self::prefixCacheKeys( (array)$checkKeyGroup, self::TIME_KEY_PREFIX );
$checkKeysFlat = array_merge( $checkKeysFlat, $prefixed );
// Is this check keys for a specific cache key, or for all keys being fetched?
if ( is_int( $i ) ) {
if ( $this->warmupCache ) {
$wrappedValues = array_intersect_key( $this->warmupCache, array_flip( $keysGet ) );
$keysGet = array_diff( $keysGet, array_keys( $wrappedValues ) ); // keys left to fetch
+ $this->warmupKeyMisses += count( $keysGet );
} else {
$wrappedValues = [];
}
- $wrappedValues += $this->cache->getMulti( $keysGet );
+ if ( $keysGet ) {
+ $wrappedValues += $this->cache->getMulti( $keysGet );
+ }
// Time used to compare/init "check" keys (derived after getMulti() to be pessimistic)
$now = microtime( true );
$wrapExtra[self::FLD_FLAGS] = self::FLG_STALE; // mark as stale
// Case B: any long-running transaction; ignore this set()
} elseif ( $age > self::MAX_READ_LAG ) {
- $this->logger->warning( "Rejected set() for $key due to snapshot lag." );
+ $this->logger->info( "Rejected set() for $key due to snapshot lag." );
return true; // no-op the write for being unsafe
// Case C: high replication lag; lower TTL instead of ignoring all set()s
$this->logger->warning( "Lowered set() TTL for $key due to replication lag." );
// Case D: medium length request with medium replication lag; ignore this set()
} else {
- $this->logger->warning( "Rejected set() for $key due to high read lag." );
+ $this->logger->info( "Rejected set() for $key due to high read lag." );
return true; // no-op the write for being unsafe
}
}
/**
- * Method to fetch/regenerate multiple cache keys at once
+ * Method to fetch multiple cache keys at once with regeneration
*
* This works the same as getWithSetCallback() except:
* - a) The $keys argument expects the result of WANObjectCache::makeMultiKeys()
* - c) The return value is a map of (cache key => value) in the order of $keyedIds
*
* @see WANObjectCache::getWithSetCallback()
+ * @see WANObjectCache::getMultiWithUnionSetCallback()
*
* Example usage:
* @code
* // Time-to-live (in seconds)
* $cache::TTL_DAY,
* // Function that derives the new key value
- * return function ( $id, $oldValue, &$ttl, array &$setOpts ) {
+ * function ( $id, $oldValue, &$ttl, array &$setOpts ) {
* $dbr = wfGetDB( DB_REPLICA );
* // Account for any snapshot/replica DB lag
* $setOpts += Database::getCacheSetOptions( $dbr );
final public function getMultiWithSetCallback(
ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
) {
- $keysWarmUp = iterator_to_array( $keyedIds, true );
+ $valueKeys = array_keys( $keyedIds->getArrayCopy() );
$checkKeys = isset( $opts['checkKeys'] ) ? $opts['checkKeys'] : [];
- foreach ( $checkKeys as $i => $checkKeyOrKeys ) {
- if ( is_int( $i ) ) {
- $keysWarmUp[] = $checkKeyOrKeys;
- } else {
- $keysWarmUp = array_merge( $keysWarmUp, $checkKeyOrKeys );
+
+ // Load required keys into process cache in one go
+ $this->warmupCache = $this->getRawKeysForWarmup(
+ $this->getNonProcessCachedKeys( $valueKeys, $opts ),
+ $checkKeys
+ );
+ $this->warmupKeyMisses = 0;
+
+ // Wrap $callback to match the getWithSetCallback() format while passing $id to $callback
+ $id = null; // current entity ID
+ $func = function ( $oldValue, &$ttl, &$setOpts, $oldAsOf ) use ( $callback, &$id ) {
+ return $callback( $id, $oldValue, $ttl, $setOpts, $oldAsOf );
+ };
+
+ $values = [];
+ foreach ( $keyedIds as $key => $id ) { // preserve order
+ $values[$key] = $this->getWithSetCallback( $key, $ttl, $func, $opts );
+ }
+
+ $this->warmupCache = [];
+
+ return $values;
+ }
+
+ /**
+ * Method to fetch/regenerate multiple cache keys at once
+ *
+ * This works the same as getWithSetCallback() except:
+ * - a) The $keys argument expects the result of WANObjectCache::makeMultiKeys()
+ * - b) The $callback argument expects a callback returning a map of (ID => new value)
+ * for all entity IDs in $regenById and it takes the following arguments:
+ * - $ids: a list of entity IDs to regenerate
+ * - &$ttls: a reference to the (entity ID => new TTL) map
+ * - &$setOpts: a reference to options for set() which can be altered
+ * - c) The return value is a map of (cache key => value) in the order of $keyedIds
+ * - d) The "lockTSE" and "busyValue" options are ignored
+ *
+ * @see WANObjectCache::getWithSetCallback()
+ * @see WANObjectCache::getMultiWithSetCallback()
+ *
+ * Example usage:
+ * @code
+ * $rows = $cache->getMultiWithUnionSetCallback(
+ * // Map of cache keys to entity IDs
+ * $cache->makeMultiKeys(
+ * $this->fileVersionIds(),
+ * function ( $id, WANObjectCache $cache ) {
+ * return $cache->makeKey( 'file-version', $id );
+ * }
+ * ),
+ * // Time-to-live (in seconds)
+ * $cache::TTL_DAY,
+ * // Function that derives the new key value
+ * function ( array $ids, array &$ttls, array &$setOpts ) {
+ * $dbr = wfGetDB( DB_REPLICA );
+ * // Account for any snapshot/replica DB lag
+ * $setOpts += Database::getCacheSetOptions( $dbr );
+ *
+ * // Load the rows for these files
+ * $rows = [];
+ * $res = $dbr->select( 'file', '*', [ 'id' => $ids ], __METHOD__ );
+ * foreach ( $res as $row ) {
+ * $rows[$row->id] = $row;
+ * $mtime = wfTimestamp( TS_UNIX, $row->timestamp );
+ * $ttls[$row->id] = $this->adaptiveTTL( $mtime, $ttls[$row->id] );
+ * }
+ *
+ * return $rows;
+ * },
+ * ]
+ * );
+ * $files = array_map( [ __CLASS__, 'newFromRow' ], $rows );
+ * @endcode
+ *
+ * @param ArrayIterator $keyedIds Result of WANObjectCache::makeMultiKeys()
+ * @param integer $ttl Seconds to live for key updates
+ * @param callable $callback Callback the yields entity regeneration callbacks
+ * @param array $opts Options map
+ * @return array Map of (cache key => value) in the same order as $keyedIds
+ * @since 1.30
+ */
+ final public function getMultiWithUnionSetCallback(
+ ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
+ ) {
+ $idsByValueKey = $keyedIds->getArrayCopy();
+ $valueKeys = array_keys( $idsByValueKey );
+ $checkKeys = isset( $opts['checkKeys'] ) ? $opts['checkKeys'] : [];
+ unset( $opts['lockTSE'] ); // incompatible
+ unset( $opts['busyValue'] ); // incompatible
+
+ // Load required keys into process cache in one go
+ $keysGet = $this->getNonProcessCachedKeys( $valueKeys, $opts );
+ $this->warmupCache = $this->getRawKeysForWarmup( $keysGet, $checkKeys );
+ $this->warmupKeyMisses = 0;
+
+ // IDs of entities known to be in need of regeneration
+ $idsRegen = [];
+
+ // Find out which keys are missing/deleted/stale
+ $curTTLs = [];
+ $asOfs = [];
+ $curByKey = $this->getMulti( $keysGet, $curTTLs, $checkKeys, $asOfs );
+ foreach ( $keysGet as $key ) {
+ if ( !array_key_exists( $key, $curByKey ) || $curTTLs[$key] < 0 ) {
+ $idsRegen[] = $idsByValueKey[$key];
}
}
- $this->warmupCache = $this->cache->getMulti( $keysWarmUp );
- $this->warmupCache += array_fill_keys( $keysWarmUp, false );
+ // Run the callback to populate the regeneration value map for all required IDs
+ $newSetOpts = [];
+ $newTTLsById = array_fill_keys( $idsRegen, $ttl );
+ $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
// Wrap $callback to match the getWithSetCallback() format while passing $id to $callback
- $id = null;
- $func = function ( $oldValue, &$ttl, array $setOpts, $oldAsOf ) use ( $callback, &$id ) {
- return $callback( $id, $oldValue, $ttl, $setOpts, $oldAsOf );
+ $id = null; // current entity ID
+ $func = function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
+ use ( $callback, &$id, $newValsById, $newTTLsById, $newSetOpts )
+ {
+ if ( array_key_exists( $id, $newValsById ) ) {
+ // Value was already regerated as expected, so use the value in $newValsById
+ $newValue = $newValsById[$id];
+ $ttl = $newTTLsById[$id];
+ $setOpts = $newSetOpts;
+ } else {
+ // Pre-emptive/popularity refresh and version mismatch cases are not detected
+ // above and thus $newValsById has no entry. Run $callback on this single entity.
+ $ttls = [ $id => $ttl ];
+ $newValue = $callback( [ $id ], $ttls, $setOpts )[$id];
+ $ttl = $ttls[$id];
+ }
+
+ return $newValue;
};
+ // Run the cache-aside logic using warmupCache instead of persistent cache queries
$values = [];
- foreach ( $keyedIds as $key => $id ) {
+ foreach ( $idsByValueKey as $key => $id ) { // preserve order
$values[$key] = $this->getWithSetCallback( $key, $ttl, $func, $opts );
}
return $values;
}
+ /**
+ * Locally set a key to expire soon if it is stale based on $purgeTimestamp
+ *
+ * This sets stale keys' time-to-live at HOLDOFF_TTL seconds, which both avoids
+ * broadcasting in mcrouter setups and also avoids races with new tombstones.
+ *
+ * @param string $key Cache key
+ * @param int $purgeTimestamp UNIX timestamp of purge
+ * @param bool &$isStale Whether the key is stale
+ * @return bool Success
+ * @since 1.28
+ */
+ public function reap( $key, $purgeTimestamp, &$isStale = false ) {
+ $minAsOf = $purgeTimestamp + self::HOLDOFF_TTL;
+ $wrapped = $this->cache->get( self::VALUE_KEY_PREFIX . $key );
+ if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
+ $isStale = true;
+ $this->logger->warning( "Reaping stale value key '$key'." );
+ $ttlReap = self::HOLDOFF_TTL; // avoids races with tombstone creation
+ $ok = $this->cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
+ if ( !$ok ) {
+ $this->logger->error( "Could not complete reap of key '$key'." );
+ }
+
+ return $ok;
+ }
+
+ $isStale = false;
+
+ return true;
+ }
+
+ /**
+ * Locally set a "check" key to expire soon if it is stale based on $purgeTimestamp
+ *
+ * @param string $key Cache key
+ * @param int $purgeTimestamp UNIX timestamp of purge
+ * @param bool &$isStale Whether the key is stale
+ * @return bool Success
+ * @since 1.28
+ */
+ public function reapCheckKey( $key, $purgeTimestamp, &$isStale = false ) {
+ $purge = $this->parsePurgeValue( $this->cache->get( self::TIME_KEY_PREFIX . $key ) );
+ if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
+ $isStale = true;
+ $this->logger->warning( "Reaping stale check key '$key'." );
+ $ok = $this->cache->changeTTL( self::TIME_KEY_PREFIX . $key, 1 );
+ if ( !$ok ) {
+ $this->logger->error( "Could not complete reap of check key '$key'." );
+ }
+
+ return $ok;
+ }
+
+ $isStale = false;
+
+ return false;
+ }
+
/**
* @see BagOStuff::makeKey()
* @param string ... Key component
return (int)min( $maxTTL, max( $minTTL, $factor * $age ) );
}
+ /**
+ * @return integer Number of warmup key cache misses last round
+ * @since 1.30
+ */
+ public function getWarmupKeyMisses() {
+ return $this->warmupKeyMisses;
+ }
+
/**
* Do the actual async bus purge of a key
*
return false;
}
- // Lifecycle is: new, ramp-up refresh chance, full refresh chance
+ // Lifecycle is: new, ramp-up refresh chance, full refresh chance.
+ // Note that the "expected # of refreshes" for the ramp-up time range is half of what it
+ // would be if P(refresh) was at its full value during that time range.
$refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
// P(refresh) * (# hits in $refreshWindowSec) = (expected # of refreshes)
// P(refresh) * ($refreshWindowSec * $popularHitsPerSec) = 1
*
* @param array|string|bool $wrapped
* @param float $now Unix Current timestamp (preferrably pre-query)
- * @return array (mixed; false if absent/invalid, current time left)
+ * @return array (mixed; false if absent/tombstoned/invalid, current time left)
*/
protected function unwrap( $wrapped, $now ) {
// Check if the value is a tombstone
return $this->processCaches[$group];
}
+
+ /**
+ * @param array $keys
+ * @param array $opts
+ * @return array List of keys
+ */
+ private function getNonProcessCachedKeys( array $keys, array $opts ) {
+ $keysFound = [];
+ if ( isset( $opts['pcTTL'] ) && $opts['pcTTL'] > 0 && $this->callbackDepth == 0 ) {
+ $pcGroup = isset( $opts['pcGroup'] ) ? $opts['pcGroup'] : self::PC_PRIMARY;
+ $procCache = $this->getProcessCache( $pcGroup );
+ foreach ( $keys as $key ) {
+ if ( $procCache->get( $key ) !== false ) {
+ $keysFound[] = $key;
+ }
+ }
+ }
+
+ return array_diff( $keys, $keysFound );
+ }
+
+ /**
+ * @param array $keys
+ * @param array $checkKeys
+ * @return array Map of (cache key => mixed)
+ */
+ private function getRawKeysForWarmup( array $keys, array $checkKeys ) {
+ if ( !$keys ) {
+ return [];
+ }
+
+ $keysWarmUp = [];
+ // Get all the value keys to fetch...
+ foreach ( $keys as $key ) {
+ $keysWarmUp[] = self::VALUE_KEY_PREFIX . $key;
+ }
+ // Get all the check keys to fetch...
+ foreach ( $checkKeys as $i => $checkKeyOrKeys ) {
+ if ( is_int( $i ) ) {
+ // Single check key that applies to all value keys
+ $keysWarmUp[] = self::TIME_KEY_PREFIX . $checkKeyOrKeys;
+ } else {
+ // List of check keys that apply to value key $i
+ $keysWarmUp = array_merge(
+ $keysWarmUp,
+ self::prefixCacheKeys( $checkKeyOrKeys, self::TIME_KEY_PREFIX )
+ );
+ }
+ }
+
+ $warmupCache = $this->cache->getMulti( $keysWarmUp );
+ $warmupCache += array_fill_keys( $keysWarmUp, false );
+
+ return $warmupCache;
+ }
}