Merge "Use {{int:}} on MediaWiki:Blockedtext and MediaWiki:Autoblockedtext"
[lhc/web/wiklou.git] / includes / libs / rdbms / loadbalancer / LoadBalancer.php
index 94acc1e..360be42 100644 (file)
@@ -59,8 +59,8 @@ class LoadBalancer implements ILoadBalancer {
 
        /** @var ILoadMonitor */
        private $loadMonitor;
-       /** @var ChronologyProtector|null */
-       private $chronProt;
+       /** @var callable|null Callback to run before the first connection attempt */
+       private $chronologyCallback;
        /** @var BagOStuff */
        private $srvCache;
        /** @var WANObjectCache */
@@ -116,10 +116,12 @@ class LoadBalancer implements ILoadBalancer {
 
        /** @var bool */
        private $disabled = false;
-       /** @var bool */
-       private $chronProtInitialized = false;
+       /** @var bool Whether any connection has been attempted yet */
+       private $connectionAttempted = false;
        /** @var int */
        private $maxLag = self::MAX_LAG_DEFAULT;
+       /** @var string Stage of the current transaction round in the transaction round life-cycle */
+       private $trxRoundStage = self::ROUND_CURSORY;
 
        /** @var int Warn when this many connection are held */
        const CONN_HELD_WARN_THRESHOLD = 10;
@@ -139,6 +141,19 @@ class LoadBalancer implements ILoadBalancer {
        const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
        const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
 
+       /** @var string Transaction round, explicit or implicit, has not finished writing */
+       const ROUND_CURSORY = 'cursory';
+       /** @var string Transaction round writes are complete and ready for pre-commit checks */
+       const ROUND_FINALIZED = 'finalized';
+       /** @var string Transaction round passed final pre-commit checks */
+       const ROUND_APPROVED = 'approved';
+       /** @var string Transaction round was committed and post-commit callbacks must be run */
+       const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
+       /** @var string Transaction round was rolled back and post-rollback callbacks must be run */
+       const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
+       /** @var string Transaction round encountered an error */
+       const ROUND_ERROR = 'error';
+
        public function __construct( array $params ) {
                if ( !isset( $params['servers'] ) ) {
                        throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
@@ -243,8 +258,16 @@ class LoadBalancer implements ILoadBalancer {
                        : ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
                $this->agent = isset( $params['agent'] ) ? $params['agent'] : '';
 
-               if ( isset( $params['chronologyProtector'] ) ) {
-                       $this->chronProt = $params['chronologyProtector'];
+               if ( isset( $params['chronologyCallback'] ) ) {
+                       $this->chronologyCallback = $params['chronologyCallback'];
+               }
+
+               if ( isset( $params['roundStage'] ) ) {
+                       if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
+                               $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
+                       } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
+                               $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
+                       }
                }
        }
 
@@ -369,7 +392,7 @@ class LoadBalancer implements ILoadBalancer {
                // Scale the configured load ratios according to each server's load and state
                $this->getLoadMonitor()->scaleLoads( $loads, $domain );
 
-               // Pick a server to use, accounting for weights, load, lag, and mWaitForPos
+               // Pick a server to use, accounting for weights, load, lag, and "waitForPos"
                list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
                if ( $i === false ) {
                        // Replica DB connection unsuccessful
@@ -379,7 +402,7 @@ class LoadBalancer implements ILoadBalancer {
                if ( $this->waitForPos && $i != $this->getWriterIndex() ) {
                        // Before any data queries are run, wait for the server to catch up to the
                        // specified position. This is used to improve session consistency. Note that
-                       // when LoadBalancer::waitFor() sets mWaitForPos, the waiting triggers here,
+                       // when LoadBalancer::waitFor() sets "waitForPos", the waiting triggers here,
                        // so update laggedReplicaMode as needed for consistency.
                        if ( !$this->doWait( $i ) ) {
                                $laggedReplicaMode = true;
@@ -424,7 +447,7 @@ class LoadBalancer implements ILoadBalancer {
                        } else {
                                $i = false;
                                if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
-                                       // ChronologyProtecter sets mWaitForPos for session consistency.
+                                       // "chronologyCallback" sets "waitForPos" for session consistency.
                                        // This triggers doWait() after connect, so it's especially good to
                                        // avoid lagged servers so as to avoid excessive delay in that method.
                                        $ago = microtime( true ) - $this->waitForPos->asOfTime();
@@ -566,17 +589,17 @@ class LoadBalancer implements ILoadBalancer {
                }
        }
 
-       /**
-        * @param int $i
-        * @return IDatabase|bool
-        */
-       public function getAnyOpenConnection( $i ) {
+       public function getAnyOpenConnection( $i, $flags = 0 ) {
+               $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
                foreach ( $this->conns as $connsByServer ) {
-                       if ( !empty( $connsByServer[$i] ) ) {
-                               /** @var IDatabase[] $serverConns */
-                               $serverConns = $connsByServer[$i];
+                       if ( !isset( $connsByServer[$i] ) ) {
+                               continue;
+                       }
 
-                               return reset( $serverConns );
+                       foreach ( $connsByServer[$i] as $conn ) {
+                               if ( !$autocommit || $conn->getLBInfo( 'autoCommitOnly' ) ) {
+                                       return $conn;
+                               }
                        }
                }
 
@@ -591,7 +614,7 @@ class LoadBalancer implements ILoadBalancer {
         * @return bool
         */
        protected function doWait( $index, $open = false, $timeout = null ) {
-               $timeout = max( 1, $timeout ?: $this->waitTimeout );
+               $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
 
                // Check if we already know that the DB has reached this point
                $server = $this->getServerName( $index );
@@ -689,7 +712,7 @@ class LoadBalancer implements ILoadBalancer {
                        $domain = false; // local connection requested
                }
 
-               if ( ( $flags & self::CONN_TRX_AUTO ) === self::CONN_TRX_AUTO ) {
+               if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
                        // Assuming all servers are of the same type (or similar), which is overwhelmingly
                        // the case, use the master server information to get the attributes. The information
                        // for $i cannot be used since it might be DB_REPLICA, which might require connection
@@ -700,8 +723,9 @@ class LoadBalancer implements ILoadBalancer {
                                // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions
                                // to reduce lock contention. None of these apply for sqlite and using separate
                                // connections just causes self-deadlocks.
-                               $flags &= ~self::CONN_TRX_AUTO;
-                               $this->connLogger->info( __METHOD__ . ': ignoring CONN_TRX_AUTO to avoid deadlocks.' );
+                               $flags &= ~self::CONN_TRX_AUTOCOMMIT;
+                               $this->connLogger->info( __METHOD__ .
+                                       ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
                        }
                }
 
@@ -714,7 +738,7 @@ class LoadBalancer implements ILoadBalancer {
 
                if ( $i == self::DB_MASTER ) {
                        $i = $this->getWriterIndex();
-               } else {
+               } elseif ( $i == self::DB_REPLICA ) {
                        # Try to find an available server in any the query groups (in order)
                        foreach ( $groups as $group ) {
                                $groupIndex = $this->getReaderIndex( $group, $domain );
@@ -848,18 +872,18 @@ class LoadBalancer implements ILoadBalancer {
                        $domain = false; // local connection requested
                }
 
-               if ( !$this->chronProtInitialized && $this->chronProt ) {
+               if ( !$this->connectionAttempted && $this->chronologyCallback ) {
                        $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
-                       // Load CP positions before connecting so that doWait() triggers later if needed
-                       $this->chronProtInitialized = true;
-                       $this->chronProt->initLB( $this );
+                       // Load any "waitFor" positions before connecting so that doWait() is triggered
+                       $this->connectionAttempted = true;
+                       call_user_func( $this->chronologyCallback, $this );
                }
 
                // Check if an auto-commit connection is being requested. If so, it will not reuse the
                // main set of DB connections but rather its own pool since:
                // a) those are usually set to implicitly use transaction rounds via DBO_TRX
                // b) those must support the use of explicit transaction rounds via beginMasterChanges()
-               $autoCommit = ( ( $flags & self::CONN_TRX_AUTO ) == self::CONN_TRX_AUTO );
+               $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
 
                if ( $domain !== false ) {
                        // Connection is to a foreign domain
@@ -937,7 +961,7 @@ class LoadBalancer implements ILoadBalancer {
                $domainInstance = DatabaseDomain::newFromId( $domain );
                $dbName = $domainInstance->getDatabase();
                $prefix = $domainInstance->getTablePrefix();
-               $autoCommit = ( ( $flags & self::CONN_TRX_AUTO ) == self::CONN_TRX_AUTO );
+               $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
 
                if ( $autoCommit ) {
                        $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
@@ -1129,8 +1153,7 @@ class LoadBalancer implements ILoadBalancer {
                                $context
                        );
 
-                       // throws DBConnectionError
-                       $conn->reportConnectionError( "{$this->lastError} ({$context['db_server']})" );
+                       throw new DBConnectionError( $conn, "{$this->lastError} ({$context['db_server']})" );
                } else {
                        // No last connection, probably due to all servers being too busy
                        $this->connLogger->error(
@@ -1139,7 +1162,7 @@ class LoadBalancer implements ILoadBalancer {
                                $context
                        );
 
-                       // If all servers were busy, mLastError will contain something sensible
+                       // If all servers were busy, "lastError" will contain something sensible
                        throw new DBConnectionError( null, $this->lastError );
                }
        }
@@ -1220,7 +1243,7 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function closeConnection( IDatabase $conn ) {
-               $serverIndex = $conn->getLBInfo( 'serverIndex' ); // second index level of mConns
+               $serverIndex = $conn->getLBInfo( 'serverIndex' );
                foreach ( $this->conns as $type => $connsByServer ) {
                        if ( !isset( $connsByServer[$serverIndex] ) ) {
                                continue;
@@ -1242,44 +1265,41 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function commitAll( $fname = __METHOD__ ) {
-               $failures = [];
-
-               $restore = ( $this->trxRoundId !== false );
-               $this->trxRoundId = false;
-               $this->forEachOpenConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
-                               try {
-                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-                               } catch ( DBError $e ) {
-                                       call_user_func( $this->errorLogger, $e );
-                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
-                               }
-                               if ( $restore && $conn->getLBInfo( 'master' ) ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
-                       }
-               );
-
-               if ( $failures ) {
-                       throw new DBExpectedError(
-                               null,
-                               "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
-                       );
-               }
+               $this->commitMasterChanges( $fname );
+               $this->flushMasterSnapshots( $fname );
+               $this->flushReplicaSnapshots( $fname );
        }
 
        public function finalizeMasterChanges() {
+               $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Loop until callbacks stop adding callbacks on other connections
+               $total = 0;
+               do {
+                       $count = 0; // callbacks execution attempts
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) {
+                               // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
+                               // Any error should cause all (peer) transactions to be rolled back together.
+                               $count += $conn->runOnTransactionPreCommitCallbacks();
+                       } );
+                       $total += $count;
+               } while ( $count > 0 );
+               // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
                $this->forEachOpenMasterConnection( function ( Database $conn ) {
-                       // Any error should cause all DB transactions to be rolled back together
-                       $conn->setTrxEndCallbackSuppression( false );
-                       $conn->runOnTransactionPreCommitCallbacks();
-                       // Defer post-commit callbacks until COMMIT finishes for all DBs
                        $conn->setTrxEndCallbackSuppression( true );
                } );
+               $this->trxRoundStage = self::ROUND_FINALIZED;
+
+               return $total;
        }
 
        public function approveMasterChanges( array $options ) {
+               $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
+
                $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
                $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
                        // If atomic sections or explicit transactions are still open, some caller must have
                        // caught an exception but failed to properly rollback any changes. Detect that and
@@ -1309,6 +1329,7 @@ class LoadBalancer implements ILoadBalancer {
                                );
                        }
                } );
+               $this->trxRoundStage = self::ROUND_APPROVED;
        }
 
        public function beginMasterChanges( $fname = __METHOD__ ) {
@@ -1318,32 +1339,26 @@ class LoadBalancer implements ILoadBalancer {
                                "$fname: Transaction round '{$this->trxRoundId}' already started."
                        );
                }
-               $this->trxRoundId = $fname;
+               $this->assertTransactionRoundStage( self::ROUND_CURSORY );
 
-               $failures = [];
-               $this->forEachOpenMasterConnection(
-                       function ( Database $conn ) use ( $fname, &$failures ) {
-                               $conn->setTrxEndCallbackSuppression( true );
-                               try {
-                                       $conn->flushSnapshot( $fname );
-                               } catch ( DBError $e ) {
-                                       call_user_func( $this->errorLogger, $e );
-                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
-                               }
-                               $conn->setTrxEndCallbackSuppression( false );
-                               $this->applyTransactionRoundFlags( $conn );
-                       }
-               );
+               // Clear any empty transactions (no writes/callbacks) from the implicit round
+               $this->flushMasterSnapshots( $fname );
 
-               if ( $failures ) {
-                       throw new DBExpectedError(
-                               null,
-                               "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
-                       );
-               }
+               $this->trxRoundId = $fname;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Mark applicable handles as participating in this explicit transaction round.
+               // For each of these handles, any writes and callbacks will be tied to a single
+               // transaction. The (peer) handles will reject begin()/commit() calls unless they
+               // are part of an en masse commit or an en masse rollback.
+               $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                       $this->applyTransactionRoundFlags( $conn );
+               } );
+               $this->trxRoundStage = self::ROUND_CURSORY;
        }
 
        public function commitMasterChanges( $fname = __METHOD__ ) {
+               $this->assertTransactionRoundStage( self::ROUND_APPROVED );
+
                $failures = [];
 
                /** @noinspection PhpUnusedLocalVariableInspection */
@@ -1351,62 +1366,125 @@ class LoadBalancer implements ILoadBalancer {
 
                $restore = ( $this->trxRoundId !== false );
                $this->trxRoundId = false;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
+               // Note that callbacks should already be suppressed due to finalizeMasterChanges().
                $this->forEachOpenMasterConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
+                       function ( IDatabase $conn ) use ( $fname, &$failures ) {
                                try {
-                                       if ( $conn->writesOrCallbacksPending() ) {
-                                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-                                       } elseif ( $restore ) {
-                                               $conn->flushSnapshot( $fname );
-                                       }
+                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
                                } catch ( DBError $e ) {
                                        call_user_func( $this->errorLogger, $e );
                                        $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
                                }
-                               if ( $restore ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
                        }
                );
-
                if ( $failures ) {
-                       throw new DBExpectedError(
+                       throw new DBTransactionError(
                                null,
                                "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
                        );
                }
+               if ( $restore ) {
+                       // Unmark handles as participating in this explicit transaction round
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                               $this->undoTransactionRoundFlags( $conn );
+                       } );
+               }
+               $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
        }
 
-       public function runMasterPostTrxCallbacks( $type ) {
+       public function runMasterTransactionIdleCallbacks() {
+               if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_COMMIT;
+               } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_ROLLBACK;
+               } else {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+                       );
+               }
+
+               $oldStage = $this->trxRoundStage;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+
+               // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
+               $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                       $conn->setTrxEndCallbackSuppression( false );
+               } );
+
                $e = null; // first exception
+               // Loop until callbacks stop adding callbacks on other connections
+               do {
+                       // Run any pending callbacks for each connection...
+                       $count = 0; // callback execution attempts
+                       $this->forEachOpenMasterConnection(
+                               function ( Database $conn ) use ( $type, &$e, &$count ) {
+                                       if ( $conn->trxLevel() ) {
+                                               return; // retry in the next iteration, after commit() is called
+                                       }
+                                       try {
+                                               $count += $conn->runOnTransactionIdleCallbacks( $type );
+                                       } catch ( Exception $ex ) {
+                                               $e = $e ?: $ex;
+                                       }
+                               }
+                       );
+                       // Clear out any active transactions left over from callbacks...
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e ) {
+                               if ( $conn->writesPending() ) {
+                                       // A callback from another handle wrote to this one and DBO_TRX is set
+                                       $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
+                                       $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
+                                       $this->queryLogger->warning(
+                                               __METHOD__ . ": found writes pending ($fnames).",
+                                               [
+                                                       'db_server' => $conn->getServer(),
+                                                       'db_name' => $conn->getDBname()
+                                               ]
+                                       );
+                               } elseif ( $conn->trxLevel() ) {
+                                       // A callback from another handle read from this one and DBO_TRX is set,
+                                       // which can easily happen if there is only one DB (no replicas)
+                                       $this->queryLogger->debug( __METHOD__ . ": found empty transaction." );
+                               }
+                               try {
+                                       $conn->commit( __METHOD__, $conn::FLUSHING_ALL_PEERS );
+                               } catch ( Exception $ex ) {
+                                       $e = $e ?: $ex;
+                               }
+                       } );
+               } while ( $count > 0 );
+
+               $this->trxRoundStage = $oldStage;
+
+               return $e;
+       }
+
+       public function runMasterTransactionListenerCallbacks() {
+               if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_COMMIT;
+               } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_ROLLBACK;
+               } else {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+                       );
+               }
+
+               $e = null;
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
                $this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
-                       $conn->setTrxEndCallbackSuppression( false );
-                       // Callbacks run in AUTO-COMMIT mode, so make sure no transactions are pending...
-                       if ( $conn->writesPending() ) {
-                               // This happens if onTransactionIdle() callbacks write to *other* handles
-                               // (which already finished their callbacks). Let any callbacks run in the final
-                               // commitMasterChanges() in LBFactory::shutdown(), when the transaction is gone.
-                               $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
-                               return;
-                       } elseif ( $conn->trxLevel() ) {
-                               // This happens for single-DB setups where DB_REPLICA uses the master DB,
-                               // thus leaving an implicit read-only transaction open at this point. It
-                               // also happens if onTransactionIdle() callbacks leave implicit transactions
-                               // open on *other* DBs (which is slightly improper). Let these COMMIT on the
-                               // next call to commitMasterChanges(), possibly in LBFactory::shutdown().
-                               return;
-                       }
-                       try {
-                               $conn->runOnTransactionIdleCallbacks( $type );
-                       } catch ( Exception $ex ) {
-                               $e = $e ?: $ex;
-                       }
                        try {
                                $conn->runTransactionListenerCallbacks( $type );
                        } catch ( Exception $ex ) {
                                $e = $e ?: $ex;
                        }
                } );
+               $this->trxRoundStage = self::ROUND_CURSORY;
 
                return $e;
        }
@@ -1414,20 +1492,37 @@ class LoadBalancer implements ILoadBalancer {
        public function rollbackMasterChanges( $fname = __METHOD__ ) {
                $restore = ( $this->trxRoundId !== false );
                $this->trxRoundId = false;
-               $this->forEachOpenMasterConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore ) {
-                               $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
-                               if ( $restore ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
-                       }
-               );
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+               } );
+               if ( $restore ) {
+                       // Unmark handles as participating in this explicit transaction round
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                               $this->undoTransactionRoundFlags( $conn );
+                       } );
+               }
+               $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
        }
 
-       public function suppressTransactionEndCallbacks() {
-               $this->forEachOpenMasterConnection( function ( Database $conn ) {
-                       $conn->setTrxEndCallbackSuppression( true );
-               } );
+       /**
+        * @param string|string[] $stage
+        */
+       private function assertTransactionRoundStage( $stage ) {
+               $stages = (array)$stage;
+
+               if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
+                       $stageList = implode(
+                               '/',
+                               array_map( function ( $v ) {
+                                       return "'$v'";
+                               }, $stages )
+                       );
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
+                       );
+               }
        }
 
        /**
@@ -1437,9 +1532,9 @@ class LoadBalancer implements ILoadBalancer {
         * transaction rounds and remain in auto-commit mode. Such behavior might be desired
         * when a DB server is used for something like simple key/value storage.
         *
-        * @param IDatabase $conn
+        * @param Database $conn
         */
-       private function applyTransactionRoundFlags( IDatabase $conn ) {
+       private function applyTransactionRoundFlags( Database $conn ) {
                if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
                        return; // transaction rounds do not apply to these connections
                }
@@ -1456,9 +1551,9 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        /**
-        * @param IDatabase $conn
+        * @param Database $conn
         */
-       private function undoTransactionRoundFlags( IDatabase $conn ) {
+       private function undoTransactionRoundFlags( Database $conn ) {
                if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
                        return; // transaction rounds do not apply to these connections
                }
@@ -1473,11 +1568,25 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function flushReplicaSnapshots( $fname = __METHOD__ ) {
-               $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) {
-                       $conn->flushSnapshot( __METHOD__ );
+               $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->flushSnapshot( $fname );
                } );
        }
 
+       public function flushMasterSnapshots( $fname = __METHOD__ ) {
+               $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->flushSnapshot( $fname );
+               } );
+       }
+
+       /**
+        * @return string
+        * @since 1.32
+        */
+       public function getTransactionRoundStage() {
+               return $this->trxRoundStage;
+       }
+
        public function hasMasterConnection() {
                return $this->isOpen( $this->getWriterIndex() );
        }
@@ -1533,15 +1642,6 @@ class LoadBalancer implements ILoadBalancer {
                return $this->laggedReplicaMode;
        }
 
-       /**
-        * @param bool $domain
-        * @return bool
-        * @deprecated 1.28; use getLaggedReplicaMode()
-        */
-       public function getLaggedSlaveMode( $domain = false ) {
-               return $this->getLaggedReplicaMode( $domain );
-       }
-
        public function laggedReplicaUsed() {
                return $this->laggedReplicaMode;
        }