Merge "Introduce RevisionStoreFactory & Tests"
[lhc/web/wiklou.git] / includes / Storage / NameTableStore.php
1 <?php
2 /**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21 namespace MediaWiki\Storage;
22
23 use IExpiringStore;
24 use Psr\Log\LoggerInterface;
25 use WANObjectCache;
26 use Wikimedia\Assert\Assert;
27 use Wikimedia\Rdbms\Database;
28 use Wikimedia\Rdbms\IDatabase;
29 use Wikimedia\Rdbms\ILoadBalancer;
30 use Wikimedia\Rdbms\LoadBalancer;
31
32 /**
33 * @author Addshore
34 * @since 1.31
35 */
36 class NameTableStore {
37
/** @var ILoadBalancer Load balancer used to obtain database connections */
private $loadBalancer;

/** @var WANObjectCache Cache holding the id => name map between requests */
private $cache;

/** @var LoggerInterface */
private $logger;

/** @var string[]|null In-process copy of the id => name map; null until first loaded */
private $tableCache;

/** @var bool|string ID of the target wiki database, or false for the local wiki */
private $wikiId = false;

/** @var int TTL passed to the WAN cache when the map is stored */
private $cacheTTL;

/** @var string Name of the table backing this store */
private $table;

/** @var string Name of the column holding the numeric id */
private $idField;

/** @var string Name of the column holding the name */
private $nameField;

/** @var callable|null Applied to names before they are saved or looked up */
private $normalizationCallback;

/** @var callable|null Applied to the insert fields before a new row is written */
private $insertCallback;
66
/**
 * @param ILoadBalancer $dbLoadBalancer Load balancer used to obtain database connections
 * @param WANObjectCache $cache Cache in which the id => name map is kept
 * @param LoggerInterface $logger
 * @param string $table Name of the table backing this store
 * @param string $idField Name of the id column
 * @param string $nameField Name of the name column
 * @param callable|null $normalizationCallback Normalization applied to names before they are
 *  saved or queried. Must accept a single string and return a single string.
 * @param bool|string $wikiId ID of the target wiki database; false means the local wiki.
 * @param callable|null $insertCallback Callback that receives the insert fields and returns the
 *  fields that should actually be inserted. This parameter was introduced in 1.32
 */
public function __construct(
	ILoadBalancer $dbLoadBalancer,
	WANObjectCache $cache,
	LoggerInterface $logger,
	$table,
	$idField,
	$nameField,
	callable $normalizationCallback = null,
	$wikiId = false,
	callable $insertCallback = null
) {
	// Collaborating services
	$this->loadBalancer = $dbLoadBalancer;
	$this->cache = $cache;
	$this->logger = $logger;

	// Where the data lives
	$this->wikiId = $wikiId;
	$this->table = $table;
	$this->idField = $idField;
	$this->nameField = $nameField;

	// Optional hooks
	$this->normalizationCallback = $normalizationCallback;
	$this->insertCallback = $insertCallback;

	// The cache lifetime is fixed and not configurable by callers
	$this->cacheTTL = IExpiringStore::TTL_MONTH;
}
102
/**
 * Obtain a connection to the database of the wiki this store was constructed for.
 *
 * @param int $index A database index, like DB_MASTER or DB_REPLICA
 * @param int $flags Database connection flags
 *
 * @return IDatabase
 */
private function getDBConnection( $index, $flags = 0 ) {
	// No query groups are requested
	$groups = [];

	return $this->loadBalancer->getConnection( $index, $groups, $this->wikiId, $flags );
}
112
/**
 * Build the WAN cache key under which the name table is stored.
 *
 * The key is global and is made up of a fixed prefix, the table name and the domain ID
 * resolved from the wiki ID given to the constructor, so a name table cached for a specific
 * database can be shared between wikis.
 *
 * @return string
 */
private function getCacheKey() {
	$domainId = $this->loadBalancer->resolveDomainID( $this->wikiId );

	return $this->cache->makeGlobalKey( 'NameTableSqlStore', $this->table, $domainId );
}
128
/**
 * Run the configured normalization callback over a name.
 * When no callback was configured the name is returned untouched.
 *
 * @param string $name
 * @return string
 */
private function normalizeName( $name ) {
	$normalizer = $this->normalizationCallback;

	return $normalizer === null
		? $name
		: call_user_func( $normalizer, $name );
}
139
/**
 * Return the id belonging to the given name, inserting a new row when the name is unknown.
 *
 * @param string $name
 * @throws NameTableAccessException If the insert was ignored and the master still has no
 *  row for the name
 * @return int
 */
public function acquireId( $name ) {
	Assert::parameterType( 'string', $name, '$name' );
	$name = $this->normalizeName( $name );

	$map = $this->getTableFromCachesOrReplica();
	$knownId = array_search( $name, $map, true );
	if ( $knownId !== false ) {
		// Already known: nothing to write and no cache to touch
		return $knownId;
	}

	$insertedId = $this->store( $name );
	if ( $insertedId !== null ) {
		// store() handed back an ID, so a row was really inserted: remember it locally
		// and delete the now outdated WAN cache entry.
		$map[$insertedId] = $name;
		$this->purgeWANCache(
			function () {
				$this->cache->delete( $this->getCacheKey() );
			}
		);
		$this->tableCache = $map;

		return $insertedId;
	}

	// RACE: the insert was ignored, so $name is presumably already in the db (probably just
	// inserted by someone else). Reload from the master over a CONN_TRX_AUTOCOMMIT connection;
	// per the original author's note this avoids missing inserts due to other threads or
	// REPEATABLE-READ.
	$dbw = $this->getDBConnection( DB_MASTER, LoadBalancer::CONN_TRX_AUTOCOMMIT );
	$map = $this->loadTable( $dbw );
	$knownId = array_search( $name, $map, true );
	if ( $knownId === false ) {
		// Insert failed due to IGNORE flag, but DB_MASTER didn't give us the data
		$m = "No insert possible but master didn't give us a record for " .
			"'{$name}' in '{$this->table}'";
		$this->logger->error( $m );
		throw new NameTableAccessException( $m );
	}

	$this->purgeWANCache(
		function () {
			$this->cache->reap( $this->getCacheKey(), INF );
		}
	);
	$this->tableCache = $map;

	return $knownId;
}
190
/**
 * Look up the id of the given name without ever creating it.
 * Use this when the name is believed to exist already, or to check whether it exists;
 * an unknown name results in an exception.
 *
 * @param string $name
 * @throws NameTableAccessException The name does not exist
 * @return int Id
 */
public function getId( $name ) {
	Assert::parameterType( 'string', $name, '$name' );
	$name = $this->normalizeName( $name );

	$id = array_search( $name, $this->getTableFromCachesOrReplica(), true );
	if ( $id === false ) {
		throw NameTableAccessException::newFromDetails( $this->table, 'name', $name );
	}

	return $id;
}
214
/**
 * Get the name of the given id.
 * If the id doesn't exist this will throw.
 * This should be used in cases where we believe the id already exists.
 *
 * Note: Calls to this method will result in a master select for non existing IDs.
 *
 * @param int $id
 * @throws NameTableAccessException The id does not exist
 * @return string name
 */
public function getName( $id ) {
	Assert::parameterType( 'integer', $id, '$id' );

	$table = $this->getTableFromCachesOrReplica();
	if ( array_key_exists( $id, $table ) ) {
		return $table[$id];
	}

	// Inside the closure below __METHOD__ would name the closure rather than this method,
	// so capture the method name here for use in the log message.
	$fname = __METHOD__;

	$table = $this->cache->getWithSetCallback(
		$this->getCacheKey(),
		$this->cacheTTL,
		function ( $oldValue, &$ttl, &$setOpts ) use ( $id, $fname ) {
			// Check if cached value is up-to-date enough to have $id
			if ( is_array( $oldValue ) && array_key_exists( $id, $oldValue ) ) {
				// Completely leave the cache key alone
				$ttl = WANObjectCache::TTL_UNCACHEABLE;
				// Use the old value
				return $oldValue;
			}
			// Regenerate from replica DB, and master DB if needed
			foreach ( [ DB_REPLICA, DB_MASTER ] as $source ) {
				// Log a fallback to master
				if ( $source === DB_MASTER ) {
					$this->logger->info(
						$fname . ' falling back to master select from ' .
						$this->table . ' with id ' . $id
					);
				}
				$db = $this->getDBConnection( $source );
				$cacheSetOpts = Database::getCacheSetOptions( $db );
				$table = $this->loadTable( $db );
				if ( array_key_exists( $id, $table ) ) {
					break; // found it
				}
			}
			// Use the value from last source checked
			$setOpts += $cacheSetOpts;

			return $table;
		},
		[ 'minAsOf' => INF ] // force callback run
	);

	// Keep whatever was loaded for later lookups, even if $id turned out to be missing
	$this->tableCache = $table;

	if ( array_key_exists( $id, $table ) ) {
		return $table[$id];
	}

	throw NameTableAccessException::newFromDetails( $this->table, 'id', $id );
}
277
/**
 * Return the complete table as a map of ids to names, in no particular order.
 * The result comes from the caches or a replica and may therefore lag behind the master.
 *
 * @return string[] keys are the name ids, values are the names themselves
 *  Example: [ 1 => 'foo', 3 => 'bar' ]
 */
public function getMap() {
	$map = $this->getTableFromCachesOrReplica();

	return $map;
}
288
/**
 * Return the id => name map, using the in-process copy when there is one and otherwise the
 * WAN cache, which in turn is filled from a replica DB on a miss. The result is remembered
 * in-process.
 *
 * @return string[]
 */
private function getTableFromCachesOrReplica() {
	if ( $this->tableCache === null ) {
		// Only invoked by the WAN cache when it has to (re)generate the value
		$loadFromReplica = function ( $oldValue, &$ttl, &$setOpts ) {
			$dbr = $this->getDBConnection( DB_REPLICA );
			$setOpts += Database::getCacheSetOptions( $dbr );
			return $this->loadTable( $dbr );
		};

		$this->tableCache = $this->cache->getWithSetCallback(
			$this->getCacheKey(),
			$this->cacheTTL,
			$loadFromReplica
		);
	}

	return $this->tableCache;
}
311
/**
 * Run the given WAN cache purge, either right away or tied to the master transaction.
 *
 * @param callable $purgeCallback callback to 'purge' the WAN cache
 */
private function purgeWANCache( $purgeCallback ) {
	if ( $this->loadBalancer->hasOrMadeRecentMasterChanges() ) {
		// There are master changes: hand the purge over to the master connection so it
		// runs via onTransactionPreCommitOrIdle
		$dbw = $this->getDBConnection( DB_MASTER );
		$dbw->onTransactionPreCommitOrIdle( $purgeCallback, __METHOD__ );
	} else {
		// If the LB has no DB changes don't bother with onTransactionPreCommitOrIdle
		$purgeCallback();
	}
}
327
/**
 * Read the whole name table from the given database connection.
 *
 * @param IDatabase $db
 *
 * @return string[] Map of id => name, built in the order the rows are returned
 */
private function loadTable( IDatabase $db ) {
	// Alias the configured columns so the rows can be read uniformly below
	$fields = [
		'id' => $this->idField,
		'name' => $this->nameField
	];
	$rows = $db->select( $this->table, $fields, [], __METHOD__, [ 'ORDER BY' => 'id' ] );

	$map = [];
	foreach ( $rows as $row ) {
		$map[$row->id] = $row->name;
	}

	return $map;
}
354
/**
 * Insert the given name into the table on the master, ignoring the insert if it conflicts.
 *
 * @param string $name Must be a non-empty string
 * @return int|null The new row's ID when a row was inserted, null when no row was affected
 */
private function store( $name ) {
	Assert::parameterType( 'string', $name, '$name' );
	Assert::parameter( $name !== '', '$name', 'should not be an empty string' );
	// Note: only called internally, so $name has been normalized by the caller already.

	$dbw = $this->getDBConnection( DB_MASTER );
	$row = $this->getFieldsToStore( $name );
	$dbw->insert( $this->table, $row, __METHOD__, [ 'IGNORE' ] );

	if ( $dbw->affectedRows() !== 0 ) {
		return $dbw->insertId();
	}

	// Nothing was written, so the IGNORE flag swallowed a conflict
	$this->logger->info(
		'Tried to insert name into table ' . $this->table . ', but value already existed.'
	);

	return null;
}
384
/**
 * Build the field => value array for inserting a name, letting the insert callback
 * (if one was configured) replace it with its own return value.
 *
 * @param string $name
 * @return array
 */
private function getFieldsToStore( $name ) {
	$fields = [ $this->nameField => $name ];

	if ( $this->insertCallback === null ) {
		return $fields;
	}

	return call_user_func( $this->insertCallback, $fields );
}
396
397 }