Merge "Revert "Log the reason why revision->getContent() returns null""
[lhc/web/wiklou.git] / includes / Storage / NameTableStore.php
1 <?php
2 /**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21 namespace MediaWiki\Storage;
22
23 use IExpiringStore;
24 use Psr\Log\LoggerInterface;
25 use WANObjectCache;
26 use Wikimedia\Assert\Assert;
27 use Wikimedia\Rdbms\Database;
28 use Wikimedia\Rdbms\IDatabase;
29 use Wikimedia\Rdbms\LoadBalancer;
30
31 /**
32 * @author Addshore
33 * @since 1.31
34 */
35 class NameTableStore {
36
37 /** @var LoadBalancer */
38 private $loadBalancer;
39
40 /** @var WANObjectCache */
41 private $cache;
42
43 /** @var LoggerInterface */
44 private $logger;
45
46 /** @var string[] */
47 private $tableCache = null;
48
49 /** @var bool|string */
50 private $wikiId = false;
51
52 /** @var int */
53 private $cacheTTL;
54
55 /** @var string */
56 private $table;
57 /** @var string */
58 private $idField;
59 /** @var string */
60 private $nameField;
61 /** @var null|callable */
62 private $normalizationCallback = null;
63
64 /**
65 * @param LoadBalancer $dbLoadBalancer A load balancer for acquiring database connections
66 * @param WANObjectCache $cache A cache manager for caching data
67 * @param LoggerInterface $logger
68 * @param string $table
69 * @param string $idField
70 * @param string $nameField
71 * @param callable $normalizationCallback Normalization to be applied to names before being
72 * saved or queried. This should be a callback that accepts and returns a single string.
73 * @param bool|string $wikiId The ID of the target wiki database. Use false for the local wiki.
74 */
75 public function __construct(
76 LoadBalancer $dbLoadBalancer,
77 WANObjectCache $cache,
78 LoggerInterface $logger,
79 $table,
80 $idField,
81 $nameField,
82 callable $normalizationCallback = null,
83 $wikiId = false
84 ) {
85 $this->loadBalancer = $dbLoadBalancer;
86 $this->cache = $cache;
87 $this->logger = $logger;
88 $this->table = $table;
89 $this->idField = $idField;
90 $this->nameField = $nameField;
91 $this->normalizationCallback = $normalizationCallback;
92 $this->wikiId = $wikiId;
93 $this->cacheTTL = IExpiringStore::TTL_MONTH;
94 }
95
96 /**
97 * @param int $index A database index, like DB_MASTER or DB_REPLICA
98 * @param int $flags Database connection flags
99 *
100 * @return IDatabase
101 */
102 private function getDBConnection( $index, $flags = 0 ) {
103 return $this->loadBalancer->getConnection( $index, [], $this->wikiId, $flags );
104 }
105
106 private function getCacheKey() {
107 return $this->cache->makeKey( 'NameTableSqlStore', $this->table, $this->wikiId );
108 }
109
110 /**
111 * @param string $name
112 * @return string
113 */
114 private function normalizeName( $name ) {
115 if ( $this->normalizationCallback === null ) {
116 return $name;
117 }
118 return call_user_func( $this->normalizationCallback, $name );
119 }
120
121 /**
122 * Acquire the id of the given name.
123 * This creates a row in the table if it doesn't already exist.
124 *
125 * @param string $name
126 * @throws NameTableAccessException
127 * @return int
128 */
129 public function acquireId( $name ) {
130 Assert::parameterType( 'string', $name, '$name' );
131 $name = $this->normalizeName( $name );
132
133 $table = $this->getTableFromCachesOrReplica();
134 $searchResult = array_search( $name, $table, true );
135 if ( $searchResult === false ) {
136 $id = $this->store( $name );
137 if ( $id === null ) {
138 // RACE: $name was already in the db, probably just inserted, so load from master
139 // Use DBO_TRX to avoid missing inserts due to other threads or REPEATABLE-READs
140 $table = $this->loadTable(
141 $this->getDBConnection( DB_MASTER, LoadBalancer::CONN_TRX_AUTO )
142 );
143 $searchResult = array_search( $name, $table, true );
144 if ( $searchResult === false ) {
145 // Insert failed due to IGNORE flag, but DB_MASTER didn't give us the data
146 $m = "No insert possible but master didn't give us a record for " .
147 "'{$name}' in '{$this->table}'";
148 $this->logger->error( $m );
149 throw new NameTableAccessException( $m );
150 }
151 $this->purgeWANCache(
152 function () {
153 $this->cache->reap( $this->getCacheKey(), INF );
154 }
155 );
156 } else {
157 $table[$id] = $name;
158 $searchResult = $id;
159 // As store returned an ID we know we inserted so delete from WAN cache
160 $this->purgeWANCache(
161 function () {
162 $this->cache->delete( $this->getCacheKey() );
163 }
164 );
165 }
166 $this->tableCache = $table;
167 }
168
169 return $searchResult;
170 }
171
172 /**
173 * Get the id of the given name.
174 * If the name doesn't exist this will throw.
175 * This should be used in cases where we believe the name already exists or want to check for
176 * existence.
177 *
178 * @param string $name
179 * @throws NameTableAccessException The name does not exist
180 * @return int Id
181 */
182 public function getId( $name ) {
183 Assert::parameterType( 'string', $name, '$name' );
184 $name = $this->normalizeName( $name );
185
186 $table = $this->getTableFromCachesOrReplica();
187 $searchResult = array_search( $name, $table, true );
188
189 if ( $searchResult !== false ) {
190 return $searchResult;
191 }
192
193 throw NameTableAccessException::newFromDetails( $this->table, 'name', $name );
194 }
195
196 /**
197 * Get the name of the given id.
198 * If the id doesn't exist this will throw.
199 * This should be used in cases where we believe the id already exists.
200 *
201 * Note: Calls to this method will result in a master select for non existing IDs.
202 *
203 * @param int $id
204 * @throws NameTableAccessException The id does not exist
205 * @return string name
206 */
207 public function getName( $id ) {
208 Assert::parameterType( 'integer', $id, '$id' );
209
210 $table = $this->getTableFromCachesOrReplica();
211 if ( array_key_exists( $id, $table ) ) {
212 return $table[$id];
213 }
214
215 $table = $this->cache->getWithSetCallback(
216 $this->getCacheKey(),
217 $this->cacheTTL,
218 function ( $oldValue, &$ttl, &$setOpts ) use ( $id ) {
219 // Check if cached value is up-to-date enough to have $id
220 if ( is_array( $oldValue ) && array_key_exists( $id, $oldValue ) ) {
221 // Completely leave the cache key alone
222 $ttl = WANObjectCache::TTL_UNCACHEABLE;
223 // Use the old value
224 return $oldValue;
225 }
226 // Regenerate from replica DB, and master DB if needed
227 foreach ( [ DB_REPLICA, DB_MASTER ] as $source ) {
228 // Log a fallback to master
229 if ( $source === DB_MASTER ) {
230 $this->logger->info(
231 __METHOD__ . 'falling back to master select from ' .
232 $this->table . ' with id ' . $id
233 );
234 }
235 $db = $this->getDBConnection( $source );
236 $cacheSetOpts = Database::getCacheSetOptions( $db );
237 $table = $this->loadTable( $db );
238 if ( array_key_exists( $id, $table ) ) {
239 break; // found it
240 }
241 }
242 // Use the value from last source checked
243 $setOpts += $cacheSetOpts;
244
245 return $table;
246 },
247 [ 'minAsOf' => INF ] // force callback run
248 );
249
250 $this->tableCache = $table;
251
252 if ( array_key_exists( $id, $table ) ) {
253 return $table[$id];
254 }
255
256 throw NameTableAccessException::newFromDetails( $this->table, 'id', $id );
257 }
258
259 /**
260 * Get the whole table, in no particular order as a map of ids to names.
261 * This method could be subject to DB or cache lag.
262 *
263 * @return string[] keys are the name ids, values are the names themselves
264 * Example: [ 1 => 'foo', 3 => 'bar' ]
265 */
266 public function getMap() {
267 return $this->getTableFromCachesOrReplica();
268 }
269
270 /**
271 * @return string[]
272 */
273 private function getTableFromCachesOrReplica() {
274 if ( $this->tableCache !== null ) {
275 return $this->tableCache;
276 }
277
278 $table = $this->cache->getWithSetCallback(
279 $this->getCacheKey(),
280 $this->cacheTTL,
281 function ( $oldValue, &$ttl, &$setOpts ) {
282 $dbr = $this->getDBConnection( DB_REPLICA );
283 $setOpts += Database::getCacheSetOptions( $dbr );
284 return $this->loadTable( $dbr );
285 }
286 );
287
288 $this->tableCache = $table;
289
290 return $table;
291 }
292
293 /**
294 * Reap the WANCache entry for this table.
295 *
296 * @param callable $purgeCallback callback to 'purge' the WAN cache
297 */
298 private function purgeWANCache( $purgeCallback ) {
299 // If the LB has no DB changes don't both with onTransactionPreCommitOrIdle
300 if ( !$this->loadBalancer->hasOrMadeRecentMasterChanges() ) {
301 $purgeCallback();
302 return;
303 }
304
305 $this->getDBConnection( DB_MASTER )
306 ->onTransactionPreCommitOrIdle( $purgeCallback, __METHOD__ );
307 }
308
309 /**
310 * Gets the table from the db
311 *
312 * @param IDatabase $db
313 *
314 * @return string[]
315 */
316 private function loadTable( IDatabase $db ) {
317 $result = $db->select(
318 $this->table,
319 [
320 'id' => $this->idField,
321 'name' => $this->nameField
322 ],
323 [],
324 __METHOD__
325 );
326
327 $assocArray = [];
328 foreach ( $result as $row ) {
329 $assocArray[$row->id] = $row->name;
330 }
331
332 return $assocArray;
333 }
334
335 /**
336 * Stores the given name in the DB, returning the ID when an insert occurs.
337 *
338 * @param string $name
339 * @return int|null int if we know the ID, null if we don't
340 */
341 private function store( $name ) {
342 Assert::parameterType( 'string', $name, '$name' );
343 Assert::parameter( $name !== '', '$name', 'should not be an empty string' );
344 // Note: this is only called internally so normalization of $name has already occurred.
345
346 $dbw = $this->getDBConnection( DB_MASTER );
347
348 $dbw->insert(
349 $this->table,
350 [ $this->nameField => $name ],
351 __METHOD__,
352 [ 'IGNORE' ]
353 );
354
355 if ( $dbw->affectedRows() === 0 ) {
356 $this->logger->info(
357 'Tried to insert name into table ' . $this->table . ', but value already existed.'
358 );
359 return null;
360 }
361
362 return $dbw->insertId();
363 }
364
365 }