Improve beginMasterChanges and make methods for DeferredUpdates
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 26 Aug 2016 07:19:34 +0000 (00:19 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Wed, 31 Aug 2016 23:34:39 +0000 (16:34 -0700)
* Fixed bug in LBFactory::beginMasterChanges where
  untracked DBs that then get written to where not in the
  transaction round. The call to onTransactionResolution()
  right after commit() was also problematic, so a flag was
  added to handle that.
* Add LBFactory::setWaitForReplicationListener(), which
  fires at what is often a good point to run deferred updates.
* Add DatabaseBase::clearSnapshot() for committing no-op
  transactions.
* Add IDatabase::setTransactionListener() for persistent
  callbacks. Monitoring commits can be used to find a point
  where deferred updates can run.
* Follow-up commit will address DeferredUpdates.

Change-Id: I4589e6f3ae40b1c362601416db48857fb89840bf

includes/db/DBConnRef.php
includes/db/Database.php
includes/db/IDatabase.php
includes/db/loadbalancer/LBFactory.php
includes/db/loadbalancer/LBFactoryMulti.php
includes/db/loadbalancer/LBFactorySimple.php
includes/db/loadbalancer/LoadBalancer.php
tests/phpunit/includes/db/DatabaseTest.php

index 790a073..ee0a75a 100644 (file)
@@ -441,6 +441,10 @@ class DBConnRef implements IDatabase {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
+       public function setTransactionListener( $name, callable $callback = null ) {
+               return $this->__call( __FUNCTION__, func_get_args() );
+       }
+
        public function startAtomic( $fname = __METHOD__ ) {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
index e07836b..26d0f5f 100644 (file)
@@ -69,8 +69,10 @@ abstract class DatabaseBase implements IDatabase {
        protected $mTrxPreCommitCallbacks = [];
        /** @var array[] List of (callable, method name) */
        protected $mTrxEndCallbacks = [];
-       /** @var bool Whether to suppress triggering of post-commit callbacks */
-       protected $suppressPostCommitCallbacks = false;
+       /** @var array[] Map of (name => (callable, method name)) */
+       protected $mTrxRecurringCallbacks = [];
+       /** @var bool Whether to suppress triggering of transaction end callbacks */
+       protected $mTrxEndCallbacksSuppressed = false;
 
        /** @var string */
        protected $mTablePrefix;
@@ -993,6 +995,7 @@ abstract class DatabaseBase implements IDatabase {
                try {
                        // Handle callbacks in mTrxEndCallbacks
                        $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+                       $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
                        return null;
                } catch ( Exception $e ) {
                        // Already logged; move on...
@@ -2572,16 +2575,24 @@ abstract class DatabaseBase implements IDatabase {
                }
        }
 
+       final public function setTransactionListener( $name, callable $callback = null ) {
+               if ( $callback ) {
+                       $this->mTrxRecurringCallbacks[$name] = [ $callback, wfGetCaller() ];
+               } else {
+                       unset( $this->mTrxRecurringCallbacks[$name] );
+               }
+       }
+
        /**
-        * Whether to disable running of post-commit callbacks
+        * Whether to disable running of post-COMMIT/ROLLBACK callbacks
         *
         * This method should not be used outside of Database/LoadBalancer
         *
         * @param bool $suppress
         * @since 1.28
         */
-       final public function setPostCommitCallbackSupression( $suppress ) {
-               $this->suppressPostCommitCallbacks = $suppress;
+       final public function setTrxEndCallbackSuppression( $suppress ) {
+               $this->mTrxEndCallbacksSuppressed = $suppress;
        }
 
        /**
@@ -2594,7 +2605,7 @@ abstract class DatabaseBase implements IDatabase {
         * @throws Exception
         */
        public function runOnTransactionIdleCallbacks( $trigger ) {
-               if ( $this->suppressPostCommitCallbacks ) {
+               if ( $this->mTrxEndCallbacksSuppressed ) {
                        return;
                }
 
@@ -2664,6 +2675,38 @@ abstract class DatabaseBase implements IDatabase {
                }
        }
 
+       /**
+        * Actually run any "transaction listener" callbacks.
+        *
+        * This method should not be used outside of Database/LoadBalancer
+        *
+        * @param integer $trigger IDatabase::TRIGGER_* constant
+        * @throws Exception
+        * @since 1.20
+        */
+       public function runTransactionListenerCallbacks( $trigger ) {
+               if ( $this->mTrxEndCallbacksSuppressed ) {
+                       return;
+               }
+
+               /** @var Exception $e */
+               $e = null; // first exception
+
+               foreach ( $this->mTrxRecurringCallbacks as $callback ) {
+                       try {
+                               list( $phpCallback ) = $callback;
+                               $phpCallback( $trigger, $this );
+                       } catch ( Exception $ex ) {
+                               MWExceptionHandler::logException( $ex );
+                               $e = $e ?: $ex;
+                       }
+               }
+
+               if ( $e instanceof Exception ) {
+                       throw $e; // re-throw any first exception
+               }
+       }
+
        final public function startAtomic( $fname = __METHOD__ ) {
                if ( !$this->mTrxLevel ) {
                        $this->begin( $fname, self::TRANSACTION_INTERNAL );
@@ -2802,6 +2845,7 @@ abstract class DatabaseBase implements IDatabase {
                }
 
                $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
+               $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
        }
 
        /**
@@ -2847,6 +2891,7 @@ abstract class DatabaseBase implements IDatabase {
                $this->mTrxIdleCallbacks = []; // clear
                $this->mTrxPreCommitCallbacks = []; // clear
                $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+               $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
        }
 
        /**
@@ -2864,6 +2909,18 @@ abstract class DatabaseBase implements IDatabase {
                }
        }
 
+       public function clearSnapshot( $fname = __METHOD__ ) {
+               if ( $this->writesOrCallbacksPending() || $this->explicitTrxActive() ) {
+                       // This only flushes transactions to clear snapshots, not to write data
+                       throw new DBUnexpectedError(
+                               $this,
+                               "$fname: Cannot COMMIT to clear snapshot because writes are pending."
+                       );
+               }
+
+               $this->commit( $fname, self::FLUSHING_INTERNAL );
+       }
+
        public function explicitTrxActive() {
                return $this->mTrxLevel && ( $this->mTrxAtomicLevels || !$this->mTrxAutomatic );
        }
index 1aa931e..4ad23dd 100644 (file)
@@ -35,9 +35,9 @@
 interface IDatabase {
        /** @var int Callback triggered immediately due to no active transaction */
        const TRIGGER_IDLE = 1;
-       /** @var int Callback triggered by commit */
+       /** @var int Callback triggered by COMMIT */
        const TRIGGER_COMMIT = 2;
-       /** @var int Callback triggered by rollback */
+       /** @var int Callback triggered by ROLLBACK */
        const TRIGGER_ROLLBACK = 3;
 
        /** @var string Transaction is requested by regular caller outside of the DB layer */
@@ -200,6 +200,7 @@ interface IDatabase {
        /**
         * Returns true if there is a transaction open with possible write
         * queries or transaction pre-commit/idle callbacks waiting on it to finish.
+        * This does *not* count recurring callbacks, e.g. from setTransactionListener().
         *
         * @return bool
         */
@@ -1267,7 +1268,7 @@ interface IDatabase {
         * This is useful for combining cooperative locks and DB transactions.
         *
         * The callback takes one argument:
-        * How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK)
+        *   - How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK)
         *
         * @param callable $callback
         * @return mixed
@@ -1289,7 +1290,7 @@ interface IDatabase {
         * Updates will execute in the order they were enqueued.
         *
         * The callback takes one argument:
-        * How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_IDLE)
+        *   - How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_IDLE)
         *
         * @param callable $callback
         * @since 1.20
@@ -1312,6 +1313,23 @@ interface IDatabase {
         */
        public function onTransactionPreCommitOrIdle( callable $callback );
 
+       /**
+        * Run a callback each time any transaction commits or rolls back
+        *
+        * The callback takes two arguments:
+        *   - IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK
+        *   - This IDatabase object
+        * Callbacks must commit any transactions that they begin.
+        *
+        * Registering a callback here will not affect writesOrCallbacks() pending
+        *
+        * @param string $name Callback name
+        * @param callable|null $callback Use null to unset a listener
+        * @return mixed
+        * @since 1.28
+        */
+       public function setTransactionListener( $name, callable $callback = null );
+
        /**
         * Begin an atomic section of statements
         *
index f560cf1..7038eee 100644 (file)
@@ -44,8 +44,12 @@ abstract class LBFactory implements DestructibleService {
 
        /** @var mixed */
        protected $ticket;
+       /** @var string|bool String if a requested DBO_TRX transaction round is active */
+       protected $trxRoundId = false;
        /** @var string|bool Reason all LBs are read-only or false if not */
        protected $readOnlyReason = false;
+       /** @var callable[] */
+       protected $replicationWaitCallbacks = [];
 
        const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
 
@@ -226,9 +230,18 @@ abstract class LBFactory implements DestructibleService {
         * This allows for custom transaction rounds from any outer transaction scope.
         *
         * @param string $fname
+        * @throws DBTransactionError
         * @since 1.28
         */
        public function beginMasterChanges( $fname = __METHOD__ ) {
+               if ( $this->trxRoundId !== false ) {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction round '{$this->trxRoundId}' already started."
+                       );
+               }
+               $this->trxRoundId = $fname;
+               // Set DBO_TRX flags on all appropriate DBs
                $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
        }
 
@@ -253,19 +266,20 @@ abstract class LBFactory implements DestructibleService {
         * @throws Exception
         */
        public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
-               // Perform all pre-commit callbacks, aborting on failure
-               $this->forEachLBCallMethod( 'runMasterPreCommitCallbacks' );
-               // Perform all pre-commit checks, aborting on failure
+               // Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
+               $this->forEachLBCallMethod( 'finalizeMasterChanges' );
+               $this->trxRoundId = false;
+               // Perform pre-commit checks, aborting on failure
                $this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
                // Log the DBs and methods involved in multi-DB transactions
                $this->logIfMultiDbTransaction();
-               // Actually perform the commit on all master DB connections
+               // Actually perform the commit on all master DB connections and revert DBO_TRX
                $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
                // Run all post-commit callbacks
                /** @var Exception $e */
                $e = null; // first callback exception
                $this->forEachLB( function ( LoadBalancer $lb ) use ( &$e ) {
-                       $ex = $lb->runMasterPostCommitCallbacks();
+                       $ex = $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_COMMIT );
                        $e = $e ?: $ex;
                } );
                // Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
@@ -282,7 +296,13 @@ abstract class LBFactory implements DestructibleService {
         * @since 1.23
         */
        public function rollbackMasterChanges( $fname = __METHOD__ ) {
+               $this->trxRoundId = false;
+               $this->forEachLBCallMethod( 'suppressTransactionEndCallbacks' );
                $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
+               // Run all post-rollback callbacks
+               $this->forEachLB( function ( LoadBalancer $lb ) {
+                       $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_ROLLBACK );
+               } );
        }
 
        /**
@@ -381,6 +401,10 @@ abstract class LBFactory implements DestructibleService {
                        'ifWritesSince' => null
                ];
 
+               foreach ( $this->replicationWaitCallbacks as $callback ) {
+                       $callback();
+               }
+
                // Figure out which clusters need to be checked
                /** @var LoadBalancer[] $lbs */
                $lbs = [];
@@ -433,6 +457,23 @@ abstract class LBFactory implements DestructibleService {
                }
        }
 
+       /**
+        * Add a callback to be run in every call to waitForReplication() before waiting
+        *
+        * Callbacks must clear any transactions that they start
+        *
+        * @param string $name Callback name
+        * @param callable|null $callback Use null to unset a callback
+        * @since 1.28
+        */
+       public function setWaitForReplicationListener( $name, callable $callback = null ) {
+               if ( $callback ) {
+                       $this->replicationWaitCallbacks[$name] = $callback;
+               } else {
+                       unset( $this->replicationWaitCallbacks[$name] );
+               }
+       }
+
        /**
         * Get a token asserting that no transaction writes are active
         *
@@ -527,6 +568,15 @@ abstract class LBFactory implements DestructibleService {
                } );
        }
 
+       /**
+        * @param LoadBalancer $lb
+        */
+       protected function initLoadBalancer( LoadBalancer $lb ) {
+               if ( $this->trxRoundId !== false ) {
+                       $lb->beginMasterChanges( $this->trxRoundId ); // set DBO_TRX
+               }
+       }
+
        /**
         * Close all open database connections on all open load balancers.
         * @since 1.28
index 4b9cccc..523ccf9 100644 (file)
@@ -313,7 +313,7 @@ class LBFactoryMulti extends LBFactory {
         * @return LoadBalancer
         */
        private function newLoadBalancer( $template, $loads, $groupLoads, $readOnlyReason ) {
-               return new LoadBalancer( [
+               $lb = new LoadBalancer( [
                        'servers' => $this->makeServerArray( $template, $loads, $groupLoads ),
                        'loadMonitor' => $this->loadMonitorClass,
                        'readOnlyReason' => $readOnlyReason,
@@ -321,6 +321,10 @@ class LBFactoryMulti extends LBFactory {
                        'srvCache' => $this->srvCache,
                        'wanCache' => $this->wanCache
                ] );
+
+               $this->initLoadBalancer( $lb );
+
+               return $lb;
        }
 
        /**
index 3702c8b..a992276 100644 (file)
@@ -133,7 +133,7 @@ class LBFactorySimple extends LBFactory {
        }
 
        private function newLoadBalancer( array $servers ) {
-               return new LoadBalancer( [
+               $lb = new LoadBalancer( [
                        'servers' => $servers,
                        'loadMonitor' => $this->loadMonitorClass,
                        'readOnlyReason' => $this->readOnlyReason,
@@ -141,6 +141,10 @@ class LBFactorySimple extends LBFactory {
                        'srvCache' => $this->srvCache,
                        'wanCache' => $this->wanCache
                ] );
+
+               $this->initLoadBalancer( $lb );
+
+               return $lb;
        }
 
        /**
index 65cd3b3..2532bca 100644 (file)
@@ -51,6 +51,8 @@ class LoadBalancer {
        private $srvCache;
        /** @var WANObjectCache */
        private $wanCache;
+       /** @var TransactionProfiler */
+       protected $trxProfiler;
 
        /** @var bool|DatabaseBase Database connection that caused a problem */
        private $mErrorConnection;
@@ -68,9 +70,8 @@ class LoadBalancer {
        private $readOnlyReason = false;
        /** @var integer Total connections opened */
        private $connsOpened = 0;
-
-       /** @var TransactionProfiler */
-       protected $trxProfiler;
+       /** @var string|bool String if a requested DBO_TRX transaction round is active */
+       private $trxRoundId = false;
 
        /** @var integer Warn when this many connection are held */
        const CONN_HELD_WARN_THRESHOLD = 10;
@@ -864,6 +865,9 @@ class LoadBalancer {
                        $this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
                );
                $db->setTransactionProfiler( $this->trxProfiler );
+               if ( $this->trxRoundId !== false ) {
+                       $this->applyTransactionRoundFlags( $db );
+               }
 
                return $db;
        }
@@ -1059,24 +1063,47 @@ class LoadBalancer {
        /**
         * Commit transactions on all open connections
         * @param string $fname Caller name
+        * @throws DBExpectedError
         */
        public function commitAll( $fname = __METHOD__ ) {
-               $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-               } );
+               $failures = [];
+
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+                               try {
+                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               if ( $restore && $conn->getLBInfo( 'master' ) ) {
+                                       $this->undoTransactionRoundFlags( $conn );
+                               }
+                       }
+               );
+
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
        }
 
        /**
         * Perform all pre-commit callbacks that remain part of the atomic transactions
-        * and disable any post-commit callbacks until runMasterPostCommitCallbacks()
+        * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
         * @since 1.28
         */
-       public function runMasterPreCommitCallbacks() {
+       public function finalizeMasterChanges() {
                $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
-                       // Any error will cause all DB transactions to be rolled back together.
+                       // Any error should cause all DB transactions to be rolled back together
+                       $conn->setTrxEndCallbackSuppression( false );
                        $conn->runOnTransactionPreCommitCallbacks();
-                       // Defer post-commit callbacks until COMMIT finishes for all DBs.
-                       $conn->setPostCommitCallbackSupression( true );
+                       // Defer post-commit callbacks until COMMIT finishes for all DBs
+                       $conn->setTrxEndCallbackSuppression( true );
                } );
        }
 
@@ -1129,55 +1156,96 @@ class LoadBalancer {
         * This allows for custom transaction rounds from any outer transaction scope.
         *
         * @param string $fname
+        * @throws DBExpectedError
         * @since 1.28
         */
        public function beginMasterChanges( $fname = __METHOD__ ) {
-               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       if ( $conn->writesOrCallbacksPending() ) {
-                               throw new DBTransactionError(
-                                       $conn,
-                                       "Transaction with pending writes still active."
-                               );
-                       } elseif ( $conn->trxLevel() ) {
-                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-                       }
-                       if ( $conn->getFlag( DBO_DEFAULT ) ) {
-                               // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
-                               // Force DBO_TRX even in CLI mode since a commit round is expected soon.
-                               $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
-                               $conn->onTransactionResolution( function () use ( $conn ) {
-                                       $conn->restoreFlags( $conn::RESTORE_PRIOR );
-                               } );
-                       } else {
-                               // Config has explicitly requested DBO_TRX be either on or off; respect that.
-                               // This is useful for things like blob stores which use auto-commit mode.
+               if ( $this->trxRoundId !== false ) {
+                       throw new DBTransactionError(
+                               null,
+                               "$fname: Transaction round '{$this->trxRoundId}' already started."
+                       );
+               }
+               $this->trxRoundId = $fname;
+
+               $failures = [];
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, &$failures ) {
+                               $conn->setTrxEndCallbackSuppression( true );
+                               try {
+                                       $conn->clearSnapshot( $fname );
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               $conn->setTrxEndCallbackSuppression( false );
+                               $this->applyTransactionRoundFlags( $conn );
                        }
-               } );
+               );
+
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
        }
 
        /**
         * Issue COMMIT on all master connections where writes where done
         * @param string $fname Caller name
+        * @throws DBExpectedError
         */
        public function commitMasterChanges( $fname = __METHOD__ ) {
-               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       if ( $conn->writesOrCallbacksPending() ) {
-                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+               $failures = [];
+
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+                               try {
+                                       if ( $conn->writesOrCallbacksPending() ) {
+                                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+                                       } elseif ( $restore ) {
+                                               $conn->clearSnapshot( $fname );
+                                       }
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               if ( $restore ) {
+                                       $this->undoTransactionRoundFlags( $conn );
+                               }
                        }
-               } );
+               );
+
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
        }
 
        /**
-        * Issue all pending post-commit callbacks
+        * Issue all pending post-COMMIT/ROLLBACK callbacks
+        * @param integer $type IDatabase::TRIGGER_* constant
         * @return Exception|null The first exception or null if there were none
         * @since 1.28
         */
-       public function runMasterPostCommitCallbacks() {
+       public function runMasterPostTrxCallbacks( $type ) {
                $e = null; // first exception
-               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( &$e ) {
-                       $conn->setPostCommitCallbackSupression( false );
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $type, &$e ) {
+                       $conn->clearSnapshot( __METHOD__ ); // clear no-op transactions
+
+                       $conn->setTrxEndCallbackSuppression( false );
+                       try {
+                               $conn->runOnTransactionIdleCallbacks( $type );
+                       } catch ( Exception $ex ) {
+                               $e = $e ?: $ex;
+                       }
                        try {
-                               $conn->runOnTransactionIdleCallbacks( $conn::TRIGGER_COMMIT );
+                               $conn->runTransactionListenerCallbacks( $type );
                        } catch ( Exception $ex ) {
                                $e = $e ?: $ex;
                        }
@@ -1193,29 +1261,51 @@ class LoadBalancer {
         * @since 1.23
         */
        public function rollbackMasterChanges( $fname = __METHOD__ ) {
-               $failedServers = [];
-
-               $masterIndex = $this->getWriterIndex();
-               foreach ( $this->mConns as $conns2 ) {
-                       if ( empty( $conns2[$masterIndex] ) ) {
-                               continue;
-                       }
-                       /** @var DatabaseBase $conn */
-                       foreach ( $conns2[$masterIndex] as $conn ) {
-                               if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
-                                       try {
-                                               $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
-                                       } catch ( DBError $e ) {
-                                               MWExceptionHandler::logException( $e );
-                                               $failedServers[] = $conn->getServer();
-                                       }
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore ) {
+                               if ( $conn->writesOrCallbacksPending() ) {
+                                       $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+                               }
+                               if ( $restore ) {
+                                       $this->undoTransactionRoundFlags( $conn );
                                }
                        }
+               );
+       }
+
+       /**
+        * Suppress all pending post-COMMIT/ROLLBACK callbacks
+        * @return Exception|null The first exception or null if there were none
+        * @since 1.28
+        */
+       public function suppressTransactionEndCallbacks() {
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
+                       $conn->setTrxEndCallbackSuppression( true );
+               } );
+       }
+
+       /**
+        * @param DatabaseBase $conn
+        */
+       private function applyTransactionRoundFlags( DatabaseBase $conn ) {
+               if ( $conn->getFlag( DBO_DEFAULT ) ) {
+                       // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
+                       // Force DBO_TRX even in CLI mode since a commit round is expected soon.
+                       $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
+                       // If config has explicitly requested DBO_TRX be either on or off by not
+                       // setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
+                       // for things like blob stores (ExternalStore) which want auto-commit mode.
                }
+       }
 
-               if ( $failedServers ) {
-                       throw new DBExpectedError( null, "Rollback failed on server(s) " .
-                               implode( ', ', array_unique( $failedServers ) ) );
+       /**
+        * @param DatabaseBase $conn
+        */
+       private function undoTransactionRoundFlags( DatabaseBase $conn ) {
+               if ( $conn->getFlag( DBO_DEFAULT ) ) {
+                       $conn->restoreFlags( $conn::RESTORE_PRIOR );
                }
        }
 
index 0751409..16297ad 100644 (file)
@@ -291,6 +291,56 @@ class DatabaseTest extends MediaWikiTestCase {
                $this->assertTrue( $called, 'Callback reached' );
        }
 
+       /**
+        * @covers DatabaseBase::setTransactionListener()
+        */
+       public function testTransactionListener() {
+               $db = $this->db;
+
+               $db->setTransactionListener( 'ping', function() use ( $db, &$called ) {
+                       $called = true;
+               } );
+
+               $called = false;
+               $db->begin( __METHOD__ );
+               $db->commit( __METHOD__ );
+               $this->assertTrue( $called, 'Callback reached' );
+
+               $called = false;
+               $db->begin( __METHOD__ );
+               $db->commit( __METHOD__ );
+               $this->assertTrue( $called, 'Callback still reached' );
+
+               $called = false;
+               $db->begin( __METHOD__ );
+               $db->rollback( __METHOD__ );
+               $this->assertTrue( $called, 'Callback reached' );
+
+               $db->setTransactionListener( 'ping', null );
+               $called = false;
+               $db->begin( __METHOD__ );
+               $db->commit( __METHOD__ );
+               $this->assertFalse( $called, 'Callback not reached' );
+       }
+
+       /**
+        * @covers DatabaseBase::clearSnapshot()
+        */
+       public function testClearSnapshot() {
+               $db = $this->db;
+
+               $db->clearSnapshot( __METHOD__ ); // ok
+               $db->clearSnapshot( __METHOD__ ); // ok
+
+               $db->setFlag( DBO_TRX, $db::REMEMBER_PRIOR );
+               $db->query( 'SELECT 1', __METHOD__ );
+               $this->assertTrue( (bool)$db->trxLevel(), "Transaction started." );
+               $db->clearSnapshot( __METHOD__ ); // ok
+               $db->restoreFlags( $db::RESTORE_PRIOR );
+
+               $this->assertFalse( (bool)$db->trxLevel(), "Transaction cleared." );
+       }
+
        public function testGetScopedLock() {
                $db = $this->db;