rdbms: make implement IResultWrapper directly instead of via inheritence
[lhc/web/wiklou.git] / includes / libs / rdbms / database / Database.php
index bc8883c..9233e13 100644 (file)
@@ -38,6 +38,7 @@ use InvalidArgumentException;
 use UnexpectedValueException;
 use Exception;
 use RuntimeException;
+use Throwable;
 
 /**
  * Relational database abstraction object
@@ -74,7 +75,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        protected $delimiter = ';';
        /** @var string|bool|null Stashed value of html_errors INI setting */
        protected $htmlErrors;
-       /** @var int */
+       /** @var int Row batch size to use for emulated INSERT SELECT queries */
        protected $nonNativeInsertSelectBatchSize = 10000;
 
        /** @var BagOStuff APC cache */
@@ -93,22 +94,18 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        protected $trxProfiler;
        /** @var DatabaseDomain */
        protected $currentDomain;
+       /** @var object|resource|null Database connection */
+       protected $conn;
+
        /** @var IDatabase|null Lazy handle to the master DB this server replicates from */
        private $lazyMasterHandle;
 
-       /** @var object|resource|null Database connection */
-       protected $conn = null;
-       /** @var bool Whether a connection handle is open (connection itself might be dead) */
-       protected $opened = false;
-
        /** @var array Map of (name => 1) for locks obtained via lock() */
        protected $sessionNamedLocks = [];
        /** @var array Map of (table name => 1) for TEMPORARY tables */
        protected $sessionTempTables = [];
 
-       /** @var int Whether there is an active transaction (1 or 0) */
-       protected $trxLevel = 0;
-       /** @var string Hexidecimal string if a transaction is active or empty string otherwise */
+       /** @var string ID of the active transaction or the empty string otherwise */
        protected $trxShortId = '';
        /** @var int Transaction status */
        protected $trxStatus = self::STATUS_TRX_NONE;
@@ -150,6 +147,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        private $trxPreCommitCallbacks = [];
        /** @var array[] List of (callable, method name, atomic section id) */
        private $trxEndCallbacks = [];
+       /** @var array[] List of (callable, method name, atomic section id) */
+       private $trxSectionCancelCallbacks = [];
        /** @var callable[] Map of (name => callable) */
        private $trxRecurringCallbacks = [];
        /** @var bool Whether to suppress triggering of transaction end callbacks */
@@ -308,7 +307,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * @param string $dbName Database name
         * @param string|null $schema Database schema name
         * @param string $tablePrefix Table prefix
-        * @return bool
         * @throws DBConnectionError
         */
        abstract protected function open( $server, $user, $password, $dbName, $schema, $tablePrefix );
@@ -512,12 +510,12 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                return $res;
        }
 
-       public function trxLevel() {
-               return $this->trxLevel;
+       final public function trxLevel() {
+               return ( $this->trxShortId != '' ) ? 1 : 0;
        }
 
        public function trxTimestamp() {
-               return $this->trxLevel ? $this->trxTimestamp : null;
+               return $this->trxLevel() ? $this->trxTimestamp : null;
        }
 
        /**
@@ -620,20 +618,21 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        public function writesPending() {
-               return $this->trxLevel && $this->trxDoneWrites;
+               return $this->trxLevel() && $this->trxDoneWrites;
        }
 
        public function writesOrCallbacksPending() {
-               return $this->trxLevel && (
+               return $this->trxLevel() && (
                        $this->trxDoneWrites ||
                        $this->trxIdleCallbacks ||
                        $this->trxPreCommitCallbacks ||
-                       $this->trxEndCallbacks
+                       $this->trxEndCallbacks ||
+                       $this->trxSectionCancelCallbacks
                );
        }
 
        public function preCommitCallbacksPending() {
-               return $this->trxLevel && $this->trxPreCommitCallbacks;
+               return $this->trxLevel() && $this->trxPreCommitCallbacks;
        }
 
        /**
@@ -651,7 +650,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
-               if ( !$this->trxLevel ) {
+               if ( !$this->trxLevel() ) {
                        return false;
                } elseif ( !$this->trxDoneWrites ) {
                        return 0.0;
@@ -681,7 +680,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        public function pendingWriteCallers() {
-               return $this->trxLevel ? $this->trxWriteCallers : [];
+               return $this->trxLevel() ? $this->trxWriteCallers : [];
        }
 
        public function pendingWriteRowsAffected() {
@@ -701,7 +700,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                foreach ( [
                        $this->trxIdleCallbacks,
                        $this->trxPreCommitCallbacks,
-                       $this->trxEndCallbacks
+                       $this->trxEndCallbacks,
+                       $this->trxSectionCancelCallbacks
                ] as $callbacks ) {
                        foreach ( $callbacks as $callback ) {
                                $fnames[] = $callback[1];
@@ -721,7 +721,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        public function isOpen() {
-               return $this->opened;
+               return (bool)$this->conn;
        }
 
        public function setFlag( $flag, $remember = self::REMEMBER_NOTHING ) {
@@ -865,11 +865,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        final public function close() {
                $exception = null; // error to throw after disconnecting
 
-               $wasOpen = $this->opened;
+               $wasOpen = (bool)$this->conn;
                // This should mostly do nothing if the connection is already closed
                if ( $this->conn ) {
                        // Roll back any dangling transaction first
-                       if ( $this->trxLevel ) {
+                       if ( $this->trxLevel() ) {
                                if ( $this->trxAtomicLevels ) {
                                        // Cannot let incomplete atomic sections be committed
                                        $levels = $this->flatAtomicSectionList();
@@ -914,7 +914,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
 
                $this->conn = false;
-               $this->opened = false;
 
                // Throw any unexpected errors after having disconnected
                if ( $exception instanceof Exception ) {
@@ -978,16 +977,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         */
        abstract protected function closeConnection();
 
-       /**
-        * @deprecated since 1.32
-        * @param string $error Fallback message, if none is given by DB
-        * @throws DBConnectionError
-        */
-       public function reportConnectionError( $error = 'Unknown error' ) {
-               call_user_func( $this->deprecationLogger, 'Use of ' . __METHOD__ . ' is deprecated.' );
-               throw new DBConnectionError( $this, $this->lastError() ?: $error );
-       }
-
        /**
         * Run a query and return a DBMS-dependent wrapper or boolean
         *
@@ -1005,8 +994,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * For SELECT queries, this returns either:
         *   - a) A driver-specific value/resource, only on success. This can be iterated
         *        over by calling fetchObject()/fetchRow() until there are no more rows.
-        *        Alternatively, the result can be passed to resultObject() to obtain a
-        *        ResultWrapper instance which can then be iterated over via "foreach".
+        *        Alternatively, the result can be passed to resultObject() to obtain an
+        *        IResultWrapper instance which can then be iterated over via "foreach".
         *   - b) False, on any query failure
         *
         * For non-SELECT queries, this returns either:
@@ -1079,7 +1068,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        protected function isTransactableQuery( $sql ) {
                return !in_array(
                        $this->getQueryVerb( $sql ),
-                       [ 'BEGIN', 'ROLLBACK', 'COMMIT', 'SET', 'SHOW', 'CREATE', 'ALTER', 'USE' ],
+                       [ 'BEGIN', 'ROLLBACK', 'COMMIT', 'SET', 'SHOW', 'CREATE', 'ALTER', 'USE', 'SHOW' ],
                        true
                );
        }
@@ -1167,7 +1156,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        final protected function executeQuery( $sql, $fname, $flags ) {
                $this->assertHasConnectionHandle();
 
-               $priorTransaction = $this->trxLevel;
+               $priorTransaction = $this->trxLevel();
 
                if ( $this->isWriteQuery( $sql ) ) {
                        # In theory, non-persistent writes are allowed in read-only mode, but due to things
@@ -1194,8 +1183,10 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                // Send the query to the server and fetch any corresponding errors
                list( $ret, $err, $errno, $recoverableSR, $recoverableCL, $reconnected ) =
                        $this->executeQueryAttempt( $sql, $commentedSql, $isPermWrite, $fname, $flags );
+
                // Check if the query failed due to a recoverable connection loss
-               if ( $ret === false && $recoverableCL && $reconnected ) {
+               $allowRetry = !$this->hasFlags( $flags, self::QUERY_NO_RETRY );
+               if ( $ret === false && $recoverableCL && $reconnected && $allowRetry ) {
                        // Silently resend the query to the server since it is safe and possible
                        list( $ret, $err, $errno, $recoverableSR, $recoverableCL ) =
                                $this->executeQueryAttempt( $sql, $commentedSql, $isPermWrite, $fname, $flags );
@@ -1255,7 +1246,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                // Keep track of whether the transaction has write queries pending
                if ( $isPermWrite ) {
                        $this->lastWriteTime = microtime( true );
-                       if ( $this->trxLevel && !$this->trxDoneWrites ) {
+                       if ( $this->trxLevel() && !$this->trxDoneWrites ) {
                                $this->trxDoneWrites = true;
                                $this->trxProfiler->transactionWritingIn(
                                        $this->server, $this->getDomainID(), $this->trxShortId );
@@ -1285,7 +1276,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
 
                if ( $ret !== false ) {
                        $this->lastPing = $startTime;
-                       if ( $isPermWrite && $this->trxLevel ) {
+                       if ( $isPermWrite && $this->trxLevel() ) {
                                $this->updateTrxWriteQueryTime( $sql, $queryRuntime, $this->affectedRows() );
                                $this->trxWriteCallers[] = $fname;
                        }
@@ -1334,7 +1325,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         */
        private function beginIfImplied( $sql, $fname ) {
                if (
-                       !$this->trxLevel &&
+                       !$this->trxLevel() &&
                        $this->getFlag( self::DBO_TRX ) &&
                        $this->isTransactableQuery( $sql )
                ) {
@@ -1464,7 +1455,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                // https://www.postgresql.org/docs/9.4/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
                $this->sessionNamedLocks = [];
                // Session loss implies transaction loss
-               $this->trxLevel = 0;
+               $oldTrxShortId = $this->consumeTrxShortId();
                $this->trxAtomicCounter = 0;
                $this->trxIdleCallbacks = []; // T67263; transaction already lost
                $this->trxPreCommitCallbacks = []; // T67263; transaction already lost
@@ -1473,7 +1464,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->trxProfiler->transactionWritingOut(
                                $this->server,
                                $this->getDomainID(),
-                               $this->trxShortId,
+                               $oldTrxShortId,
                                $this->pendingWriteQueryDuration( self::ESTIMATE_TOTAL ),
                                $this->trxWriteAffectedRows
                        );
@@ -1499,6 +1490,18 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
        }
 
+       /**
+        * Reset the transaction ID and return the old one
+        *
+        * @return string The old transaction ID or the empty string if there wasn't one
+        */
+       private function consumeTrxShortId() {
+               $old = $this->trxShortId;
+               $this->trxShortId = '';
+
+               return $old;
+       }
+
        /**
         * Checks whether the cause of the error is detected to be a timeout.
         *
@@ -1996,7 +1999,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        public function lockForUpdate(
                $table, $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
        ) {
-               if ( !$this->trxLevel && !$this->getFlag( self::DBO_TRX ) ) {
+               if ( !$this->trxLevel() && !$this->getFlag( self::DBO_TRX ) ) {
                        throw new DBUnexpectedError(
                                $this,
                                __METHOD__ . ': no transaction is active nor is DBO_TRX set'
@@ -3343,21 +3346,21 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        final public function onTransactionResolution( callable $callback, $fname = __METHOD__ ) {
-               if ( !$this->trxLevel ) {
+               if ( !$this->trxLevel() ) {
                        throw new DBUnexpectedError( $this, "No transaction is active." );
                }
                $this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
        }
 
        final public function onTransactionCommitOrIdle( callable $callback, $fname = __METHOD__ ) {
-               if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+               if ( !$this->trxLevel() && $this->getTransactionRoundId() ) {
                        // Start an implicit transaction similar to how query() does
                        $this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
                        $this->trxAutomatic = true;
                }
 
                $this->trxIdleCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
-               if ( !$this->trxLevel ) {
+               if ( !$this->trxLevel() ) {
                        $this->runOnTransactionIdleCallbacks( self::TRIGGER_IDLE );
                }
        }
@@ -3367,13 +3370,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        final public function onTransactionPreCommitOrIdle( callable $callback, $fname = __METHOD__ ) {
-               if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+               if ( !$this->trxLevel() && $this->getTransactionRoundId() ) {
                        // Start an implicit transaction similar to how query() does
                        $this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
                        $this->trxAutomatic = true;
                }
 
-               if ( $this->trxLevel ) {
+               if ( $this->trxLevel() ) {
                        $this->trxPreCommitCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
                } else {
                        // No transaction is active nor will start implicitly, so make one for this callback
@@ -3388,11 +3391,18 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
        }
 
+       final public function onAtomicSectionCancel( callable $callback, $fname = __METHOD__ ) {
+               if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
+                       throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
+               }
+               $this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
+       }
+
        /**
         * @return AtomicSectionIdentifier|null ID of the topmost atomic section level
         */
        private function currentAtomicSectionId() {
-               if ( $this->trxLevel && $this->trxAtomicLevels ) {
+               if ( $this->trxLevel() && $this->trxAtomicLevels ) {
                        $levelInfo = end( $this->trxAtomicLevels );
 
                        return $levelInfo[1];
@@ -3402,6 +3412,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        /**
+        * Hoist callback ownership for callbacks in a section to a parent section.
+        * All callbacks should have an owner that is present in trxAtomicLevels.
         * @param AtomicSectionIdentifier $old
         * @param AtomicSectionIdentifier $new
         */
@@ -3423,13 +3435,35 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                                $this->trxEndCallbacks[$key][2] = $new;
                        }
                }
+               foreach ( $this->trxSectionCancelCallbacks as $key => $info ) {
+                       if ( $info[2] === $old ) {
+                               $this->trxSectionCancelCallbacks[$key][2] = $new;
+                       }
+               }
        }
 
        /**
+        * Update callbacks that were owned by cancelled atomic sections.
+        *
+        * Callbacks for "on commit" should never be run if they're owned by a
+        * section that won't be committed.
+        *
+        * Callbacks for "on resolution" need to reflect that the section was
+        * rolled back, even if the transaction as a whole commits successfully.
+        *
+        * Callbacks for "on section cancel" should already have been consumed,
+        * but errors during the cancellation itself can prevent that while still
+        * destroying the section. Hoist any such callbacks to the new top section,
+        * which we assume will itself have to be cancelled or rolled back to
+        * resolve the error.
+        *
         * @param AtomicSectionIdentifier[] $sectionIds ID of an actual savepoint
+        * @param AtomicSectionIdentifier|null $newSectionId New top section ID.
         * @throws UnexpectedValueException
         */
-       private function modifyCallbacksForCancel( array $sectionIds ) {
+       private function modifyCallbacksForCancel(
+               array $sectionIds, AtomicSectionIdentifier $newSectionId = null
+       ) {
                // Cancel the "on commit" callbacks owned by this savepoint
                $this->trxIdleCallbacks = array_filter(
                        $this->trxIdleCallbacks,
@@ -3448,8 +3482,17 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        if ( in_array( $entry[2], $sectionIds, true ) ) {
                                $callback = $entry[0];
                                $this->trxEndCallbacks[$key][0] = function () use ( $callback ) {
+                                       // @phan-suppress-next-line PhanInfiniteRecursion No recursion at all here, phan is confused
                                        return $callback( self::TRIGGER_ROLLBACK, $this );
                                };
+                               // This "on resolution" callback no longer belongs to a section.
+                               $this->trxEndCallbacks[$key][2] = null;
+                       }
+               }
+               // Hoist callback ownership for section cancel callbacks to the new top section
+               foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) {
+                       if ( in_array( $entry[2], $sectionIds, true ) ) {
+                               $this->trxSectionCancelCallbacks[$key][2] = $newSectionId;
                        }
                }
        }
@@ -3485,7 +3528,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * @throws Exception
         */
        public function runOnTransactionIdleCallbacks( $trigger ) {
-               if ( $this->trxLevel ) { // sanity
+               if ( $this->trxLevel() ) { // sanity
                        throw new DBUnexpectedError( $this, __METHOD__ . ': a transaction is still open.' );
                }
 
@@ -3504,6 +3547,14 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        );
                        $this->trxIdleCallbacks = []; // consumed (and recursion guard)
                        $this->trxEndCallbacks = []; // consumed (recursion guard)
+
+                       // Only run trxSectionCancelCallbacks on rollback, not commit.
+                       // But always consume them.
+                       if ( $trigger === self::TRIGGER_ROLLBACK ) {
+                               $callbacks = array_merge( $callbacks, $this->trxSectionCancelCallbacks );
+                       }
+                       $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
+
                        foreach ( $callbacks as $callback ) {
                                ++$count;
                                list( $phpCallback ) = $callback;
@@ -3571,6 +3622,46 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                return $count;
        }
 
+       /**
+        * Actually run any "atomic section cancel" callbacks.
+        *
+        * @param int $trigger IDatabase::TRIGGER_* constant
+        * @param AtomicSectionIdentifier[]|null $sectionIds Section IDs to cancel,
+        *  null on transaction rollback
+        */
+       private function runOnAtomicSectionCancelCallbacks(
+               $trigger, array $sectionIds = null
+       ) {
+               /** @var Exception|Throwable $e */
+               $e = null; // first exception
+
+               $notCancelled = [];
+               do {
+                       $callbacks = $this->trxSectionCancelCallbacks;
+                       $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
+                       foreach ( $callbacks as $entry ) {
+                               if ( $sectionIds === null || in_array( $entry[2], $sectionIds, true ) ) {
+                                       try {
+                                               $entry[0]( $trigger, $this );
+                                       } catch ( Exception $ex ) {
+                                               ( $this->errorLogger )( $ex );
+                                               $e = $e ?: $ex;
+                                       } catch ( Throwable $ex ) {
+                                               // @todo: Log?
+                                               $e = $e ?: $ex;
+                                       }
+                               } else {
+                                       $notCancelled[] = $entry;
+                               }
+                       }
+               } while ( count( $this->trxSectionCancelCallbacks ) );
+               $this->trxSectionCancelCallbacks = $notCancelled;
+
+               if ( $e !== null ) {
+                       throw $e; // re-throw any first Exception/Throwable
+               }
+       }
+
        /**
         * Actually run any "transaction listener" callbacks.
         *
@@ -3668,7 +3759,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        ) {
                $savepointId = $cancelable === self::ATOMIC_CANCELABLE ? self::$NOT_APPLICABLE : null;
 
-               if ( !$this->trxLevel ) {
+               if ( !$this->trxLevel() ) {
                        $this->begin( $fname, self::TRANSACTION_INTERNAL ); // sets trxAutomatic
                        // If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result
                        // in all changes being in one transaction to keep requests transactional.
@@ -3694,7 +3785,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        final public function endAtomic( $fname = __METHOD__ ) {
-               if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+               if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
                        throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
                }
 
@@ -3730,71 +3821,83 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        final public function cancelAtomic(
                $fname = __METHOD__, AtomicSectionIdentifier $sectionId = null
        ) {
-               if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+               if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
                        throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
                }
 
-               $excisedFnames = [];
-               if ( $sectionId !== null ) {
-                       // Find the (last) section with the given $sectionId
-                       $pos = -1;
-                       foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
-                               if ( $asId === $sectionId ) {
-                                       $pos = $i;
+               $excisedIds = [];
+               $newTopSection = $this->currentAtomicSectionId();
+               try {
+                       $excisedFnames = [];
+                       if ( $sectionId !== null ) {
+                               // Find the (last) section with the given $sectionId
+                               $pos = -1;
+                               foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
+                                       if ( $asId === $sectionId ) {
+                                               $pos = $i;
+                                       }
                                }
+                               if ( $pos < 0 ) {
+                                       throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" );
+                               }
+                               // Remove all descendant sections and re-index the array
+                               $len = count( $this->trxAtomicLevels );
+                               for ( $i = $pos + 1; $i < $len; ++$i ) {
+                                       $excisedFnames[] = $this->trxAtomicLevels[$i][0];
+                                       $excisedIds[] = $this->trxAtomicLevels[$i][1];
+                               }
+                               $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
+                               $newTopSection = $this->currentAtomicSectionId();
                        }
-                       if ( $pos < 0 ) {
-                               throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" );
-                       }
-                       // Remove all descendant sections and re-index the array
-                       $excisedIds = [];
-                       $len = count( $this->trxAtomicLevels );
-                       for ( $i = $pos + 1; $i < $len; ++$i ) {
-                               $excisedFnames[] = $this->trxAtomicLevels[$i][0];
-                               $excisedIds[] = $this->trxAtomicLevels[$i][1];
-                       }
-                       $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
-                       $this->modifyCallbacksForCancel( $excisedIds );
-               }
 
-               // Check if the current section matches $fname
-               $pos = count( $this->trxAtomicLevels ) - 1;
-               list( $savedFname, $savedSectionId, $savepointId ) = $this->trxAtomicLevels[$pos];
+                       // Check if the current section matches $fname
+                       $pos = count( $this->trxAtomicLevels ) - 1;
+                       list( $savedFname, $savedSectionId, $savepointId ) = $this->trxAtomicLevels[$pos];
 
-               if ( $excisedFnames ) {
-                       $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname) " .
-                               "and descendants " . implode( ', ', $excisedFnames ) );
-               } else {
-                       $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname)" );
-               }
+                       if ( $excisedFnames ) {
+                               $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname) " .
+                                       "and descendants " . implode( ', ', $excisedFnames ) );
+                       } else {
+                               $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname)" );
+                       }
 
-               if ( $savedFname !== $fname ) {
-                       throw new DBUnexpectedError(
-                               $this,
-                               "Invalid atomic section ended (got $fname but expected $savedFname)."
-                       );
-               }
+                       if ( $savedFname !== $fname ) {
+                               throw new DBUnexpectedError(
+                                       $this,
+                                       "Invalid atomic section ended (got $fname but expected $savedFname)."
+                               );
+                       }
 
-               // Remove the last section (no need to re-index the array)
-               array_pop( $this->trxAtomicLevels );
-               $this->modifyCallbacksForCancel( [ $savedSectionId ] );
+                       // Remove the last section (no need to re-index the array)
+                       array_pop( $this->trxAtomicLevels );
+                       $excisedIds[] = $savedSectionId;
+                       $newTopSection = $this->currentAtomicSectionId();
 
-               if ( $savepointId !== null ) {
-                       // Rollback the transaction to the state just before this atomic section
-                       if ( $savepointId === self::$NOT_APPLICABLE ) {
-                               $this->rollback( $fname, self::FLUSHING_INTERNAL );
-                       } else {
-                               $this->doRollbackToSavepoint( $savepointId, $fname );
-                               $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
-                               $this->trxStatusIgnoredCause = null;
+                       if ( $savepointId !== null ) {
+                               // Rollback the transaction to the state just before this atomic section
+                               if ( $savepointId === self::$NOT_APPLICABLE ) {
+                                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                                       // Note: rollback() will run trxSectionCancelCallbacks
+                               } else {
+                                       $this->doRollbackToSavepoint( $savepointId, $fname );
+                                       $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
+                                       $this->trxStatusIgnoredCause = null;
+
+                                       // Run trxSectionCancelCallbacks now.
+                                       $this->runOnAtomicSectionCancelCallbacks( self::TRIGGER_CANCEL, $excisedIds );
+                               }
+                       } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
+                               // Put the transaction into an error state if it's not already in one
+                               $this->trxStatus = self::STATUS_TRX_ERROR;
+                               $this->trxStatusCause = new DBUnexpectedError(
+                                       $this,
+                                       "Uncancelable atomic section canceled (got $fname)."
+                               );
                        }
-               } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
-                       // Put the transaction into an error state if it's not already in one
-                       $this->trxStatus = self::STATUS_TRX_ERROR;
-                       $this->trxStatusCause = new DBUnexpectedError(
-                               $this,
-                               "Uncancelable atomic section canceled (got $fname)."
-                       );
+               } finally {
+                       // Fix up callbacks owned by the sections that were just cancelled.
+                       // All callbacks should have an owner that is present in trxAtomicLevels.
+                       $this->modifyCallbacksForCancel( $excisedIds, $newTopSection );
                }
 
                $this->affectedRowCount = 0; // for the sake of consistency
@@ -3823,7 +3926,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
 
                // Protect against mismatched atomic section, transaction nesting, and snapshot loss
-               if ( $this->trxLevel ) {
+               if ( $this->trxLevel() ) {
                        if ( $this->trxAtomicLevels ) {
                                $levels = $this->flatAtomicSectionList();
                                $msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open.";
@@ -3843,6 +3946,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $this->assertHasConnectionHandle();
 
                $this->doBegin( $fname );
+               $this->trxShortId = sprintf( '%06x', mt_rand( 0, 0xffffff ) );
                $this->trxStatus = self::STATUS_TRX_OK;
                $this->trxStatusIgnoredCause = null;
                $this->trxAtomicCounter = 0;
@@ -3851,7 +3955,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $this->trxDoneWrites = false;
                $this->trxAutomaticAtomic = false;
                $this->trxAtomicLevels = [];
-               $this->trxShortId = sprintf( '%06x', mt_rand( 0, 0xffffff ) );
                $this->trxWriteDuration = 0.0;
                $this->trxWriteQueryCount = 0;
                $this->trxWriteAffectedRows = 0;
@@ -3873,10 +3976,10 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         *
         * @see Database::begin()
         * @param string $fname
+        * @throws DBError
         */
        protected function doBegin( $fname ) {
                $this->query( 'BEGIN', $fname );
-               $this->trxLevel = 1;
        }
 
        final public function commit( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) {
@@ -3885,7 +3988,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        throw new DBUnexpectedError( $this, "$fname: invalid flush parameter '$flush'." );
                }
 
-               if ( $this->trxLevel && $this->trxAtomicLevels ) {
+               if ( $this->trxLevel() && $this->trxAtomicLevels ) {
                        // There are still atomic sections open; this cannot be ignored
                        $levels = $this->flatAtomicSectionList();
                        throw new DBUnexpectedError(
@@ -3895,7 +3998,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
 
                if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
-                       if ( !$this->trxLevel ) {
+                       if ( !$this->trxLevel() ) {
                                return; // nothing to do
                        } elseif ( !$this->trxAutomatic ) {
                                throw new DBUnexpectedError(
@@ -3903,7 +4006,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                                        "$fname: Flushing an explicit transaction, getting out of sync."
                                );
                        }
-               } elseif ( !$this->trxLevel ) {
+               } elseif ( !$this->trxLevel() ) {
                        $this->queryLogger->error(
                                "$fname: No transaction to commit, something got out of sync." );
                        return; // nothing to do
@@ -3920,6 +4023,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
 
                $writeTime = $this->pendingWriteQueryDuration( self::ESTIMATE_DB_APPLY );
                $this->doCommit( $fname );
+               $oldTrxShortId = $this->consumeTrxShortId();
                $this->trxStatus = self::STATUS_TRX_NONE;
 
                if ( $this->trxDoneWrites ) {
@@ -3927,7 +4031,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->trxProfiler->transactionWritingOut(
                                $this->server,
                                $this->getDomainID(),
-                               $this->trxShortId,
+                               $oldTrxShortId,
                                $writeTime,
                                $this->trxWriteAffectedRows
                        );
@@ -3945,16 +4049,16 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         *
         * @see Database::commit()
         * @param string $fname
+        * @throws DBError
         */
        protected function doCommit( $fname ) {
-               if ( $this->trxLevel ) {
+               if ( $this->trxLevel() ) {
                        $this->query( 'COMMIT', $fname );
-                       $this->trxLevel = 0;
                }
        }
 
        final public function rollback( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) {
-               $trxActive = $this->trxLevel;
+               $trxActive = $this->trxLevel();
 
                if ( $flush !== self::FLUSHING_INTERNAL
                        && $flush !== self::FLUSHING_ALL_PEERS
@@ -3970,6 +4074,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->assertHasConnectionHandle();
 
                        $this->doRollback( $fname );
+                       $oldTrxShortId = $this->consumeTrxShortId();
                        $this->trxStatus = self::STATUS_TRX_NONE;
                        $this->trxAtomicLevels = [];
                        // Estimate the RTT via a query now that trxStatus is OK
@@ -3979,7 +4084,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                                $this->trxProfiler->transactionWritingOut(
                                        $this->server,
                                        $this->getDomainID(),
-                                       $this->trxShortId,
+                                       $oldTrxShortId,
                                        $writeTime,
                                        $this->trxWriteAffectedRows
                                );
@@ -4013,13 +4118,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         *
         * @see Database::rollback()
         * @param string $fname
+        * @throws DBError
         */
        protected function doRollback( $fname ) {
-               if ( $this->trxLevel ) {
+               if ( $this->trxLevel() ) {
                        # Disconnects cause rollback anyway, so ignore those errors
                        $ignoreErrors = true;
                        $this->query( 'ROLLBACK', $fname, $ignoreErrors );
-                       $this->trxLevel = 0;
                }
        }
 
@@ -4037,7 +4142,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        public function explicitTrxActive() {
-               return $this->trxLevel && ( $this->trxAtomicLevels || !$this->trxAutomatic );
+               return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic );
        }
 
        public function duplicateTableStructure(
@@ -4080,9 +4185,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        abstract protected function fetchAffectedRowCount();
 
        /**
-        * Take the result from a query, and wrap it in a ResultWrapper if
-        * necessary. Boolean values are passed through as is, to indicate success
-        * of write queries or failure.
+        * Take a query result and wrap it in an iterable result wrapper if necessary.
+        * Booleans are passed through as-is to indicate success/failure of write queries.
         *
         * Once upon a time, Database::query() returned a bare MySQL result
         * resource, and it was necessary to call this function to convert it to
@@ -4094,12 +4198,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         */
        protected function resultObject( $result ) {
                if ( !$result ) {
-                       return false;
-               } elseif ( $result instanceof ResultWrapper ) {
+                       return false; // failed query
+               } elseif ( $result instanceof IResultWrapper ) {
                        return $result;
                } elseif ( $result === true ) {
-                       // Successful write query
-                       return $result;
+                       return $result; // succesful write query
                } else {
                        return new ResultWrapper( $this, $result );
                }
@@ -4134,7 +4237,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         */
        protected function replaceLostConnection( $fname ) {
                $this->closeConnection();
-               $this->opened = false;
                $this->conn = false;
 
                $this->handleSessionLossPreconnect();
@@ -4190,7 +4292,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * @since 1.27
         */
        final protected function getRecordedTransactionLagStatus() {
-               return ( $this->trxLevel && $this->trxReplicaLag !== null )
+               return ( $this->trxLevel() && $this->trxReplicaLag !== null )
                        ? [ 'lag' => $this->trxReplicaLag, 'since' => $this->trxTimestamp() ]
                        : null;
        }
@@ -4708,9 +4810,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
 
                if ( $this->isOpen() ) {
                        // Open a new connection resource without messing with the old one
-                       $this->opened = false;
                        $this->conn = false;
                        $this->trxEndCallbacks = []; // don't copy
+                       $this->trxSectionCancelCallbacks = []; // don't copy
                        $this->handleSessionLossPreconnect(); // no trx or locks anymore
                        $this->open(
                                $this->server,
@@ -4738,7 +4840,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * Run a few simple sanity checks and close dangling connections
         */
        public function __destruct() {
-               if ( $this->trxLevel && $this->trxDoneWrites ) {
+               if ( $this->trxLevel() && $this->trxDoneWrites ) {
                        trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})." );
                }
 
@@ -4755,7 +4857,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->closeConnection();
                        Wikimedia\restoreWarnings();
                        $this->conn = false;
-                       $this->opened = false;
                }
        }
 }