cache = $cache;
		$this->store = $store;

		$this->logChunkCallback = $logCallback;
		$this->keyListCallback = $keyCallback;
		if ( isset( $params['channel'] ) ) {
			$this->channel = $params['channel'];
		} else {
			throw new UnexpectedValueException( "No channel specified." );
		}

		$this->initialStartWindow = isset( $params['initialStartWindow'] )
			? $params['initialStartWindow']
			: 3600;
		$this->logger = isset( $params['logger'] )
			? $params['logger']
			: new NullLogger();
	}

	/**
	 * Replace the logger used to report position updates
	 *
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * Check and reap stale keys based on a chunk of events
	 *
	 * Fetches up to $n events via the log chunk callback, maps each event to cache keys
	 * via the key list callback, calls reap() on each key and then persists how far it got
	 * so that a later call can resume from there. Returns 0 without doing anything when the
	 * "busy" lock for this channel cannot be obtained.
	 *
	 * @param int $n Number of events
	 * @return int Number of keys checked
	 */
	final public function invoke( $n = 100 ) {
		$posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
		$scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
		if ( !$scopeLock ) {
			// Lock not obtained; do nothing this round
			return 0;
		}

		$now = time();
		$status = $this->store->get( $posKey );
		if ( !$status ) {
			// No saved position yet: start from the configured window before "now"
			$status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
		}

		// Get events for entities whose key tombstones/hold-off should have expired by now
		$events = call_user_func_array(
			$this->logChunkCallback,
			[ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ]
		);

		$event = null;
		$keyEvents = [];
		foreach ( $events as $event ) {
			$keys = call_user_func_array(
				$this->keyListCallback,
				[ $this->cache, $event['item'] ]
			);
			foreach ( $keys as $key ) {
				// Use only the latest event per key; the unset() moves the key to the
				// end of the map so that iteration order follows the order of $events
				unset( $keyEvents[$key] );
				$keyEvents[$key] = [
					'pos' => $event['pos'],
					'id' => $event['id']
				];
			}
		}

		$purgeCount = 0;
		$lastOkEvent = null;
		$allReaped = true;
		foreach ( $keyEvents as $key => $keyEvent ) {
			if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
				$allReaped = false;
				break;
			}
			++$purgeCount;
			// Remember the event of the last key that was actually reaped. Using the last
			// event of the whole chunk here would move the saved position past keys that
			// were never reaped whenever the loop stops early.
			$lastOkEvent = $keyEvent;
		}
		if ( $allReaped && $lastOkEvent ) {
			// Every key was handled, so the position can move to the final event of the
			// chunk, including trailing events that produced no keys
			$lastOkEvent = $event;
		}

		if ( $lastOkEvent ) {
			$ok = $this->store->merge(
				$posKey,
				function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
					if ( $curValue ) {
						$curCoord = [ $curValue['pos'], $curValue['id'] ];
						$newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
						if ( $newCoord < $curCoord ) {
							// Keep prior position instead of rolling it back
							return $curValue;
						}
					}

					// Use new position; keep the original creation time if there is one
					return [
						'pos' => $lastOkEvent['pos'],
						'id' => $lastOkEvent['id'],
						'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
					];
				},
				IExpiringStore::TTL_INDEFINITE
			);

			$pos = $lastOkEvent['pos'];
			$id = $lastOkEvent['id'];
			if ( $ok ) {
				$this->logger->info( "Updated cache reap position ($pos, $id)." );
			} else {
				$this->logger->error( "Could not update cache reap position ($pos, $id)." );
			}
		}

		ScopedCallback::consume( $scopeLock );

		return $purgeCount;
	}

	/**
	 * Fetch the stored reap position for this channel
	 *
	 * @return array|bool Returns (pos, id, ctime) map or false if not set
	 */
	public function getState() {
		$posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );

		return $this->store->get( $posKey );
	}
}