Merge "Perform a permission check on the title when changing the page language"
[lhc/web/wiklou.git] / includes / libs / objectcache / WANObjectCacheReaper.php
1 <?php
2 /**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 * @ingroup Cache
20 */
21
22 use Psr\Log\LoggerAwareInterface;
23 use Psr\Log\LoggerInterface;
24 use Psr\Log\NullLogger;
25 use Wikimedia\ScopedCallback;
26
/**
 * Class for scanning through chronological, log-structured data or change logs
 * and locally purging cache keys related to entities that appear in this data.
 *
 * This is useful for repairing cache when purges are missed by using a reliable
 * stream, such as Kafka or a replicated MySQL table. Purge loss between datacenters
 * is expected to be more common than within them.
 *
 * @since 1.28
 */
class WANObjectCacheReaper implements LoggerAwareInterface {
	/** @var WANObjectCache Cache whose stale keys get reaped */
	protected $cache;
	/** @var BagOStuff Store holding the reap position and the "busy" lock */
	protected $store;
	/** @var callable Callback that returns a chunk of events from the log */
	protected $logChunkCallback;
	/** @var callable Callback that maps an event item to a list of WAN cache keys */
	protected $keyListCallback;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Name of the update event stream */
	protected $channel;
	/** @var int Seconds back in time to start from when no position is stored */
	protected $initialStartWindow;

	/**
	 * @param WANObjectCache $cache Cache to reap bad keys from
	 * @param BagOStuff $store Cache used to store positions and for locking
	 * @param callable $logCallback Callback taking arguments:
	 *          - The starting position as a UNIX timestamp
	 *          - The starting unique ID used for breaking timestamp collisions or null
	 *          - The ending position as a UNIX timestamp
	 *          - The maximum number of results to return
	 *        It returns a list of maps of (item: event log object, pos: UNIX timestamp,
	 *        id: unique ID) for each event, with the corresponding event timestamp/ID
	 *        information. The "item" value is what gets passed to $keyCallback.
	 *        The events should be in ascending order, by (timestamp,id).
	 * @param callable $keyCallback Callback taking arguments:
	 *          - The WANObjectCache instance
	 *          - An object from the event log
	 *        It should return a list of WAN cache keys.
	 *        The callback must fully duck-type test the object, since it can be any model class.
	 * @param array $params Additional options:
	 *          - channel: the name of the update event stream. [required]
	 *          - initialStartWindow: seconds back in time to start if the position is lost.
	 *            Default: 1 hour.
	 *          - logger: a LoggerInterface instance [optional]. Default: NullLogger.
	 * @throws UnexpectedValueException If no channel is specified in $params
	 */
	public function __construct(
		WANObjectCache $cache,
		BagOStuff $store,
		callable $logCallback,
		callable $keyCallback,
		array $params
	) {
		if ( !isset( $params['channel'] ) ) {
			throw new UnexpectedValueException( "No channel specified." );
		}

		$this->cache = $cache;
		$this->store = $store;

		$this->logChunkCallback = $logCallback;
		$this->keyListCallback = $keyCallback;

		$this->channel = $params['channel'];
		$this->initialStartWindow = isset( $params['initialStartWindow'] )
			? $params['initialStartWindow']
			: 3600;
		$this->logger = isset( $params['logger'] )
			? $params['logger']
			: new NullLogger();
	}

	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * Check and reap stale keys based on a chunk of events
	 *
	 * The stored position is only advanced past events whose keys were all reaped;
	 * if a reap fails, the position stops at the last key that was handled so that
	 * the remaining keys are picked up again by a later call.
	 *
	 * @param int $n Maximum number of events to request from the log callback
	 * @return int Number of keys reaped before the first failure (all of them if none failed);
	 *  0 if the lock could not be acquired
	 */
	final public function invoke( $n = 100 ) {
		$posKey = $this->getPositionKey();
		$scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
		if ( !$scopeLock ) {
			// Lock not obtained; leave the work to whoever holds it
			return 0;
		}

		$now = time();
		$status = $this->store->get( $posKey );
		if ( !$status ) {
			// No known position; start from a fixed window back in time
			$status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
		}

		// Get events for entities whose keys' tombstones/hold-off should have expired by now
		$events = call_user_func(
			$this->logChunkCallback,
			$status['pos'],
			$status['id'],
			$now - WANObjectCache::HOLDOFF_TTL - 1,
			$n
		);

		// Map each affected key to the position of the latest event that touched it.
		// After this loop, $event is the last event of the chunk (or null if there were none).
		$event = null;
		$keyEvents = [];
		foreach ( $events as $event ) {
			$keys = call_user_func( $this->keyListCallback, $this->cache, $event['item'] );
			foreach ( $keys as $key ) {
				// Re-insert so that the key moves to the end of the map; this keeps the map
				// ordered by each key's latest event and uses only the latest per key
				unset( $keyEvents[$key] );
				$keyEvents[$key] = [
					'pos' => $event['pos'],
					'id' => $event['id']
				];
			}
		}

		$purgeCount = 0;
		$allReaped = true;
		$lastOkEvent = null;
		foreach ( $keyEvents as $key => $keyEvent ) {
			if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
				$allReaped = false;
				break;
			}
			++$purgeCount;
			// Track the position of the key that was actually handled, not the end of
			// the chunk, so a failure does not cause the remaining keys to be skipped
			$lastOkEvent = $keyEvent;
		}

		if ( $allReaped && $event ) {
			// Nothing failed, so the whole chunk is done; this also moves past
			// events that did not map to any cache key
			$lastOkEvent = [ 'pos' => $event['pos'], 'id' => $event['id'] ];
		}

		if ( $lastOkEvent ) {
			$ok = $this->store->merge(
				$posKey,
				function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
					if ( $curValue ) {
						$curCoord = [ $curValue['pos'], $curValue['id'] ];
						$newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
						if ( $newCoord < $curCoord ) {
							// Keep prior position instead of rolling it back
							return $curValue;
						}
					}

					// Use new position; "ctime" is carried over from the existing
					// value and only set to the current time when there is none
					return [
						'pos' => $lastOkEvent['pos'],
						'id' => $lastOkEvent['id'],
						'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
					];
				},
				IExpiringStore::TTL_INDEFINITE
			);

			$pos = $lastOkEvent['pos'];
			$id = $lastOkEvent['id'];
			if ( $ok ) {
				$this->logger->info( "Updated cache reap position ($pos, $id)." );
			} else {
				$this->logger->error( "Could not update cache reap position ($pos, $id)." );
			}
		}

		ScopedCallback::consume( $scopeLock );

		return $purgeCount;
	}

	/**
	 * @return array|bool Returns (pos, id, ctime) map or false if not set
	 */
	public function getState() {
		return $this->store->get( $this->getPositionKey() );
	}

	/**
	 * Build the store key under which the reap position for this channel is kept
	 *
	 * @return string
	 */
	private function getPositionKey() {
		return $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
	}
}