Merge "jobqueue: remove unused "aggregator" field reference in JobQueueFederated"
[lhc/web/wiklou.git] / includes / objectcache / SqlBagOStuff.php
1 <?php
2 /**
3 * Object caching using a SQL database.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Cache
22 */
23
24 use MediaWiki\MediaWikiServices;
25 use Wikimedia\Rdbms\Database;
26 use Wikimedia\Rdbms\IDatabase;
27 use Wikimedia\Rdbms\DBError;
28 use Wikimedia\Rdbms\DBQueryError;
29 use Wikimedia\Rdbms\DBConnectionError;
30 use Wikimedia\Rdbms\LoadBalancer;
31 use Wikimedia\ScopedCallback;
32 use Wikimedia\WaitConditionLoop;
33
34 /**
35 * Class to store objects in the database
36 *
37 * @ingroup Cache
38 */
39 class SqlBagOStuff extends BagOStuff {
/** @var array[]|bool (server index => server config), or false to use the main wiki DB */
protected $serverInfos;
/** @var string[] (server index => tag/host name); only set when "servers" is configured */
protected $serverTags;
/** @var int Number of servers that keys are distributed over */
protected $numServers;
/** @var int UNIX timestamp of the last garbage collection run (or enqueue) */
protected $lastGarbageCollect = 0;
/** @var int Average number of writes between garbage collection runs; 0 disables purging */
protected $purgePeriod = 10;
/** @var int Maximum number of rows to purge per garbage collection run */
protected $purgeLimit = 100;
/** @var int Number of tables per server that data is spread over */
protected $shards = 1;
/** @var string Base table name */
protected $tableName = 'objectcache';
/** @var bool Whether to only use replica DBs and skip garbage collection */
protected $replicaOnly = false;
/** @var int Max seconds to wait for replica DBs to catch up for WRITE_SYNC */
protected $syncTimeout = 3;

/** @var LoadBalancer|null */
protected $separateMainLB;
/** @var IDatabase[] (server index => open connection handle) */
protected $conns = [];
/** @var int[] (server index => UNIX timestamp of the last connection failure) */
protected $connFailureTimes = [];
/** @var DBError[] (server index => exception from the last connection failure) */
protected $connFailureErrors = [];

/** @var int Minimum number of seconds between two garbage collection runs */
const GARBAGE_COLLECT_DELAY_SEC = 1;
72
/**
 * Build the cache from a configuration array. Recognized options:
 *
 *   - server: A single server info structure, in the format used by each
 *      element of $wgDBServers.
 *
 *   - servers: A list of server info structures over which keys are distributed.
 *      When given, "server" is ignored. If the list uses string keys, those
 *      are used for consistent hashing *instead* of the host name from the
 *      server config. That helps when a cluster is replicated to another site
 *      (with different host names) but each server has a counterpart there.
 *
 *   - purgePeriod: The average number of writes between garbage collection
 *      runs (which delete expired rows); i.e. the reciprocal of the
 *      probability of purging on any given write. Zero disables purging.
 *
 *   - purgeLimit: Maximum number of rows to purge at once.
 *
 *   - tableName: The table name to use; defaults to "objectcache".
 *
 *   - shards: The number of tables used for storage on each server. If more
 *      than 1, tables are named objectcacheNNN where NNN is the shard index
 *      (0 to shards-1), zero-padded to the width of the largest index. Keys
 *      are spread across the tables by hash. This works around MySQL bugs
 *      61735 <https://bugs.mysql.com/bug.php?id=61735> and
 *      61736 <https://bugs.mysql.com/bug.php?id=61736>.
 *
 *   - slaveOnly: Whether to only use replica DBs and avoid triggering garbage
 *      collection of expired items. This only makes sense if the primary DB
 *      is used and only get() calls are made. Used by ReplicatedBagOStuff.
 *
 *   - syncTimeout: Max seconds to wait for replica DBs to catch up for WRITE_SYNC.
 *
 * If neither "server" nor "servers" is given, the main wiki database is used.
 *
 * @param array $params
 */
public function __construct( $params ) {
	parent::__construct( $params );

	$this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
	$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;

	if ( isset( $params['servers'] ) ) {
		// Explicit server list; re-index it numerically and remember the hashing tags
		$this->serverInfos = [];
		$this->serverTags = [];
		$this->numServers = count( $params['servers'] );
		$index = 0;
		foreach ( $params['servers'] as $tag => $info ) {
			$this->serverInfos[$index] = $info;
			$this->serverTags[$index] = is_string( $tag )
				? $tag
				: ( $info['host'] ?? "#$index" );
			++$index;
		}
	} elseif ( isset( $params['server'] ) ) {
		// Single explicit server
		$this->serverInfos = [ $params['server'] ];
		$this->numServers = count( $this->serverInfos );
	} else {
		// Default to using the main wiki's database servers
		$this->serverInfos = false;
		$this->numServers = 1;
		$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE;
	}

	// Integer-valued options share the same handling
	foreach ( [ 'purgePeriod', 'purgeLimit', 'shards' ] as $intOption ) {
		if ( isset( $params[$intOption] ) ) {
			$this->$intOption = intval( $params[$intOption] );
		}
	}
	// Options that are stored as given
	foreach ( [ 'tableName', 'syncTimeout' ] as $rawOption ) {
		if ( isset( $params[$rawOption] ) ) {
			$this->$rawOption = $params[$rawOption];
		}
	}

	$this->replicaOnly = !empty( $params['slaveOnly'] );
}
158
/**
 * Get (and lazily open) the connection for the given server index.
 *
 * Handles are cached per server index. If a connection failure was recorded
 * for the server less than 60 seconds ago, the stored exception is re-thrown
 * instead of attempting to connect again.
 *
 * @param int $serverIndex
 * @return Database
 * @throws MWException If the server index is out of range
 */
protected function getDB( $serverIndex ) {
	if ( isset( $this->conns[$serverIndex] ) ) {
		return $this->conns[$serverIndex];
	}

	if ( $serverIndex >= $this->numServers ) {
		throw new MWException( __METHOD__ . ": Invalid server index \"$serverIndex\"" );
	}

	// Don't keep timing out trying to connect for each call if the DB is down
	$recentlyFailed = isset( $this->connFailureErrors[$serverIndex] )
		&& ( time() - $this->connFailureTimes[$serverIndex] ) < 60;
	if ( $recentlyFailed ) {
		throw $this->connFailureErrors[$serverIndex];
	}

	if ( $this->serverInfos ) {
		// Custom database described by the configured connection info
		$info = $this->serverInfos[$serverIndex];
		$type = $info['type'] ?? 'mysql';
		$host = $info['host'] ?? '[unknown]';
		$this->logger->debug( __CLASS__ . ": connecting to $host" );
		$db = Database::factory( $type, $info );
		$db->clearFlag( DBO_TRX ); // auto-commit mode
	} else {
		// Main wiki database, via the load balancer
		$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
		$index = $this->replicaOnly ? DB_REPLICA : DB_MASTER;
		if ( $lb->getServerType( $lb->getWriterIndex() ) === 'sqlite' ) {
			// SQLite uses DB-level locking, so a separate handle would hurt here.
			// Stock sqlite MediaWiki installs use a separate sqlite cache DB instead.
			$db = $lb->getConnection( $index );
		} else {
			// Keep a separate connection to avoid contention and deadlocks
			$db = $lb->getConnection( $index, [], false, $lb::CONN_TRX_AUTOCOMMIT );
		}
	}

	$this->logger->debug( sprintf( "Connection %s will be used for SqlBagOStuff", $db ) );
	$this->conns[$serverIndex] = $db;

	return $db;
}
207
/**
 * Map a key to the server and table that hold it.
 *
 * The table shard is picked from an md5-derived hash of the key; the server
 * is the first entry after consistent-hash sorting of the server tags.
 *
 * @param string $key
 * @return array Server index and table name
 */
protected function getTableByKey( $key ) {
	$tableIndex = 0;
	if ( $this->shards > 1 ) {
		$hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
		$tableIndex = $hash % $this->shards;
	}

	$serverIndex = 0;
	if ( $this->numServers > 1 ) {
		$rankedServers = $this->serverTags;
		ArrayUtils::consistentHashSort( $rankedServers, $key );
		reset( $rankedServers );
		$serverIndex = key( $rankedServers );
	}

	return [ $serverIndex, $this->getTableNameByShard( $tableIndex ) ];
}
230
/**
 * Build the table name for a shard index.
 *
 * With a single shard this is the bare table name; otherwise the index is
 * appended, zero-padded to the width of the largest shard index.
 *
 * @param int $index
 * @return string
 */
protected function getTableNameByShard( $index ) {
	if ( $this->shards <= 1 ) {
		return $this->tableName;
	}

	$decimals = strlen( $this->shards - 1 );
	$suffix = sprintf( "%0{$decimals}d", $index );

	return $this->tableName . $suffix;
}
245
/**
 * Fetch and unserialize a single key.
 *
 * @param string $key
 * @param int $flags
 * @param mixed|null &$casToken Set to the stored blob on a hit, null otherwise
 * @return mixed The value, or false if not found
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$casToken = null;

	$found = $this->fetchBlobMulti( [ $key ] );
	if ( !array_key_exists( $key, $found ) ) {
		return false;
	}

	$blob = $found[$key];
	$value = $this->unserialize( $blob );
	if ( $value !== false ) {
		// The raw blob doubles as the check-and-set token
		$casToken = $blob;
	}

	return $value;
}
261
/**
 * Fetch and unserialize several keys at once.
 *
 * @param string[] $keys
 * @param int $flags
 * @return array Map of (key => value) for the keys that were found
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	return array_map(
		function ( $blob ) {
			return $this->unserialize( $blob );
		},
		$this->fetchBlobMulti( $keys )
	);
}
272
/**
 * Fetch the raw (still serialized) blobs for a list of keys.
 *
 * Keys are grouped by server and table so each table is queried once. Keys
 * that are missing, expired, or could not be read are absent from the result;
 * database errors are recorded via handleReadError() rather than thrown.
 *
 * @param string[] $keys
 * @param int $flags Not used by this implementation
 * @return array Map of (key => decoded blob) for the keys that were found
 */
protected function fetchBlobMulti( array $keys, $flags = 0 ) {
	$values = []; // array of (key => value)

	$keysByTable = [];
	foreach ( $keys as $key ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$keysByTable[$serverIndex][$tableName][] = $key;
	}

	$dataRows = [];
	foreach ( $keysByTable as $serverIndex => $serverKeys ) {
		try {
			$db = $this->getDB( $serverIndex );
			foreach ( $serverKeys as $tableName => $tableKeys ) {
				$res = $db->select( $tableName,
					[ 'keyname', 'value', 'exptime' ],
					[ 'keyname' => $tableKeys ],
					__METHOD__,
					// Approximate write-on-the-fly BagOStuff API via blocking.
					// This approximation fails if a ROLLBACK happens (which is rare).
					// We do not want to flush the TRX as that can break callers.
					$db->trxLevel() ? [ 'LOCK IN SHARE MODE' ] : []
				);
				if ( $res === false ) {
					continue;
				}
				foreach ( $res as $row ) {
					// Remember where the row came from for the expiry check below
					$row->serverIndex = $serverIndex;
					$row->tableName = $tableName;
					$dataRows[$row->keyname] = $row;
				}
			}
		} catch ( DBError $e ) {
			$this->handleReadError( $e, $serverIndex );
		}
	}

	foreach ( $keys as $key ) {
		if ( isset( $dataRows[$key] ) ) { // HIT?
			$row = $dataRows[$key];
			$this->debug( "get: retrieved data; expiry time is " . $row->exptime );
			try {
				$db = $this->getDB( $row->serverIndex );
				if ( $this->isExpired( $db, $row->exptime ) ) { // MISS
					$this->debug( "get: key has expired" );
				} else { // HIT
					$values[$key] = $db->decodeBlob( $row->value );
				}
			} catch ( DBError $e ) {
				// getDB() re-throws the stored connection error when the server was
				// marked down by a later query in the loop above; catch every DBError
				// here so a read never lets it escape, and treat the key as a miss.
				$this->handleReadError( $e, $row->serverIndex );
			}
		} else { // MISS
			$this->debug( 'get: no matching rows' );
		}
	}

	return $values;
}
332
/**
 * Store several key/value pairs, overwriting any existing rows.
 *
 * @param array $data Map of (key => value)
 * @param int $expiry
 * @param int $flags
 * @return bool
 */
public function setMulti( array $data, $expiry = 0, $flags = 0 ) {
	$replaceExisting = true;

	return $this->insertMulti( $data, $expiry, $flags, $replaceExisting );
}
336
/**
 * Write several key/value pairs, grouped by server and table.
 *
 * @param array $data Map of (key => value)
 * @param int $expiry
 * @param int $flags Bitfield; WRITE_SYNC waits for replication afterwards
 * @param bool $replace Whether to REPLACE existing rows (true) or INSERT IGNORE (false)
 * @return bool False if any write failed, or (when not replacing) affected no rows
 */
private function insertMulti( array $data, $expiry, $flags, $replace ) {
	// Bucket the keys by destination server and table
	$keysByTable = [];
	foreach ( array_keys( $data ) as $key ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$keysByTable[$serverIndex][$tableName][] = $key;
	}

	$result = true;
	$exptime = (int)$expiry;
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();
	foreach ( $keysByTable as $serverIndex => $serverKeys ) {
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			$this->occasionallyGarbageCollect( $db );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );
			$result = false;
			continue;
		}

		// Negative expiry is treated as "no expiry"
		$exptime = max( $exptime, 0 );
		if ( $exptime != 0 ) {
			$exptime = $this->convertToExpiry( $exptime );
			$encExpiry = $db->timestamp( $exptime );
		} else {
			$encExpiry = $this->getMaxDateTime( $db );
		}

		foreach ( $serverKeys as $tableName => $tableKeys ) {
			$rows = array_map(
				function ( $key ) use ( $db, $data, $encExpiry ) {
					return [
						'keyname' => $key,
						'value' => $db->encodeBlob( $this->serialize( $data[$key] ) ),
						'exptime' => $encExpiry,
					];
				},
				$tableKeys
			);

			try {
				if ( $replace ) {
					$db->replace( $tableName, [ 'keyname' ], $rows, __METHOD__ );
				} else {
					$db->insert( $tableName, $rows, __METHOD__, [ 'IGNORE' ] );
					if ( $db->affectedRows() <= 0 ) {
						// Nothing was inserted, so the key(s) already existed
						$result = false;
					}
				}
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $db, $serverIndex );
				$result = false;
			}
		}
	}

	$syncRequested = ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC;
	if ( $syncRequested ) {
		$result = $this->waitForReplication() && $result;
	}

	return $result;
}
400
/**
 * Store a single value, overwriting any existing row.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	return $this->insertMulti( [ $key => $value ], $exptime, $flags, true );
}
406
/**
 * Store a single value via a non-replacing insert (INSERT IGNORE).
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if no row was inserted or the write failed
 */
public function add( $key, $value, $exptime = 0, $flags = 0 ) {
	return $this->insertMulti( [ $key => $value ], $exptime, $flags, false );
}
412
/**
 * Check-and-set: replace the value of a key only if the stored blob still
 * equals the token obtained from an earlier get.
 *
 * Database errors (including connection failures) are recorded via
 * handleWriteError() and reported as a false return value, consistent with
 * the other write methods of this class.
 *
 * @param mixed $casToken The blob previously returned through doGet()
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool Whether a row was updated
 */
protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
	$db = null;
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();
	try {
		$db = $this->getDB( $serverIndex );
		$exptime = intval( $exptime );

		// Negative expiry is treated as "no expiry"
		if ( $exptime < 0 ) {
			$exptime = 0;
		}

		if ( $exptime == 0 ) {
			$encExpiry = $this->getMaxDateTime( $db );
		} else {
			$exptime = $this->convertToExpiry( $exptime );
			$encExpiry = $db->timestamp( $exptime );
		}
		// Conditional update: only matches while the stored blob equals the token
		$db->update(
			$tableName,
			[
				'keyname' => $key,
				'value' => $db->encodeBlob( $this->serialize( $value ) ),
				'exptime' => $encExpiry
			],
			[
				'keyname' => $key,
				'value' => $db->encodeBlob( $casToken )
			],
			__METHOD__
		);

		return (bool)$db->affectedRows();
	} catch ( DBError $e ) {
		// Catch DBError rather than only DBQueryError, so a connection failure
		// thrown by getDB() is handled here instead of escaping to the caller
		$this->handleWriteError( $e, $db, $serverIndex );

		return false;
	}
}
455
/**
 * Delete several keys; delegates to purgeMulti().
 *
 * @param string[] $keys
 * @param int $flags
 * @return bool
 */
public function deleteMulti( array $keys, $flags = 0 ) {
	$deleted = $this->purgeMulti( $keys, $flags );

	return $deleted;
}
459
/**
 * Delete the rows for several keys, grouped by server and table.
 *
 * @param string[] $keys
 * @param int $flags Bitfield; WRITE_SYNC waits for replication afterwards
 * @return bool False if any server or table could not be processed
 */
public function purgeMulti( array $keys, $flags = 0 ) {
	// Bucket the keys by destination server and table
	$keysByTable = [];
	foreach ( $keys as $key ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$keysByTable[$serverIndex][$tableName][] = $key;
	}

	$allOk = true;
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();
	foreach ( $keysByTable as $serverIndex => $serverKeys ) {
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );
			$allOk = false;
			continue;
		}

		foreach ( $serverKeys as $tableName => $tableKeys ) {
			try {
				$db->delete( $tableName, [ 'keyname' => $tableKeys ], __METHOD__ );
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $db, $serverIndex );
				$allOk = false;
			}
		}
	}

	$syncRequested = ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC;
	if ( $syncRequested ) {
		$allOk = $this->waitForReplication() && $allOk;
	}

	return $allOk;
}
497
/**
 * Delete a single key.
 *
 * @param string $key
 * @param int $flags
 * @return bool
 */
protected function doDelete( $key, $flags = 0 ) {
	return $this->purgeMulti( [ $key ], $flags );
}
503
/**
 * Increase the stored integer value of a key.
 *
 * The row is read with FOR UPDATE, deleted, and re-inserted with the new
 * value and its original expiry.
 *
 * @param string $key
 * @param int $step
 * @return int|bool The new value, or false if the key is missing or expired,
 *  the re-insert affected no rows, or a database error occurred
 */
public function incr( $key, $step = 1 ) {
	list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
	$db = null;
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();
	try {
		$db = $this->getDB( $serverIndex );
		$step = intval( $step );
		$keyCond = [ 'keyname' => $key ];

		$row = $db->selectRow(
			$tableName,
			[ 'value', 'exptime' ],
			$keyCond,
			__METHOD__,
			[ 'FOR UPDATE' ]
		);
		if ( $row === false ) {
			// Missing
			return false;
		}

		$db->delete( $tableName, $keyCond, __METHOD__ );
		if ( $this->isExpired( $db, $row->exptime ) ) {
			// Expired, do not reinsert
			return false;
		}

		$current = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) );
		$newValue = $current + $step;
		$db->insert(
			$tableName,
			[
				'keyname' => $key,
				'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
				'exptime' => $row->exptime
			],
			__METHOD__,
			[ 'IGNORE' ]
		);

		// No inserted row means a race condition. See T30611
		return ( $db->affectedRows() == 0 ) ? false : $newValue;
	} catch ( DBError $e ) {
		$this->handleWriteError( $e, $db, $serverIndex );

		return false;
	}
}
553
/**
 * Merge a value into a key through the check-and-set based merge helper.
 *
 * @param string $key
 * @param callable $callback
 * @param int $exptime
 * @param int $attempts
 * @param int $flags Bitfield; WRITE_SYNC waits for replication afterwards
 * @return bool
 */
public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
	$merged = $this->mergeViaCas( $key, $callback, $exptime, $attempts, $flags );

	$syncRequested = ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC;
	if ( !$syncRequested ) {
		return $merged;
	}

	return $this->waitForReplication() && $merged;
}
562
/**
 * Change the expiry of an existing, unexpired key.
 *
 * @param string $key
 * @param int $exptime New expiry; 0 means no expiry
 * @param int $flags
 * @return bool True if a row was updated, or if a row for the key already
 *  carries the requested expiry; false otherwise or on database error
 */
public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
	list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
	$db = null;
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();
	try {
		$db = $this->getDB( $serverIndex );
		$timestamp = ( $exptime == 0 )
			? $this->getMaxDateTime( $db )
			: $db->timestamp( $this->convertToExpiry( $exptime ) );

		// Only touch rows that have not expired yet
		$notExpired = 'exptime > ' . $db->addQuotes( $db->timestamp( time() ) );
		$db->update(
			$tableName,
			[ 'exptime' => $timestamp ],
			[ 'keyname' => $key, $notExpired ],
			__METHOD__
		);
		if ( $db->affectedRows() != 0 ) {
			return true;
		}

		// Nothing was changed; check whether the row already has this expiry
		return (bool)$db->selectField(
			$tableName,
			1,
			[ 'keyname' => $key, 'exptime' => $timestamp ],
			__METHOD__,
			[ 'FOR UPDATE' ]
		);
	} catch ( DBError $e ) {
		$this->handleWriteError( $e, $db, $serverIndex );

		return false;
	}
}
599
/**
 * Check whether a stored expiry timestamp lies in the past.
 *
 * The "maximum" timestamp from getMaxDateTime() never counts as expired.
 *
 * @param IDatabase $db
 * @param string $exptime
 * @return bool
 */
protected function isExpired( $db, $exptime ) {
	if ( $exptime == $this->getMaxDateTime( $db ) ) {
		return false;
	}

	return wfTimestamp( TS_UNIX, $exptime ) < time();
}
611
/**
 * Get the DB-formatted timestamp that is stored for entries without expiry.
 *
 * @param IDatabase $db
 * @return string
 */
protected function getMaxDateTime( $db ) {
	// Use a larger sentinel once the clock has passed the 32-bit signed maximum
	$maxUnix = ( time() > 0x7fffffff ) ? ( 1 << 62 ) : 0x7fffffff;

	return $db->timestamp( $maxUnix );
}
623
/**
 * Randomly (about once per $purgePeriod calls) delete expired rows on the
 * given server, either directly or through the async handler if one is set.
 *
 * @param IDatabase $db
 * @throws DBError
 */
protected function occasionallyGarbageCollect( IDatabase $db ) {
	// Random purging is disabled, or this instance only uses replica DBs
	if ( !$this->purgePeriod || $this->replicaOnly ) {
		return;
	}
	// Only purge on one in every $this->purgePeriod writes
	if ( mt_rand( 0, $this->purgePeriod - 1 ) != 0 ) {
		return;
	}
	// Avoid repeating the delete within a few seconds
	if ( ( time() - $this->lastGarbageCollect ) <= self::GARBAGE_COLLECT_DELAY_SEC ) {
		return;
	}

	$garbageCollector = function () use ( $db ) {
		$this->deleteServerObjectsExpiringBefore( $db, time(), null, $this->purgeLimit );
		$this->lastGarbageCollect = time();
	};

	if ( !$this->asyncHandler ) {
		$garbageCollector();

		return;
	}

	$this->lastGarbageCollect = time(); // avoid duplicate enqueues
	( $this->asyncHandler )( $garbageCollector );
}
651
/**
 * Delete every entry whose expiry is before the current time.
 */
public function expireAll() {
	$now = time();
	$this->deleteObjectsExpiringBefore( $now );
}
655
/**
 * Delete entries expiring before a timestamp, on every server.
 *
 * Servers are visited in random order. A database error on one server is
 * recorded and the remaining servers are still processed.
 *
 * @param string|int $timestamp
 * @param callable|null $progressCallback Passed through to the per-server purge
 * @param int|float $limit Maximum number of keys to delete
 * @return bool False if any server raised a database error
 */
public function deleteObjectsExpiringBefore(
	$timestamp,
	callable $progressCallback = null,
	$limit = INF
) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	$serverOrder = range( 0, $this->numServers - 1 );
	shuffle( $serverOrder );

	$allSucceeded = true;
	$keysDeletedCount = 0;
	foreach ( $serverOrder as $numServersDone => $serverIndex ) {
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			$this->deleteServerObjectsExpiringBefore(
				$db,
				$timestamp,
				$progressCallback,
				$limit,
				$numServersDone,
				$keysDeletedCount
			);
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );
			$allSucceeded = false;
		}
	}

	return $allSucceeded;
}
690
/**
 * Delete entries expiring before a timestamp from all shard tables of one server.
 *
 * Shards are visited in random order and purged in batches of at most 100
 * rows, ordered by expiry. If a progress callback is given, it is called after
 * each batch with an estimated overall completion percentage.
 *
 * @param IDatabase $db
 * @param string|int $timestamp
 * @param callable|null $progressCallback
 * @param int $limit
 * @param int $serversDoneCount Number of servers already processed by the caller
 * @param int &$keysDeletedCount Running total of deleted rows
 * @throws DBError
 */
private function deleteServerObjectsExpiringBefore(
	IDatabase $db,
	$timestamp,
	$progressCallback,
	$limit,
	$serversDoneCount = 0,
	&$keysDeletedCount = 0
) {
	$cutoffUnix = wfTimestamp( TS_UNIX, $timestamp );
	$shardOrder = range( 0, $this->shards - 1 );
	shuffle( $shardOrder );

	foreach ( $shardOrder as $numShardsDone => $shardIndex ) {
		$shardTable = $this->getTableNameByShard( $shardIndex );
		$continue = null; // last exptime
		$lag = null; // purge lag
		do {
			$conds = [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ];
			if ( $continue ) {
				// Resume from the last expiry seen in the previous batch
				$conds[] = 'exptime >= ' . $db->addQuotes( $continue );
			}
			$res = $db->select(
				$shardTable,
				[ 'keyname', 'exptime' ],
				$conds,
				__METHOD__,
				[ 'LIMIT' => min( $limit, 100 ), 'ORDER BY' => 'exptime' ]
			);

			$batchSize = $res->numRows();
			if ( $batchSize ) {
				$row = $res->current();
				if ( $lag === null ) {
					// Age of the oldest expired row, used as the progress baseline
					$lag = max( $cutoffUnix - wfTimestamp( TS_UNIX, $row->exptime ), 1 );
				}

				$keys = [];
				foreach ( $res as $row ) {
					$keys[] = $row->keyname;
					$continue = $row->exptime;
				}

				$db->delete(
					$shardTable,
					[
						'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
						'keyname' => $keys
					],
					__METHOD__
				);
				$keysDeletedCount += $db->affectedRows();
			}

			if ( is_callable( $progressCallback ) ) {
				$doneRatio = 1;
				if ( $lag ) {
					$remainingLag = $cutoffUnix - wfTimestamp( TS_UNIX, $continue );
					$processedLag = max( $lag - $remainingLag, 0 );
					$doneRatio = ( $numShardsDone + $processedLag / $lag ) / $this->shards;
				}

				$overallRatio = ( $doneRatio / $this->numServers )
					+ ( $serversDoneCount / $this->numServers );
				call_user_func( $progressCallback, $overallRatio * 100 );
			}
		} while ( $batchSize && $keysDeletedCount < $limit );
	}
}
766
/**
 * Empty every shard table on every server.
 *
 * Stops at the first server that raises a database error.
 *
 * @return bool True if all tables were emptied, false otherwise
 */
public function deleteAll() {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();
	for ( $serverIndex = 0; $serverIndex < $this->numServers; ++$serverIndex ) {
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			for ( $shard = 0; $shard < $this->shards; ++$shard ) {
				$shardTable = $this->getTableNameByShard( $shard );
				$db->delete( $shardTable, '*', __METHOD__ );
			}
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );

			return false;
		}
	}

	return true;
}
789
/**
 * Acquire a named lock on the server that the key maps to.
 *
 * A key already locked by this instance can only be re-acquired when a
 * non-empty $rclass matches the class it was first locked with; the nesting
 * depth is then increased. Any other repeated attempt fails immediately.
 *
 * @param string $key
 * @param int $timeout Lock wait timeout, passed to the database lock call
 * @param int $expiry Not used by this implementation
 * @param string $rclass Reentry class; empty string disables reentry
 * @return bool Whether the lock is held
 */
public function lock( $key, $timeout = 6, $expiry = 6, $rclass = '' ) {
	// Avoid deadlocks and allow lock reentry if specified
	if ( isset( $this->locks[$key] ) ) {
		if ( $rclass != '' && $this->locks[$key]['class'] === $rclass ) {
			++$this->locks[$key]['depth'];
			return true;
		} else {
			return false;
		}
	}

	list( $serverIndex ) = $this->getTableByKey( $key );

	$db = null;
	try {
		$db = $this->getDB( $serverIndex );
		$ok = $db->lock( $key, __METHOD__, $timeout );
		if ( $ok ) {
			$this->locks[$key] = [ 'class' => $rclass, 'depth' => 1 ];
		} else {
			// Only report a timeout when the lock was actually not obtained
			$this->logger->warning(
				__METHOD__ . " failed due to timeout for {key}.",
				[ 'key' => $key, 'timeout' => $timeout ]
			);
		}

		return $ok;
	} catch ( DBError $e ) {
		$this->handleWriteError( $e, $db, $serverIndex );

		return false;
	}
}
824
/**
 * Release one level of a lock taken with lock().
 *
 * The database lock is only released once the nesting depth drops to zero.
 *
 * @param string $key
 * @return bool True only if the database lock was released; false if the key
 *  was not locked, is still held at an outer nesting level, or the release failed
 */
public function unlock( $key ) {
	if ( !isset( $this->locks[$key] ) ) {
		return false;
	}

	if ( --$this->locks[$key]['depth'] > 0 ) {
		// Still held at an outer nesting level
		return false;
	}

	unset( $this->locks[$key] );

	list( $serverIndex ) = $this->getTableByKey( $key );

	$db = null;
	try {
		$db = $this->getDB( $serverIndex );
		$released = $db->unlock( $key, __METHOD__ );
		if ( !$released ) {
			$this->logger->warning(
				__METHOD__ . ' failed to release lock for {key}.',
				[ 'key' => $key ]
			);
		}
	} catch ( DBError $e ) {
		$this->handleWriteError( $e, $db, $serverIndex );
		$released = false;
	}

	return $released;
}
855
/**
 * Serialize a value and compress the result when gzdeflate() is available.
 * On typical message and page data, this can provide a 3X decrease in
 * storage requirements.
 *
 * @param mixed $data
 * @return string
 */
protected function serialize( $data ) {
	$serial = serialize( $data );

	return function_exists( 'gzdeflate' ) ? gzdeflate( $serial ) : $serial;
}
873
/**
 * Unserialize a stored blob, decompressing it first when possible.
 *
 * If gzinflate() is unavailable or fails on the input, the input is
 * unserialized as-is.
 *
 * @param string $serial
 * @return mixed
 */
protected function unserialize( $serial ) {
	if ( function_exists( 'gzinflate' ) ) {
		// Inflating uncompressed data emits a warning; silence it
		Wikimedia\suppressWarnings();
		$inflated = gzinflate( $serial );
		Wikimedia\restoreWarnings();

		if ( $inflated !== false ) {
			$serial = $inflated;
		}
	}

	return unserialize( $serial );
}
894
/**
 * Record a DBError raised by a read operation.
 *
 * Connection errors additionally mark the server as down.
 *
 * @param DBError $exception
 * @param int $serverIndex
 */
protected function handleReadError( DBError $exception, $serverIndex ) {
	$isConnectionFailure = $exception instanceof DBConnectionError;
	if ( $isConnectionFailure ) {
		$this->markServerDown( $exception, $serverIndex );
	}

	$this->setAndLogDBError( $exception );
}
908
/**
 * Record a DBError raised by a write operation.
 *
 * If no connection handle was obtained, the server is marked as down.
 *
 * @param DBError $exception
 * @param IDatabase|null $db DB handle or null if connection failed
 * @param int $serverIndex
 * @throws Exception
 */
protected function handleWriteError( DBError $exception, $db, $serverIndex ) {
	$haveHandle = $db instanceof IDatabase;
	if ( !$haveHandle ) {
		$this->markServerDown( $exception, $serverIndex );
	}

	$this->setAndLogDBError( $exception );
}
924
/**
 * Log a DBError and set the matching "last error" code on this instance.
 *
 * @param DBError $exception
 */
private function setAndLogDBError( DBError $exception ) {
	$this->logger->error( "DBError: {$exception->getMessage()}" );

	$unreachable = $exception instanceof DBConnectionError;
	$this->setLastError(
		$unreachable ? BagOStuff::ERR_UNREACHABLE : BagOStuff::ERR_UNEXPECTED
	);
	$this->logger->debug(
		__METHOD__ . ( $unreachable ? ": ignoring connection error" : ": ignoring query error" )
	);
}
938
/**
 * Mark a server as down for 60 seconds after a connection failure.
 *
 * The cached connection handle is dropped. If the server was already marked
 * down less than 60 seconds ago, the existing record is kept.
 *
 * @param DBError $exception
 * @param int $serverIndex
 */
protected function markServerDown( DBError $exception, $serverIndex ) {
	unset( $this->conns[$serverIndex] ); // bug T103435

	if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
		$stillDown = ( time() - $this->connFailureTimes[$serverIndex] ) < 60;
		if ( $stillDown ) {
			$this->logger->debug( __METHOD__ . ": Server #$serverIndex already down" );

			return;
		}
		// The previous record has aged out; forget it before recording the new one
		unset(
			$this->connFailureTimes[$serverIndex],
			$this->connFailureErrors[$serverIndex]
		);
	}

	$now = time();
	$this->logger->info( __METHOD__ . ": Server #$serverIndex down until " . ( $now + 60 ) );
	$this->connFailureTimes[$serverIndex] = $now;
	$this->connFailureErrors[$serverIndex] = $exception;
}
962
/**
 * Create the shard tables on every server as copies of the "objectcache"
 * table structure. For use from eval.php.
 *
 * @throws MWException If a server is not of type "mysql"
 */
public function createTables() {
	for ( $serverIndex = 0; $serverIndex < $this->numServers; ++$serverIndex ) {
		$db = $this->getDB( $serverIndex );
		if ( $db->getType() !== 'mysql' ) {
			throw new MWException( __METHOD__ . ' is not supported on this DB server' );
		}

		for ( $shard = 0; $shard < $this->shards; ++$shard ) {
			$sql = 'CREATE TABLE ' . $db->tableName( $this->getTableNameByShard( $shard ) ) .
				' LIKE ' . $db->tableName( 'objectcache' );
			$db->query( $sql, __METHOD__ );
		}
	}
}
981
/**
 * Whether no custom server list is configured.
 *
 * @return bool Whether the main DB is used, e.g. wfGetDB( DB_MASTER )
 */
protected function usesMainDB() {
	return $this->serverInfos ? false : true;
}
988
/**
 * Wait for the replica DBs of the main load balancer to reach the current
 * master position, for at most $syncTimeout seconds.
 *
 * @return bool True if there was nothing to wait for or the replicas caught
 *  up; false on timeout or database error
 */
protected function waitForReplication() {
	if ( !$this->usesMainDB() ) {
		// Custom DB server list; probably doesn't use replication
		return true;
	}

	$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
	if ( $lb->getServerCount() <= 1 ) {
		return true; // no replica DBs
	}

	// Main LB is used; wait for any replica DBs to catch up
	try {
		$masterPos = $lb->getMasterPos();
		if ( !$masterPos ) {
			return true; // not applicable
		}

		$replicasCaughtUp = function () use ( $lb, $masterPos ) {
			return $lb->waitForAll( $masterPos, 1 );
		};
		$loop = new WaitConditionLoop(
			$replicasCaughtUp,
			$this->syncTimeout,
			$this->busyCallbacks
		);
		$outcome = $loop->invoke();

		return ( $outcome === $loop::CONDITION_REACHED );
	} catch ( DBError $e ) {
		$this->setAndLogDBError( $e );

		return false;
	}
}
1022
/**
 * Silence the transaction profiler until the returned object is destroyed.
 *
 * The returned ScopedCallback restores the previous "silenced" state when it
 * goes out of scope, for example on return or throw.
 *
 * @return ScopedCallback
 * @since 1.32
 */
protected function silenceTransactionProfiler() {
	$trxProfiler = Profiler::instance()->getTransactionProfiler();
	$previousState = $trxProfiler->setSilenced( true );

	$restore = function () use ( $trxProfiler, $previousState ) {
		$trxProfiler->setSilenced( $previousState );
	};

	return new ScopedCallback( $restore );
}
1036 }