rdbms: enforce and improve LBFactory/LoadBalancer callback handling
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 28 Mar 2018 20:01:32 +0000 (13:01 -0700)
committerKrinkle <krinklemail@gmail.com>
Mon, 30 Apr 2018 21:55:25 +0000 (21:55 +0000)
* Handle the case where an onTransaction* callback for one handle
  adds more onTransaction* callbacks to a different handle. Instead
  of supporting only a short chain of such callbacks, try to resolve
  the whole chain by using a loop in LoadBalancer and LBFactory.
* Add sanity checks to enforce the proper call order of LoadBalancer
  transaction methods, such as those that execute callbacks. This is
  the order that LBFactory already uses. Use ROUND_ERROR for problems
  that can ruin the instance state. Such problems require rollback.
* Correct setTrxEndCallbackSuppression() calls in beginMasterChanges()
  that were making tests fail.
* Make Database handle callback suppression for FLUSHING_ALL_PEERS
  instead of making LoadBalancer/LBFactory have to manage it.
* Simplify finalizeMasterChanges() given that suppression does not
  actually effect runOnTransactionPreCommitCallbacks().
* Make dangling callback warning in Database::close work properly.
* Actually use $fname in flushReplicaSnapshots().
* Use DBTransactionError instead of DBExpectedError in some places
  where stages fail.
* Fix failing testGetScopedLock() unit tests so everything passes.

Add more comments to setTransactionListener and onTransactionIdle.

Change-Id: I6a25a6e4e5ba666e0da065a24846cbab7e786c7b

includes/libs/rdbms/database/Database.php
includes/libs/rdbms/database/IDatabase.php
includes/libs/rdbms/lbfactory/LBFactory.php
includes/libs/rdbms/loadbalancer/ILoadBalancer.php
includes/libs/rdbms/loadbalancer/LoadBalancer.php
tests/phpunit/includes/db/LoadBalancerTest.php
tests/phpunit/includes/libs/rdbms/database/DatabaseTest.php

index cb51113..f44b7cb 100644 (file)
@@ -722,17 +722,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        /**
-        * Get the list of method names that have pending write queries or callbacks
-        * for this transaction
+        * Get the list of method names that have pending write queries or that
+        * have transaction callbacks that have yet to run
         *
         * @return array
         */
        protected function pendingWriteAndCallbackCallers() {
-               if ( !$this->trxLevel ) {
-                       return [];
-               }
-
-               $fnames = $this->trxWriteCallers;
+               $fnames = $this->pendingWriteCallers();
                foreach ( [
                        $this->trxIdleCallbacks,
                        $this->trxPreCommitCallbacks,
@@ -960,12 +956,10 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
 
                // Sanity check that no callbacks are dangling
-               if (
-                       $this->trxIdleCallbacks || $this->trxPreCommitCallbacks || $this->trxEndCallbacks
-               ) {
+               $fnames = $this->pendingWriteAndCallbackCallers();
+               if ( $fnames ) {
                        throw new RuntimeException(
-                               "Transaction callbacks are still pending:\n" .
-                               implode( ', ', $this->pendingWriteAndCallbackCallers() )
+                               "Transaction callbacks are still pending:\n" . implode( ', ', $fnames )
                        );
                }
 
@@ -3414,19 +3408,25 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        /**
-        * Actually run and consume any "on transaction idle/resolution" callbacks.
+        * Actually consume and run any "on transaction idle/resolution" callbacks.
         *
         * This method should not be used outside of Database/LoadBalancer
         *
         * @param int $trigger IDatabase::TRIGGER_* constant
+        * @return int Number of callbacks attempted
         * @since 1.20
         * @throws Exception
         */
        public function runOnTransactionIdleCallbacks( $trigger ) {
+               if ( $this->trxLevel ) { // sanity
+                       throw new DBUnexpectedError( $this, __METHOD__ . ': a transaction is still open.' );
+               }
+
                if ( $this->trxEndCallbacksSuppressed ) {
-                       return;
+                       return 0;
                }
 
+               $count = 0;
                $autoTrx = $this->getFlag( self::DBO_TRX ); // automatic begin() enabled?
                /** @var Exception $e */
                $e = null; // first exception
@@ -3439,6 +3439,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->trxEndCallbacks = []; // consumed (recursion guard)
                        foreach ( $callbacks as $callback ) {
                                try {
+                                       ++$count;
                                        list( $phpCallback ) = $callback;
                                        $this->clearFlag( self::DBO_TRX ); // make each query its own transaction
                                        call_user_func( $phpCallback, $trigger, $this );
@@ -3462,23 +3463,29 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                if ( $e instanceof Exception ) {
                        throw $e; // re-throw any first exception
                }
+
+               return $count;
        }
 
        /**
-        * Actually run and consume any "on transaction pre-commit" callbacks.
+        * Actually consume and run any "on transaction pre-commit" callbacks.
         *
         * This method should not be used outside of Database/LoadBalancer
         *
         * @since 1.22
+        * @return int Number of callbacks attempted
         * @throws Exception
         */
        public function runOnTransactionPreCommitCallbacks() {
+               $count = 0;
+
                $e = null; // first exception
                do { // callbacks may add callbacks :)
                        $callbacks = $this->trxPreCommitCallbacks;
                        $this->trxPreCommitCallbacks = []; // consumed (and recursion guard)
                        foreach ( $callbacks as $callback ) {
                                try {
+                                       ++$count;
                                        list( $phpCallback ) = $callback;
                                        call_user_func( $phpCallback, $this );
                                } catch ( Exception $ex ) {
@@ -3491,6 +3498,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                if ( $e instanceof Exception ) {
                        throw $e; // re-throw any first exception
                }
+
+               return $count;
        }
 
        /**
@@ -3591,7 +3600,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $savepointId = $cancelable === self::ATOMIC_CANCELABLE ? self::$NOT_APPLICABLE : null;
 
                if ( !$this->trxLevel ) {
-                       $this->begin( $fname, self::TRANSACTION_INTERNAL );
+                       $this->begin( $fname, self::TRANSACTION_INTERNAL ); // sets trxAutomatic
                        // If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result
                        // in all changes being in one transaction to keep requests transactional.
                        if ( $this->getFlag( self::DBO_TRX ) ) {
@@ -3845,8 +3854,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        );
                }
 
-               $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
-               $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
+               // With FLUSHING_ALL_PEERS, callbacks will be explicitly run later
+               if ( $flush !== self::FLUSHING_ALL_PEERS ) {
+                       $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
+                       $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
+               }
        }
 
        /**
@@ -3895,7 +3907,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $this->trxIdleCallbacks = [];
                $this->trxPreCommitCallbacks = [];
 
-               if ( $trxActive ) {
+               // With FLUSHING_ALL_PEERS, callbacks will be explicitly run later
+               if ( $trxActive && $flush !== self::FLUSHING_ALL_PEERS ) {
                        try {
                                $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
                        } catch ( Exception $e ) {
index 6b3efa2..bfaa950 100644 (file)
@@ -58,7 +58,7 @@ interface IDatabase {
        /** @var string Commit/rollback is from the connection manager for the IDatabase handle */
        const FLUSHING_ALL_PEERS = 'flush';
        /** @var string Commit/rollback is from the IDatabase handle internally */
-       const FLUSHING_INTERNAL = 'flush';
+       const FLUSHING_INTERNAL = 'flush-internal';
 
        /** @var string Do not remember the prior flags */
        const REMEMBER_NOTHING = '';
@@ -1508,6 +1508,9 @@ interface IDatabase {
         * It can also be used for updates that easily suffer from lock timeouts and deadlocks,
         * but where atomicity is not essential.
         *
+        * Avoid using IDatabase instances aside from this one in the callback, unless such instances
+        * never have IDatabase::DBO_TRX set. This keeps callbacks from interfering with one another.
+        *
         * Updates will execute in the order they were enqueued.
         *
         * @note: do not assume that *other* IDatabase instances will be AUTOCOMMIT mode
@@ -1555,7 +1558,10 @@ interface IDatabase {
         *   - This IDatabase object
         * Callbacks must commit any transactions that they begin.
         *
-        * Registering a callback here will not affect writesOrCallbacks() pending
+        * Registering a callback here will not affect writesOrCallbacks() pending.
+        *
+        * Since callbacks from this method or onTransactionIdle() can start and end transactions,
+        * a single call to IDatabase::commit might trigger multiple runs of the listener callbacks.
         *
         * @param string $name Callback name
         * @param callable|null $callback Use null to unset a listener
index c272147..ca684c3 100644 (file)
@@ -88,6 +88,14 @@ abstract class LBFactory implements ILBFactory {
        /** @var string Agent name for query profiling */
        protected $agent;
 
+       /** @var string One of the ROUND_* class constants */
+       private $trxRoundStage = self::ROUND_CURSORY;
+
+       const ROUND_CURSORY = 'cursory';
+       const ROUND_BEGINNING = 'within-begin';
+       const ROUND_COMMITTING = 'within-commit';
+       const ROUND_ROLLING_BACK = 'within-rollback';
+
        private static $loggerFields =
                [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ];
 
@@ -206,12 +214,14 @@ abstract class LBFactory implements ILBFactory {
                $this->forEachLBCallMethod( 'flushReplicaSnapshots', [ $fname ] );
        }
 
-       public function commitAll( $fname = __METHOD__, array $options = [] ) {
+       final public function commitAll( $fname = __METHOD__, array $options = [] ) {
                $this->commitMasterChanges( $fname, $options );
                $this->forEachLBCallMethod( 'commitAll', [ $fname ] );
        }
 
-       public function beginMasterChanges( $fname = __METHOD__ ) {
+       final public function beginMasterChanges( $fname = __METHOD__ ) {
+               $this->assertTransactionRoundStage( self::ROUND_CURSORY );
+               $this->trxRoundStage = self::ROUND_BEGINNING;
                if ( $this->trxRoundId !== false ) {
                        throw new DBTransactionError(
                                null,
@@ -221,9 +231,12 @@ abstract class LBFactory implements ILBFactory {
                $this->trxRoundId = $fname;
                // Set DBO_TRX flags on all appropriate DBs
                $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
+               $this->trxRoundStage = self::ROUND_CURSORY;
        }
 
-       public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
+       final public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
+               $this->assertTransactionRoundStage( self::ROUND_CURSORY );
+               $this->trxRoundStage = self::ROUND_COMMITTING;
                if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
                        throw new DBTransactionError(
                                null,
@@ -241,29 +254,33 @@ abstract class LBFactory implements ILBFactory {
                $this->logIfMultiDbTransaction();
                // Actually perform the commit on all master DB connections and revert DBO_TRX
                $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
-               // Run all post-commit callbacks
-               /** @var Exception $e */
+               // Run all post-commit callbacks until new ones stop getting added
                $e = null; // first callback exception
+               do {
+                       $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
+                               $ex = $lb->runMasterTransactionIdleCallbacks();
+                               $e = $e ?: $ex;
+                       } );
+               } while ( $this->hasMasterChanges() );
+               // Run all listener callbacks once
                $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
-                       $ex = $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_COMMIT );
+                       $ex = $lb->runMasterTransactionListenerCallbacks();
                        $e = $e ?: $ex;
                } );
-               // Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
-               $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
+               $this->trxRoundStage = self::ROUND_CURSORY;
                // Throw any last post-commit callback error
                if ( $e instanceof Exception ) {
                        throw $e;
                }
        }
 
-       public function rollbackMasterChanges( $fname = __METHOD__ ) {
+       final public function rollbackMasterChanges( $fname = __METHOD__ ) {
+               $this->trxRoundStage = self::ROUND_ROLLING_BACK;
                $this->trxRoundId = false;
-               $this->forEachLBCallMethod( 'suppressTransactionEndCallbacks' );
                $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
-               // Run all post-rollback callbacks
-               $this->forEachLB( function ( ILoadBalancer $lb ) {
-                       $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_ROLLBACK );
-               } );
+               $this->forEachLBCallMethod( 'runMasterTransactionIdleCallbacks' );
+               $this->forEachLBCallMethod( 'runMasterTransactionListenerCallbacks' );
+               $this->trxRoundStage = self::ROUND_CURSORY;
        }
 
        public function hasTransactionRound() {
@@ -408,7 +425,7 @@ abstract class LBFactory implements ILBFactory {
                return $this->ticket;
        }
 
-       public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
+       final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
                if ( $ticket !== $this->ticket ) {
                        $this->perfLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" .
                                ( new RuntimeException() )->getTraceAsString() );
@@ -597,6 +614,18 @@ abstract class LBFactory implements ILBFactory {
                $this->requestInfo = $info + $this->requestInfo;
        }
 
+       /**
+        * @param string $stage
+        */
+       private function assertTransactionRoundStage( $stage ) {
+               if ( $this->trxRoundStage !== $stage ) {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
+                       );
+               }
+       }
+
        /**
         * Make PHP ignore user aborts/disconnects until the returned
         * value leaves scope. This returns null and does nothing in CLI mode.
index fec496e..dd257e5 100644 (file)
@@ -377,8 +377,7 @@ interface ILoadBalancer {
        public function commitAll( $fname = __METHOD__ );
 
        /**
-        * Perform all pre-commit callbacks that remain part of the atomic transactions
-        * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
+        * Run pre-commit callbacks and defer execution of post-commit callbacks
         *
         * Use this only for mutli-database commits
         */
@@ -417,14 +416,18 @@ interface ILoadBalancer {
        public function commitMasterChanges( $fname = __METHOD__ );
 
        /**
-        * Issue all pending post-COMMIT/ROLLBACK callbacks
+        * Consume and run all pending post-COMMIT/ROLLBACK callbacks
         *
-        * Use this only for mutli-database commits
+        * @return Exception|null The first exception or null if there were none
+        */
+       public function runMasterTransactionIdleCallbacks();
+
+       /**
+        * Run all recurring post-COMMIT/ROLLBACK listener callbacks
         *
-        * @param int $type IDatabase::TRIGGER_* constant
         * @return Exception|null The first exception or null if there were none
         */
-       public function runMasterPostTrxCallbacks( $type );
+       public function runMasterTransactionListenerCallbacks();
 
        /**
         * Issue ROLLBACK only on master, only if queries were done on connection
@@ -433,15 +436,6 @@ interface ILoadBalancer {
         */
        public function rollbackMasterChanges( $fname = __METHOD__ );
 
-       /**
-        * Suppress all pending post-COMMIT/ROLLBACK callbacks
-        *
-        * Use this only for mutli-database commits
-        *
-        * @return Exception|null The first exception or null if there were none
-        */
-       public function suppressTransactionEndCallbacks();
-
        /**
         * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
         *
index 8de6064..e70d49e 100644 (file)
@@ -120,6 +120,8 @@ class LoadBalancer implements ILoadBalancer {
        private $connectionAttempted = false;
        /** @var int */
        private $maxLag = self::MAX_LAG_DEFAULT;
+       /** @var string Stage of the current transaction round in the transaction round life-cycle */
+       private $trxRoundStage = self::ROUND_CURSORY;
 
        /** @var int Warn when this many connection are held */
        const CONN_HELD_WARN_THRESHOLD = 10;
@@ -139,6 +141,19 @@ class LoadBalancer implements ILoadBalancer {
        const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
        const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
 
+       /** @var string Transaction round, explicit or implicit, has not finished writing */
+       const ROUND_CURSORY = 'cursory';
+       /** @var string Transaction round writes are complete and ready for pre-commit checks */
+       const ROUND_FINALIZED = 'finalized';
+       /** @var string Transaction round passed final pre-commit checks */
+       const ROUND_APPROVED = 'approved';
+       /** @var string Transaction round was committed and post-commit callbacks must be run */
+       const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
+       /** @var string Transaction round was rolled back and post-rollback callbacks must be run */
+       const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
+       /** @var string Transaction round encountered an error */
+       const ROUND_ERROR = 'error';
+
        public function __construct( array $params ) {
                if ( !isset( $params['servers'] ) ) {
                        throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
@@ -1242,44 +1257,37 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function commitAll( $fname = __METHOD__ ) {
-               $failures = [];
-
-               $restore = ( $this->trxRoundId !== false );
-               $this->trxRoundId = false;
-               $this->forEachOpenConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
-                               try {
-                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-                               } catch ( DBError $e ) {
-                                       call_user_func( $this->errorLogger, $e );
-                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
-                               }
-                               if ( $restore && $conn->getLBInfo( 'master' ) ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
-                       }
-               );
-
-               if ( $failures ) {
-                       throw new DBExpectedError(
-                               null,
-                               "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
-                       );
-               }
+               $this->commitMasterChanges( $fname );
+               $this->flushMasterSnapshots( $fname );
+               $this->flushReplicaSnapshots( $fname );
        }
 
        public function finalizeMasterChanges() {
+               $this->assertTransactionRoundStage( self::ROUND_CURSORY );
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Loop until callbacks stop adding callbacks on other connections
+               do {
+                       $count = 0; // callbacks execution attempts
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) {
+                               // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
+                               // Any error should cause all (peer) transactions to be rolled back together.
+                               $count += $conn->runOnTransactionPreCommitCallbacks();
+                       } );
+               } while ( $count > 0 );
+               // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
                $this->forEachOpenMasterConnection( function ( Database $conn ) {
-                       // Any error should cause all DB transactions to be rolled back together
-                       $conn->setTrxEndCallbackSuppression( false );
-                       $conn->runOnTransactionPreCommitCallbacks();
-                       // Defer post-commit callbacks until COMMIT finishes for all DBs
                        $conn->setTrxEndCallbackSuppression( true );
                } );
+               $this->trxRoundStage = self::ROUND_FINALIZED;
        }
 
        public function approveMasterChanges( array $options ) {
+               $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
+
                $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
                $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
                        // If atomic sections or explicit transactions are still open, some caller must have
                        // caught an exception but failed to properly rollback any changes. Detect that and
@@ -1309,6 +1317,7 @@ class LoadBalancer implements ILoadBalancer {
                                );
                        }
                } );
+               $this->trxRoundStage = self::ROUND_APPROVED;
        }
 
        public function beginMasterChanges( $fname = __METHOD__ ) {
@@ -1318,32 +1327,26 @@ class LoadBalancer implements ILoadBalancer {
                                "$fname: Transaction round '{$this->trxRoundId}' already started."
                        );
                }
-               $this->trxRoundId = $fname;
+               $this->assertTransactionRoundStage( self::ROUND_CURSORY );
 
-               $failures = [];
-               $this->forEachOpenMasterConnection(
-                       function ( Database $conn ) use ( $fname, &$failures ) {
-                               $conn->setTrxEndCallbackSuppression( true );
-                               try {
-                                       $conn->flushSnapshot( $fname );
-                               } catch ( DBError $e ) {
-                                       call_user_func( $this->errorLogger, $e );
-                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
-                               }
-                               $conn->setTrxEndCallbackSuppression( false );
-                               $this->applyTransactionRoundFlags( $conn );
-                       }
-               );
+               // Clear any empty transactions (no writes/callbacks) from the implicit round
+               $this->flushMasterSnapshots( $fname );
 
-               if ( $failures ) {
-                       throw new DBExpectedError(
-                               null,
-                               "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
-                       );
-               }
+               $this->trxRoundId = $fname;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Mark applicable handles as participating in this explicit transaction round.
+               // For each of these handles, any writes and callbacks will be tied to a single
+               // transaction. The (peer) handles will reject begin()/commit() calls unless they
+               // are part of an en masse commit or an en masse rollback.
+               $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                       $this->applyTransactionRoundFlags( $conn );
+               } );
+               $this->trxRoundStage = self::ROUND_CURSORY;
        }
 
        public function commitMasterChanges( $fname = __METHOD__ ) {
+               $this->assertTransactionRoundStage( self::ROUND_APPROVED );
+
                $failures = [];
 
                /** @noinspection PhpUnusedLocalVariableInspection */
@@ -1351,62 +1354,117 @@ class LoadBalancer implements ILoadBalancer {
 
                $restore = ( $this->trxRoundId !== false );
                $this->trxRoundId = false;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
+               // Note that callbacks should already be suppressed due to finalizeMasterChanges().
                $this->forEachOpenMasterConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
+                       function ( IDatabase $conn ) use ( $fname, &$failures ) {
                                try {
-                                       if ( $conn->writesOrCallbacksPending() ) {
-                                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-                                       } elseif ( $restore ) {
-                                               $conn->flushSnapshot( $fname );
-                                       }
+                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
                                } catch ( DBError $e ) {
                                        call_user_func( $this->errorLogger, $e );
                                        $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
                                }
-                               if ( $restore ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
                        }
                );
-
                if ( $failures ) {
-                       throw new DBExpectedError(
+                       throw new DBTransactionError(
                                null,
                                "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
                        );
                }
+               if ( $restore ) {
+                       // Unmark handles as participating in this explicit transaction round
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                               $this->undoTransactionRoundFlags( $conn );
+                       } );
+               }
+               $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
        }
 
-       public function runMasterPostTrxCallbacks( $type ) {
+       public function runMasterTransactionIdleCallbacks() {
+               if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_COMMIT;
+               } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_ROLLBACK;
+               } else {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+                       );
+               }
+
+               $oldStage = $this->trxRoundStage;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+
+               // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
+               $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                       $conn->setTrxEndCallbackSuppression( false );
+               } );
+
                $e = null; // first exception
+               // Loop until callbacks stop adding callbacks on other connections
+               do {
+                       // Run any pending callbacks for each connection...
+                       $count = 0; // callback execution attempts
+                       $this->forEachOpenMasterConnection(
+                               function ( Database $conn ) use ( $type, &$e, &$count ) {
+                                       if ( $conn->trxLevel() ) {
+                                               return; // retry in the next iteration, after commit() is called
+                                       }
+                                       try {
+                                               $count += $conn->runOnTransactionIdleCallbacks( $type );
+                                       } catch ( Exception $ex ) {
+                                               $e = $e ?: $ex;
+                                       }
+                               }
+                       );
+                       // Clear out any active transactions left over from callbacks...
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e ) {
+                               if ( $conn->writesPending() ) {
+                                       // A callback from another handle wrote to this one and DBO_TRX is set
+                                       $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
+                               } elseif ( $conn->trxLevel() ) {
+                                       // A callback from another handle read from this one and DBO_TRX is set,
+                                       // which can easily happen if there is only one DB (no replicas)
+                                       $this->queryLogger->debug( __METHOD__ . ": found empty transaction." );
+                               }
+                               try {
+                                       $conn->commit( __METHOD__, $conn::FLUSHING_ALL_PEERS );
+                               } catch ( Exception $ex ) {
+                                       $e = $e ?: $ex;
+                               }
+                       } );
+               } while ( $count > 0 );
+
+               $this->trxRoundStage = $oldStage;
+
+               return $e;
+       }
+
+       public function runMasterTransactionListenerCallbacks() {
+               if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_COMMIT;
+               } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_ROLLBACK;
+               } else {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+                       );
+               }
+
+               $e = null;
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
                $this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
-                       $conn->setTrxEndCallbackSuppression( false );
-                       // Callbacks run in AUTO-COMMIT mode, so make sure no transactions are pending...
-                       if ( $conn->writesPending() ) {
-                               // This happens if onTransactionIdle() callbacks write to *other* handles
-                               // (which already finished their callbacks). Let any callbacks run in the final
-                               // commitMasterChanges() in LBFactory::shutdown(), when the transaction is gone.
-                               $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
-                               return;
-                       } elseif ( $conn->trxLevel() ) {
-                               // This happens for single-DB setups where DB_REPLICA uses the master DB,
-                               // thus leaving an implicit read-only transaction open at this point. It
-                               // also happens if onTransactionIdle() callbacks leave implicit transactions
-                               // open on *other* DBs (which is slightly improper). Let these COMMIT on the
-                               // next call to commitMasterChanges(), possibly in LBFactory::shutdown().
-                               return;
-                       }
-                       try {
-                               $conn->runOnTransactionIdleCallbacks( $type );
-                       } catch ( Exception $ex ) {
-                               $e = $e ?: $ex;
-                       }
                        try {
                                $conn->runTransactionListenerCallbacks( $type );
                        } catch ( Exception $ex ) {
                                $e = $e ?: $ex;
                        }
                } );
+               $this->trxRoundStage = self::ROUND_CURSORY;
 
                return $e;
        }
@@ -1414,20 +1472,29 @@ class LoadBalancer implements ILoadBalancer {
        public function rollbackMasterChanges( $fname = __METHOD__ ) {
                $restore = ( $this->trxRoundId !== false );
                $this->trxRoundId = false;
-               $this->forEachOpenMasterConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore ) {
-                               $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
-                               if ( $restore ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
-                       }
-               );
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+               } );
+               if ( $restore ) {
+                       // Unmark handles as participating in this explicit transaction round
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                               $this->undoTransactionRoundFlags( $conn );
+                       } );
+               }
+               $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
        }
 
-       public function suppressTransactionEndCallbacks() {
-               $this->forEachOpenMasterConnection( function ( Database $conn ) {
-                       $conn->setTrxEndCallbackSuppression( true );
-               } );
+       /**
+        * @param string $stage
+        */
+       private function assertTransactionRoundStage( $stage ) {
+               if ( $this->trxRoundStage !== $stage ) {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
+                       );
+               }
        }
 
        /**
@@ -1437,9 +1504,9 @@ class LoadBalancer implements ILoadBalancer {
         * transaction rounds and remain in auto-commit mode. Such behavior might be desired
         * when a DB server is used for something like simple key/value storage.
         *
-        * @param IDatabase $conn
+        * @param Database $conn
         */
-       private function applyTransactionRoundFlags( IDatabase $conn ) {
+       private function applyTransactionRoundFlags( Database $conn ) {
                if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
                        return; // transaction rounds do not apply to these connections
                }
@@ -1456,9 +1523,9 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        /**
-        * @param IDatabase $conn
+        * @param Database $conn
         */
-       private function undoTransactionRoundFlags( IDatabase $conn ) {
+       private function undoTransactionRoundFlags( Database $conn ) {
                if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
                        return; // transaction rounds do not apply to these connections
                }
@@ -1473,11 +1540,25 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function flushReplicaSnapshots( $fname = __METHOD__ ) {
-               $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) {
-                       $conn->flushSnapshot( __METHOD__ );
+               $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->flushSnapshot( $fname );
                } );
        }
 
+       private function flushMasterSnapshots( $fname = __METHOD__ ) {
+               $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->flushSnapshot( $fname );
+               } );
+       }
+
+       /**
+        * @return string
+        * @since 1.32
+        */
+       public function getTransactionRoundStage() {
+               return $this->trxRoundStage;
+       }
+
        public function hasMasterConnection() {
                return $this->isOpen( $this->getWriterIndex() );
        }
index e054569..7462f1d 100644 (file)
@@ -302,4 +302,91 @@ class LoadBalancerTest extends MediaWikiTestCase {
 
                $lb->closeAll();
        }
+
+       public function testTransactionCallbackChains() {
+               global $wgDBserver, $wgDBname, $wgDBuser, $wgDBpassword, $wgDBtype, $wgSQLiteDataDir;
+
+               $servers = [
+                       [
+                               'host' => $wgDBserver,
+                               'dbname' => $wgDBname,
+                               'tablePrefix' => $this->dbPrefix(),
+                               'user' => $wgDBuser,
+                               'password' => $wgDBpassword,
+                               'type' => $wgDBtype,
+                               'dbDirectory' => $wgSQLiteDataDir,
+                               'load' => 0,
+                               'flags' => DBO_TRX // REPEATABLE-READ for consistency
+                       ],
+               ];
+
+               $lb = new LoadBalancer( [
+                       'servers' => $servers,
+                       'localDomain' => new DatabaseDomain( $wgDBname, null, $this->dbPrefix() )
+               ] );
+
+               $conn1 = $lb->openConnection( $lb->getWriterIndex(), false );
+               $conn2 = $lb->openConnection( $lb->getWriterIndex(), '' );
+
+               $count = 0;
+               $lb->forEachOpenMasterConnection( function () use ( &$count ) {
+                       ++$count;
+               } );
+               $this->assertEquals( 2, $count, 'Connection handle count' );
+
+               $tlCalls = 0;
+               $lb->setTransactionListener( 'test-listener', function () use ( &$tlCalls ) {
+                       ++$tlCalls;
+               } );
+
+               $lb->beginMasterChanges( __METHOD__ );
+               $bc = array_fill_keys( [ 'a', 'b', 'c', 'd' ], 0 );
+               $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) {
+                       $bc['a'] = 1;
+                       $conn2->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) {
+                               $bc['b'] = 1;
+                               $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) {
+                                       $bc['c'] = 1;
+                                       $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) {
+                                               $bc['d'] = 1;
+                                       } );
+                               } );
+                       } );
+               } );
+               $lb->finalizeMasterChanges();
+               $lb->approveMasterChanges( [] );
+               $lb->commitMasterChanges( __METHOD__ );
+               $lb->runMasterTransactionIdleCallbacks();
+               $lb->runMasterTransactionListenerCallbacks();
+
+               $this->assertEquals( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $bc );
+               $this->assertEquals( 2, $tlCalls );
+
+               $tlCalls = 0;
+               $lb->beginMasterChanges( __METHOD__ );
+               $ac = array_fill_keys( [ 'a', 'b', 'c', 'd' ], 0 );
+               $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) {
+                       $ac['a'] = 1;
+                       $conn2->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) {
+                               $ac['b'] = 1;
+                               $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) {
+                                       $ac['c'] = 1;
+                                       $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) {
+                                               $ac['d'] = 1;
+                                       } );
+                               } );
+                       } );
+               } );
+               $lb->finalizeMasterChanges();
+               $lb->approveMasterChanges( [] );
+               $lb->commitMasterChanges( __METHOD__ );
+               $lb->runMasterTransactionIdleCallbacks();
+               $lb->runMasterTransactionListenerCallbacks();
+
+               $this->assertEquals( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $ac );
+               $this->assertEquals( 2, $tlCalls );
+
+               $conn1->close();
+               $conn2->close();
+       }
 }
index 3335a2b..f08b376 100644 (file)
@@ -9,6 +9,7 @@ use Wikimedia\TestingAccessWrapper;
 use Wikimedia\Rdbms\DatabaseSqlite;
 use Wikimedia\Rdbms\DatabasePostgres;
 use Wikimedia\Rdbms\DatabaseMssql;
+use Wikimedia\Rdbms\DBUnexpectedError;
 
 class DatabaseTest extends PHPUnit\Framework\TestCase {
 
@@ -367,7 +368,7 @@ class DatabaseTest extends PHPUnit\Framework\TestCase {
                        $called = true;
                        $db->setFlag( DBO_TRX );
                } );
-               $db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS );
+               $db->rollback( __METHOD__ );
                $this->assertFalse( $db->getFlag( DBO_TRX ), 'DBO_TRX restored to default' );
                $this->assertTrue( $called, 'Callback reached' );
        }
@@ -489,37 +490,56 @@ class DatabaseTest extends PHPUnit\Framework\TestCase {
                $this->assertEquals( true, $db->lockIsFree( 'x', __METHOD__ ) );
                $db->clearFlag( DBO_TRX );
 
+               // Pending writes with DBO_TRX
                $this->assertEquals( 0, $db->trxLevel() );
-
+               $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) );
                $db->setFlag( DBO_TRX );
+               $db->query( "DELETE FROM test WHERE t = 1" ); // trigger DBO_TRX transaction before lock
                try {
-                       $this->badLockingMethodImplicit( $db );
-               } catch ( RunTimeException $e ) {
-                       $this->assertTrue( $db->trxLevel() > 0, "Transaction not committed." );
+                       $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 );
+                       $this->fail( "Exception not reached" );
+               } catch ( DBUnexpectedError $e ) {
+                       $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." );
+                       $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ), 'Lock not acquired' );
                }
-               $db->clearFlag( DBO_TRX );
                $db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS );
-               $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) );
-
+               // Pending writes without DBO_TRX
+               $db->clearFlag( DBO_TRX );
+               $this->assertEquals( 0, $db->trxLevel() );
+               $this->assertTrue( $db->lockIsFree( 'meow2', __METHOD__ ) );
+               $db->begin( __METHOD__ );
+               $db->query( "DELETE FROM test WHERE t = 1" ); // trigger DBO_TRX transaction before lock
                try {
-                       $this->badLockingMethodExplicit( $db );
-               } catch ( RunTimeException $e ) {
-                       $this->assertTrue( $db->trxLevel() > 0, "Transaction not committed." );
+                       $lock = $db->getScopedLockAndFlush( 'meow2', __METHOD__, 1 );
+                       $this->fail( "Exception not reached" );
+               } catch ( DBUnexpectedError $e ) {
+                       $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." );
+                       $this->assertTrue( $db->lockIsFree( 'meow2', __METHOD__ ), 'Lock not acquired' );
                }
+               $db->rollback( __METHOD__ );
+               // No pending writes, with DBO_TRX
+               $db->setFlag( DBO_TRX );
+               $this->assertEquals( 0, $db->trxLevel() );
+               $this->assertTrue( $db->lockIsFree( 'wuff', __METHOD__ ) );
+               $db->query( "SELECT 1", __METHOD__ );
+               $this->assertEquals( 1, $db->trxLevel() );
+               $lock = $db->getScopedLockAndFlush( 'wuff', __METHOD__, 1 );
+               $this->assertEquals( 0, $db->trxLevel() );
+               $this->assertFalse( $db->lockIsFree( 'wuff', __METHOD__ ), 'Lock already acquired' );
                $db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS );
-               $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) );
-       }
-
-       private function badLockingMethodImplicit( IDatabase $db ) {
-               $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 );
-               $db->query( "SELECT 1" ); // trigger DBO_TRX
-               throw new RunTimeException( "Uh oh!" );
-       }
-
-       private function badLockingMethodExplicit( IDatabase $db ) {
-               $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 );
+               // No pending writes, without DBO_TRX
+               $db->clearFlag( DBO_TRX );
+               $this->assertEquals( 0, $db->trxLevel() );
+               $this->assertTrue( $db->lockIsFree( 'wuff2', __METHOD__ ) );
                $db->begin( __METHOD__ );
-               throw new RunTimeException( "Uh oh!" );
+               try {
+                       $lock = $db->getScopedLockAndFlush( 'wuff2', __METHOD__, 1 );
+                       $this->fail( "Exception not reached" );
+               } catch ( DBUnexpectedError $e ) {
+                       $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." );
+                       $this->assertFalse( $db->lockIsFree( 'wuff2', __METHOD__ ), 'Lock not acquired' );
+               }
+               $db->rollback( __METHOD__ );
        }
 
        /**