objectcache: Make SqlBagOStuff::waitForSlaves() no-op without slaves
[lhc/web/wiklou.git] / includes / objectcache / SqlBagOStuff.php
1 <?php
2 /**
3 * Object caching using a SQL database.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Cache
22 */
23
24 use \MediaWiki\MediaWikiServices;
25
26 /**
27 * Class to store objects in the database
28 *
29 * @ingroup Cache
30 */
31 class SqlBagOStuff extends BagOStuff {
32 /** @var array[] (server index => server config) */
33 protected $serverInfos;
34 /** @var string[] (server index => tag/host name) */
35 protected $serverTags;
36 /** @var int */
37 protected $numServers;
38 /** @var int */
39 protected $lastExpireAll = 0;
40 /** @var int */
41 protected $purgePeriod = 100;
42 /** @var int */
43 protected $shards = 1;
44 /** @var string */
45 protected $tableName = 'objectcache';
46 /** @var bool */
47 protected $slaveOnly = false;
48 /** @var int */
49 protected $syncTimeout = 3;
50
51 /** @var LoadBalancer|null */
52 protected $separateMainLB;
53 /** @var array */
54 protected $conns;
55 /** @var array UNIX timestamps */
56 protected $connFailureTimes = [];
57 /** @var array Exceptions */
58 protected $connFailureErrors = [];
59
60 /**
61 * Constructor. Parameters are:
62 * - server: A server info structure in the format required by each
63 * element in $wgDBServers.
64 *
65 * - servers: An array of server info structures describing a set of database servers
66 * to distribute keys to. If this is specified, the "server" option will be
67 * ignored. If string keys are used, then they will be used for consistent
68 * hashing *instead* of the host name (from the server config). This is useful
69 * when a cluster is replicated to another site (with different host names)
70 * but each server has a corresponding replica in the other cluster.
71 *
72 * - purgePeriod: The average number of object cache requests in between
73 * garbage collection operations, where expired entries
74 * are removed from the database. Or in other words, the
75 * reciprocal of the probability of purging on any given
76 * request. If this is set to zero, purging will never be
77 * done.
78 *
79 * - tableName: The table name to use, default is "objectcache".
80 *
81 * - shards: The number of tables to use for data storage on each server.
82 * If this is more than 1, table names will be formed in the style
83 * objectcacheNNN where NNN is the shard index, between 0 and
84 * shards-1. The number of digits will be the minimum number
85 * required to hold the largest shard index. Data will be
86 * distributed across all tables by key hash. This is for
87 * MySQL bugs 61735 and 61736.
88 * - slaveOnly: Whether to only use slave DBs and avoid triggering
89 * garbage collection logic of expired items. This only
90 * makes sense if the primary DB is used and only if get()
91 * calls will be used. This is used by ReplicatedBagOStuff.
92 * - syncTimeout: Max seconds to wait for slaves to catch up for WRITE_SYNC.
93 *
94 * @param array $params
95 */
96 public function __construct( $params ) {
97 parent::__construct( $params );
98
99 $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
100
101 if ( isset( $params['servers'] ) ) {
102 $this->serverInfos = [];
103 $this->serverTags = [];
104 $this->numServers = count( $params['servers'] );
105 $index = 0;
106 foreach ( $params['servers'] as $tag => $info ) {
107 $this->serverInfos[$index] = $info;
108 if ( is_string( $tag ) ) {
109 $this->serverTags[$index] = $tag;
110 } else {
111 $this->serverTags[$index] = isset( $info['host'] ) ? $info['host'] : "#$index";
112 }
113 ++$index;
114 }
115 } elseif ( isset( $params['server'] ) ) {
116 $this->serverInfos = [ $params['server'] ];
117 $this->numServers = count( $this->serverInfos );
118 } else {
119 // Default to using the main wiki's database servers
120 $this->serverInfos = false;
121 $this->numServers = 1;
122 }
123 if ( isset( $params['purgePeriod'] ) ) {
124 $this->purgePeriod = intval( $params['purgePeriod'] );
125 }
126 if ( isset( $params['tableName'] ) ) {
127 $this->tableName = $params['tableName'];
128 }
129 if ( isset( $params['shards'] ) ) {
130 $this->shards = intval( $params['shards'] );
131 }
132 if ( isset( $params['syncTimeout'] ) ) {
133 $this->syncTimeout = $params['syncTimeout'];
134 }
135 $this->slaveOnly = !empty( $params['slaveOnly'] );
136 }
137
138 protected function getSeparateMainLB() {
139 global $wgDBtype;
140
141 if ( $wgDBtype === 'mysql' && $this->usesMainDB() ) {
142 if ( !$this->separateMainLB ) {
143 // We must keep a separate connection to MySQL in order to avoid deadlocks
144 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
145 $this->separateMainLB = $lbFactory->newMainLB();
146 }
147 return $this->separateMainLB;
148 } else {
149 // However, SQLite has an opposite behavior. And PostgreSQL needs to know
150 // if we are in transaction or not (@TODO: find some PostgreSQL work-around).
151 return null;
152 }
153 }
154
155 /**
156 * Get a connection to the specified database
157 *
158 * @param int $serverIndex
159 * @return IDatabase
160 * @throws MWException
161 */
162 protected function getDB( $serverIndex ) {
163 if ( !isset( $this->conns[$serverIndex] ) ) {
164 if ( $serverIndex >= $this->numServers ) {
165 throw new MWException( __METHOD__ . ": Invalid server index \"$serverIndex\"" );
166 }
167
168 # Don't keep timing out trying to connect for each call if the DB is down
169 if ( isset( $this->connFailureErrors[$serverIndex] )
170 && ( time() - $this->connFailureTimes[$serverIndex] ) < 60
171 ) {
172 throw $this->connFailureErrors[$serverIndex];
173 }
174
175 # If server connection info was given, use that
176 if ( $this->serverInfos ) {
177 $info = $this->serverInfos[$serverIndex];
178 $type = isset( $info['type'] ) ? $info['type'] : 'mysql';
179 $host = isset( $info['host'] ) ? $info['host'] : '[unknown]';
180 $this->logger->debug( __CLASS__ . ": connecting to $host" );
181 // Use a blank trx profiler to ignore expections as this is a cache
182 $info['trxProfiler'] = new TransactionProfiler();
183 $db = DatabaseBase::factory( $type, $info );
184 $db->clearFlag( DBO_TRX );
185 } else {
186 $index = $this->slaveOnly ? DB_SLAVE : DB_MASTER;
187 if ( $this->getSeparateMainLB() ) {
188 $db = $this->getSeparateMainLB()->getConnection( $index );
189 $db->clearFlag( DBO_TRX ); // auto-commit mode
190 } else {
191 $db = wfGetDB( $index );
192 // Can't mess with transaction rounds (DBO_TRX) :(
193 }
194 }
195 $this->logger->debug( sprintf( "Connection %s will be used for SqlBagOStuff", $db ) );
196 $this->conns[$serverIndex] = $db;
197 }
198
199 return $this->conns[$serverIndex];
200 }
201
202 /**
203 * Get the server index and table name for a given key
204 * @param string $key
205 * @return array Server index and table name
206 */
207 protected function getTableByKey( $key ) {
208 if ( $this->shards > 1 ) {
209 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
210 $tableIndex = $hash % $this->shards;
211 } else {
212 $tableIndex = 0;
213 }
214 if ( $this->numServers > 1 ) {
215 $sortedServers = $this->serverTags;
216 ArrayUtils::consistentHashSort( $sortedServers, $key );
217 reset( $sortedServers );
218 $serverIndex = key( $sortedServers );
219 } else {
220 $serverIndex = 0;
221 }
222 return [ $serverIndex, $this->getTableNameByShard( $tableIndex ) ];
223 }
224
225 /**
226 * Get the table name for a given shard index
227 * @param int $index
228 * @return string
229 */
230 protected function getTableNameByShard( $index ) {
231 if ( $this->shards > 1 ) {
232 $decimals = strlen( $this->shards - 1 );
233 return $this->tableName .
234 sprintf( "%0{$decimals}d", $index );
235 } else {
236 return $this->tableName;
237 }
238 }
239
240 protected function doGet( $key, $flags = 0 ) {
241 $casToken = null;
242
243 return $this->getWithToken( $key, $casToken, $flags );
244 }
245
246 protected function getWithToken( $key, &$casToken, $flags = 0 ) {
247 $values = $this->getMulti( [ $key ] );
248 if ( array_key_exists( $key, $values ) ) {
249 $casToken = $values[$key];
250 return $values[$key];
251 }
252 return false;
253 }
254
255 public function getMulti( array $keys, $flags = 0 ) {
256 $values = []; // array of (key => value)
257
258 $keysByTable = [];
259 foreach ( $keys as $key ) {
260 list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
261 $keysByTable[$serverIndex][$tableName][] = $key;
262 }
263
264 $this->garbageCollect(); // expire old entries if any
265
266 $dataRows = [];
267 foreach ( $keysByTable as $serverIndex => $serverKeys ) {
268 try {
269 $db = $this->getDB( $serverIndex );
270 foreach ( $serverKeys as $tableName => $tableKeys ) {
271 $res = $db->select( $tableName,
272 [ 'keyname', 'value', 'exptime' ],
273 [ 'keyname' => $tableKeys ],
274 __METHOD__,
275 // Approximate write-on-the-fly BagOStuff API via blocking.
276 // This approximation fails if a ROLLBACK happens (which is rare).
277 // We do not want to flush the TRX as that can break callers.
278 $db->trxLevel() ? [ 'LOCK IN SHARE MODE' ] : []
279 );
280 if ( $res === false ) {
281 continue;
282 }
283 foreach ( $res as $row ) {
284 $row->serverIndex = $serverIndex;
285 $row->tableName = $tableName;
286 $dataRows[$row->keyname] = $row;
287 }
288 }
289 } catch ( DBError $e ) {
290 $this->handleReadError( $e, $serverIndex );
291 }
292 }
293
294 foreach ( $keys as $key ) {
295 if ( isset( $dataRows[$key] ) ) { // HIT?
296 $row = $dataRows[$key];
297 $this->debug( "get: retrieved data; expiry time is " . $row->exptime );
298 $db = null;
299 try {
300 $db = $this->getDB( $row->serverIndex );
301 if ( $this->isExpired( $db, $row->exptime ) ) { // MISS
302 $this->debug( "get: key has expired" );
303 } else { // HIT
304 $values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) );
305 }
306 } catch ( DBQueryError $e ) {
307 $this->handleWriteError( $e, $db, $row->serverIndex );
308 }
309 } else { // MISS
310 $this->debug( 'get: no matching rows' );
311 }
312 }
313
314 return $values;
315 }
316
317 public function setMulti( array $data, $expiry = 0 ) {
318 $keysByTable = [];
319 foreach ( $data as $key => $value ) {
320 list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
321 $keysByTable[$serverIndex][$tableName][] = $key;
322 }
323
324 $this->garbageCollect(); // expire old entries if any
325
326 $result = true;
327 $exptime = (int)$expiry;
328 foreach ( $keysByTable as $serverIndex => $serverKeys ) {
329 $db = null;
330 try {
331 $db = $this->getDB( $serverIndex );
332 } catch ( DBError $e ) {
333 $this->handleWriteError( $e, $db, $serverIndex );
334 $result = false;
335 continue;
336 }
337
338 if ( $exptime < 0 ) {
339 $exptime = 0;
340 }
341
342 if ( $exptime == 0 ) {
343 $encExpiry = $this->getMaxDateTime( $db );
344 } else {
345 $exptime = $this->convertExpiry( $exptime );
346 $encExpiry = $db->timestamp( $exptime );
347 }
348 foreach ( $serverKeys as $tableName => $tableKeys ) {
349 $rows = [];
350 foreach ( $tableKeys as $key ) {
351 $rows[] = [
352 'keyname' => $key,
353 'value' => $db->encodeBlob( $this->serialize( $data[$key] ) ),
354 'exptime' => $encExpiry,
355 ];
356 }
357
358 try {
359 $db->replace(
360 $tableName,
361 [ 'keyname' ],
362 $rows,
363 __METHOD__
364 );
365 } catch ( DBError $e ) {
366 $this->handleWriteError( $e, $db, $serverIndex );
367 $result = false;
368 }
369
370 }
371
372 }
373
374 return $result;
375 }
376
377 public function set( $key, $value, $exptime = 0, $flags = 0 ) {
378 $ok = $this->setMulti( [ $key => $value ], $exptime );
379 if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
380 $ok = $ok && $this->waitForSlaves();
381 }
382
383 return $ok;
384 }
385
386 protected function cas( $casToken, $key, $value, $exptime = 0 ) {
387 list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
388 $db = null;
389 try {
390 $db = $this->getDB( $serverIndex );
391 $exptime = intval( $exptime );
392
393 if ( $exptime < 0 ) {
394 $exptime = 0;
395 }
396
397 if ( $exptime == 0 ) {
398 $encExpiry = $this->getMaxDateTime( $db );
399 } else {
400 $exptime = $this->convertExpiry( $exptime );
401 $encExpiry = $db->timestamp( $exptime );
402 }
403 // (bug 24425) use a replace if the db supports it instead of
404 // delete/insert to avoid clashes with conflicting keynames
405 $db->update(
406 $tableName,
407 [
408 'keyname' => $key,
409 'value' => $db->encodeBlob( $this->serialize( $value ) ),
410 'exptime' => $encExpiry
411 ],
412 [
413 'keyname' => $key,
414 'value' => $db->encodeBlob( $this->serialize( $casToken ) )
415 ],
416 __METHOD__
417 );
418 } catch ( DBQueryError $e ) {
419 $this->handleWriteError( $e, $db, $serverIndex );
420
421 return false;
422 }
423
424 return (bool)$db->affectedRows();
425 }
426
427 public function delete( $key ) {
428 list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
429 $db = null;
430 try {
431 $db = $this->getDB( $serverIndex );
432 $db->delete(
433 $tableName,
434 [ 'keyname' => $key ],
435 __METHOD__ );
436 } catch ( DBError $e ) {
437 $this->handleWriteError( $e, $db, $serverIndex );
438 return false;
439 }
440
441 return true;
442 }
443
444 public function incr( $key, $step = 1 ) {
445 list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
446 $db = null;
447 try {
448 $db = $this->getDB( $serverIndex );
449 $step = intval( $step );
450 $row = $db->selectRow(
451 $tableName,
452 [ 'value', 'exptime' ],
453 [ 'keyname' => $key ],
454 __METHOD__,
455 [ 'FOR UPDATE' ] );
456 if ( $row === false ) {
457 // Missing
458
459 return null;
460 }
461 $db->delete( $tableName, [ 'keyname' => $key ], __METHOD__ );
462 if ( $this->isExpired( $db, $row->exptime ) ) {
463 // Expired, do not reinsert
464
465 return null;
466 }
467
468 $oldValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) );
469 $newValue = $oldValue + $step;
470 $db->insert( $tableName,
471 [
472 'keyname' => $key,
473 'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
474 'exptime' => $row->exptime
475 ], __METHOD__, 'IGNORE' );
476
477 if ( $db->affectedRows() == 0 ) {
478 // Race condition. See bug 28611
479 $newValue = null;
480 }
481 } catch ( DBError $e ) {
482 $this->handleWriteError( $e, $db, $serverIndex );
483 return null;
484 }
485
486 return $newValue;
487 }
488
489 public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
490 $ok = $this->mergeViaCas( $key, $callback, $exptime, $attempts );
491 if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
492 $ok = $ok && $this->waitForSlaves();
493 }
494
495 return $ok;
496 }
497
498 public function changeTTL( $key, $expiry = 0 ) {
499 list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
500 $db = null;
501 try {
502 $db = $this->getDB( $serverIndex );
503 $db->update(
504 $tableName,
505 [ 'exptime' => $db->timestamp( $this->convertExpiry( $expiry ) ) ],
506 [ 'keyname' => $key, 'exptime > ' . $db->addQuotes( $db->timestamp( time() ) ) ],
507 __METHOD__
508 );
509 if ( $db->affectedRows() == 0 ) {
510 return false;
511 }
512 } catch ( DBError $e ) {
513 $this->handleWriteError( $e, $db, $serverIndex );
514 return false;
515 }
516
517 return true;
518 }
519
520 /**
521 * @param IDatabase $db
522 * @param string $exptime
523 * @return bool
524 */
525 protected function isExpired( $db, $exptime ) {
526 return $exptime != $this->getMaxDateTime( $db ) && wfTimestamp( TS_UNIX, $exptime ) < time();
527 }
528
529 /**
530 * @param IDatabase $db
531 * @return string
532 */
533 protected function getMaxDateTime( $db ) {
534 if ( time() > 0x7fffffff ) {
535 return $db->timestamp( 1 << 62 );
536 } else {
537 return $db->timestamp( 0x7fffffff );
538 }
539 }
540
541 protected function garbageCollect() {
542 if ( !$this->purgePeriod || $this->slaveOnly ) {
543 // Disabled
544 return;
545 }
546 // Only purge on one in every $this->purgePeriod requests.
547 if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
548 return;
549 }
550 $now = time();
551 // Avoid repeating the delete within a few seconds
552 if ( $now > ( $this->lastExpireAll + 1 ) ) {
553 $this->lastExpireAll = $now;
554 $this->expireAll();
555 }
556 }
557
558 public function expireAll() {
559 $this->deleteObjectsExpiringBefore( wfTimestampNow() );
560 }
561
562 /**
563 * Delete objects from the database which expire before a certain date.
564 * @param string $timestamp
565 * @param bool|callable $progressCallback
566 * @return bool
567 */
568 public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
569 for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
570 $db = null;
571 try {
572 $db = $this->getDB( $serverIndex );
573 $dbTimestamp = $db->timestamp( $timestamp );
574 $totalSeconds = false;
575 $baseConds = [ 'exptime < ' . $db->addQuotes( $dbTimestamp ) ];
576 for ( $i = 0; $i < $this->shards; $i++ ) {
577 $maxExpTime = false;
578 while ( true ) {
579 $conds = $baseConds;
580 if ( $maxExpTime !== false ) {
581 $conds[] = 'exptime > ' . $db->addQuotes( $maxExpTime );
582 }
583 $rows = $db->select(
584 $this->getTableNameByShard( $i ),
585 [ 'keyname', 'exptime' ],
586 $conds,
587 __METHOD__,
588 [ 'LIMIT' => 100, 'ORDER BY' => 'exptime' ] );
589 if ( $rows === false || !$rows->numRows() ) {
590 break;
591 }
592 $keys = [];
593 $row = $rows->current();
594 $minExpTime = $row->exptime;
595 if ( $totalSeconds === false ) {
596 $totalSeconds = wfTimestamp( TS_UNIX, $timestamp )
597 - wfTimestamp( TS_UNIX, $minExpTime );
598 }
599 foreach ( $rows as $row ) {
600 $keys[] = $row->keyname;
601 $maxExpTime = $row->exptime;
602 }
603
604 $db->delete(
605 $this->getTableNameByShard( $i ),
606 [
607 'exptime >= ' . $db->addQuotes( $minExpTime ),
608 'exptime < ' . $db->addQuotes( $dbTimestamp ),
609 'keyname' => $keys
610 ],
611 __METHOD__ );
612
613 if ( $progressCallback ) {
614 if ( intval( $totalSeconds ) === 0 ) {
615 $percent = 0;
616 } else {
617 $remainingSeconds = wfTimestamp( TS_UNIX, $timestamp )
618 - wfTimestamp( TS_UNIX, $maxExpTime );
619 if ( $remainingSeconds > $totalSeconds ) {
620 $totalSeconds = $remainingSeconds;
621 }
622 $processedSeconds = $totalSeconds - $remainingSeconds;
623 $percent = ( $i + $processedSeconds / $totalSeconds )
624 / $this->shards * 100;
625 }
626 $percent = ( $percent / $this->numServers )
627 + ( $serverIndex / $this->numServers * 100 );
628 call_user_func( $progressCallback, $percent );
629 }
630 }
631 }
632 } catch ( DBError $e ) {
633 $this->handleWriteError( $e, $db, $serverIndex );
634 return false;
635 }
636 }
637 return true;
638 }
639
640 /**
641 * Delete content of shard tables in every server.
642 * Return true if the operation is successful, false otherwise.
643 * @return bool
644 */
645 public function deleteAll() {
646 for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
647 $db = null;
648 try {
649 $db = $this->getDB( $serverIndex );
650 for ( $i = 0; $i < $this->shards; $i++ ) {
651 $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
652 }
653 } catch ( DBError $e ) {
654 $this->handleWriteError( $e, $db, $serverIndex );
655 return false;
656 }
657 }
658 return true;
659 }
660
661 /**
662 * Serialize an object and, if possible, compress the representation.
663 * On typical message and page data, this can provide a 3X decrease
664 * in storage requirements.
665 *
666 * @param mixed $data
667 * @return string
668 */
669 protected function serialize( &$data ) {
670 $serial = serialize( $data );
671
672 if ( function_exists( 'gzdeflate' ) ) {
673 return gzdeflate( $serial );
674 } else {
675 return $serial;
676 }
677 }
678
679 /**
680 * Unserialize and, if necessary, decompress an object.
681 * @param string $serial
682 * @return mixed
683 */
684 protected function unserialize( $serial ) {
685 if ( function_exists( 'gzinflate' ) ) {
686 MediaWiki\suppressWarnings();
687 $decomp = gzinflate( $serial );
688 MediaWiki\restoreWarnings();
689
690 if ( false !== $decomp ) {
691 $serial = $decomp;
692 }
693 }
694
695 $ret = unserialize( $serial );
696
697 return $ret;
698 }
699
700 /**
701 * Handle a DBError which occurred during a read operation.
702 *
703 * @param DBError $exception
704 * @param int $serverIndex
705 */
706 protected function handleReadError( DBError $exception, $serverIndex ) {
707 if ( $exception instanceof DBConnectionError ) {
708 $this->markServerDown( $exception, $serverIndex );
709 }
710 $this->logger->error( "DBError: {$exception->getMessage()}" );
711 if ( $exception instanceof DBConnectionError ) {
712 $this->setLastError( BagOStuff::ERR_UNREACHABLE );
713 $this->logger->debug( __METHOD__ . ": ignoring connection error" );
714 } else {
715 $this->setLastError( BagOStuff::ERR_UNEXPECTED );
716 $this->logger->debug( __METHOD__ . ": ignoring query error" );
717 }
718 }
719
720 /**
721 * Handle a DBQueryError which occurred during a write operation.
722 *
723 * @param DBError $exception
724 * @param IDatabase|null $db DB handle or null if connection failed
725 * @param int $serverIndex
726 * @throws Exception
727 */
728 protected function handleWriteError( DBError $exception, IDatabase $db = null, $serverIndex ) {
729 if ( !$db ) {
730 $this->markServerDown( $exception, $serverIndex );
731 } elseif ( $db->wasReadOnlyError() ) {
732 if ( $db->trxLevel() && $this->usesMainDB() ) {
733 // Errors like deadlocks and connection drops already cause rollback.
734 // For consistency, we have no choice but to throw an error and trigger
735 // complete rollback if the main DB is also being used as the cache DB.
736 throw $exception;
737 }
738 }
739
740 $this->logger->error( "DBError: {$exception->getMessage()}" );
741 if ( $exception instanceof DBConnectionError ) {
742 $this->setLastError( BagOStuff::ERR_UNREACHABLE );
743 $this->logger->debug( __METHOD__ . ": ignoring connection error" );
744 } else {
745 $this->setLastError( BagOStuff::ERR_UNEXPECTED );
746 $this->logger->debug( __METHOD__ . ": ignoring query error" );
747 }
748 }
749
750 /**
751 * Mark a server down due to a DBConnectionError exception
752 *
753 * @param DBError $exception
754 * @param int $serverIndex
755 */
756 protected function markServerDown( DBError $exception, $serverIndex ) {
757 unset( $this->conns[$serverIndex] ); // bug T103435
758
759 if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
760 if ( time() - $this->connFailureTimes[$serverIndex] >= 60 ) {
761 unset( $this->connFailureTimes[$serverIndex] );
762 unset( $this->connFailureErrors[$serverIndex] );
763 } else {
764 $this->logger->debug( __METHOD__ . ": Server #$serverIndex already down" );
765 return;
766 }
767 }
768 $now = time();
769 $this->logger->info( __METHOD__ . ": Server #$serverIndex down until " . ( $now + 60 ) );
770 $this->connFailureTimes[$serverIndex] = $now;
771 $this->connFailureErrors[$serverIndex] = $exception;
772 }
773
774 /**
775 * Create shard tables. For use from eval.php.
776 */
777 public function createTables() {
778 for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
779 $db = $this->getDB( $serverIndex );
780 if ( $db->getType() !== 'mysql' ) {
781 throw new MWException( __METHOD__ . ' is not supported on this DB server' );
782 }
783
784 for ( $i = 0; $i < $this->shards; $i++ ) {
785 $db->query(
786 'CREATE TABLE ' . $db->tableName( $this->getTableNameByShard( $i ) ) .
787 ' LIKE ' . $db->tableName( 'objectcache' ),
788 __METHOD__ );
789 }
790 }
791 }
792
793 /**
794 * @return bool Whether the main DB is used, e.g. wfGetDB( DB_MASTER )
795 */
796 protected function usesMainDB() {
797 return !$this->serverInfos;
798 }
799
800 protected function waitForSlaves() {
801 if ( $this->usesMainDB() ) {
802 $lb = $this->getSeparateMainLB()
803 ?: MediaWikiServices::getInstance()->getDBLoadBalancer();
804 // Return if there are no slaves
805 if ( $lb->getServerCount() <= 1 ) {
806 return true;
807 }
808 // Main LB is used; wait for any slaves to catch up
809 try {
810 $pos = $lb->getMasterPos();
811 if ( $pos ) {
812 return $lb->waitForAll( $pos, 3 );
813 }
814 } catch ( DBReplicationWaitError $e ) {
815 return false;
816 }
817 }
818
819 // Custom DB server list; probably doesn't use replication
820 return true;
821 }
822 }