rdbms: Add callback for atomic section cancellation
authorBrad Jorsch <bjorsch@wikimedia.org>
Tue, 4 Jun 2019 18:43:18 +0000 (14:43 -0400)
committerAaron Schulz <aschulz@wikimedia.org>
Fri, 28 Jun 2019 03:09:42 +0000 (20:09 -0700)
The callback will be called immediately when the section is cancelled,
whether that occurs directly, or via cancelling of an outer section, or
via rollback of the entire transaction.

Change-Id: Id05296948b52b95544547bd38c4387496b6c83b9

includes/libs/rdbms/database/DBConnRef.php
includes/libs/rdbms/database/Database.php
includes/libs/rdbms/database/IDatabase.php
tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php

index 8af6bb3..c8e31df 100644 (file)
@@ -598,6 +598,10 @@ class DBConnRef implements IDatabase {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
+       public function onAtomicSectionCancel( callable $callback, $fname = __METHOD__ ) {
+               return $this->__call( __FUNCTION__, func_get_args() );
+       }
+
        public function setTransactionListener( $name, callable $callback = null ) {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
index 971257f..c86adac 100644 (file)
@@ -38,6 +38,7 @@ use InvalidArgumentException;
 use UnexpectedValueException;
 use Exception;
 use RuntimeException;
+use Throwable;
 
 /**
  * Relational database abstraction object
@@ -148,6 +149,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        private $trxPreCommitCallbacks = [];
        /** @var array[] List of (callable, method name, atomic section id) */
        private $trxEndCallbacks = [];
+       /** @var array[] List of (callable, method name, atomic section id) */
+       private $trxSectionCancelCallbacks = [];
        /** @var callable[] Map of (name => callable) */
        private $trxRecurringCallbacks = [];
        /** @var bool Whether to suppress triggering of transaction end callbacks */
@@ -625,7 +628,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->trxDoneWrites ||
                        $this->trxIdleCallbacks ||
                        $this->trxPreCommitCallbacks ||
-                       $this->trxEndCallbacks
+                       $this->trxEndCallbacks ||
+                       $this->trxSectionCancelCallbacks
                );
        }
 
@@ -698,7 +702,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                foreach ( [
                        $this->trxIdleCallbacks,
                        $this->trxPreCommitCallbacks,
-                       $this->trxEndCallbacks
+                       $this->trxEndCallbacks,
+                       $this->trxSectionCancelCallbacks
                ] as $callbacks ) {
                        foreach ( $callbacks as $callback ) {
                                $fnames[] = $callback[1];
@@ -3376,6 +3381,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
        }
 
+       final public function onAtomicSectionCancel( callable $callback, $fname = __METHOD__ ) {
+               if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+                       throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
+               }
+               $this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
+       }
+
        /**
         * @return AtomicSectionIdentifier|null ID of the topmost atomic section level
         */
@@ -3390,6 +3402,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        /**
+        * Hoist callback ownership for callbacks in a section to a parent section.
+        * All callbacks should have an owner that is present in trxAtomicLevels.
         * @param AtomicSectionIdentifier $old
         * @param AtomicSectionIdentifier $new
         */
@@ -3411,13 +3425,35 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                                $this->trxEndCallbacks[$key][2] = $new;
                        }
                }
+               foreach ( $this->trxSectionCancelCallbacks as $key => $info ) {
+                       if ( $info[2] === $old ) {
+                               $this->trxSectionCancelCallbacks[$key][2] = $new;
+                       }
+               }
        }
 
        /**
+        * Update callbacks that were owned by cancelled atomic sections.
+        *
+        * Callbacks for "on commit" should never be run if they're owned by a
+        * section that won't be committed.
+        *
+        * Callbacks for "on resolution" need to reflect that the section was
+        * rolled back, even if the transaction as a whole commits successfully.
+        *
+        * Callbacks for "on section cancel" should already have been consumed,
+        * but errors during the cancellation itself can prevent that while still
+        * destroying the section. Hoist any such callbacks to the new top section,
+        * which we assume will itself have to be cancelled or rolled back to
+        * resolve the error.
+        *
         * @param AtomicSectionIdentifier[] $sectionIds ID of an actual savepoint
+        * @param AtomicSectionIdentifier|null $newSectionId New top section ID.
         * @throws UnexpectedValueException
         */
-       private function modifyCallbacksForCancel( array $sectionIds ) {
+       private function modifyCallbacksForCancel(
+               array $sectionIds, AtomicSectionIdentifier $newSectionId = null
+       ) {
                // Cancel the "on commit" callbacks owned by this savepoint
                $this->trxIdleCallbacks = array_filter(
                        $this->trxIdleCallbacks,
@@ -3436,8 +3472,17 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        if ( in_array( $entry[2], $sectionIds, true ) ) {
                                $callback = $entry[0];
                                $this->trxEndCallbacks[$key][0] = function () use ( $callback ) {
+                                       // @phan-suppress-next-line PhanInfiniteRecursion No recursion at all here, phan is confused
                                        return $callback( self::TRIGGER_ROLLBACK, $this );
                                };
+                               // This "on resolution" callback no longer belongs to a section.
+                               $this->trxEndCallbacks[$key][2] = null;
+                       }
+               }
+               // Hoist callback ownership for section cancel callbacks to the new top section
+               foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) {
+                       if ( in_array( $entry[2], $sectionIds, true ) ) {
+                               $this->trxSectionCancelCallbacks[$key][2] = $newSectionId;
                        }
                }
        }
@@ -3492,6 +3537,14 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        );
                        $this->trxIdleCallbacks = []; // consumed (and recursion guard)
                        $this->trxEndCallbacks = []; // consumed (recursion guard)
+
+                       // Only run trxSectionCancelCallbacks on rollback, not commit.
+                       // But always consume them.
+                       if ( $trigger === self::TRIGGER_ROLLBACK ) {
+                               $callbacks = array_merge( $callbacks, $this->trxSectionCancelCallbacks );
+                       }
+                       $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
+
                        foreach ( $callbacks as $callback ) {
                                ++$count;
                                list( $phpCallback ) = $callback;
@@ -3559,6 +3612,46 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                return $count;
        }
 
+       /**
+        * Actually run any "atomic section cancel" callbacks.
+        *
+        * @param int $trigger IDatabase::TRIGGER_* constant
+        * @param AtomicSectionIdentifier[]|null $sectionId Section IDs to cancel,
+        *  null on transaction rollback
+        */
+       private function runOnAtomicSectionCancelCallbacks(
+               $trigger, array $sectionIds = null
+       ) {
+               /** @var Exception|Throwable $e */
+               $e = null; // first exception
+
+               $notCancelled = [];
+               do {
+                       $callbacks = $this->trxSectionCancelCallbacks;
+                       $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
+                       foreach ( $callbacks as $entry ) {
+                               if ( $sectionIds === null || in_array( $entry[2], $sectionIds, true ) ) {
+                                       try {
+                                               $entry[0]( $trigger, $this );
+                                       } catch ( Exception $ex ) {
+                                               ( $this->errorLogger )( $ex );
+                                               $e = $e ?: $ex;
+                                       } catch ( Throwable $ex ) {
+                                               // @todo: Log?
+                                               $e = $e ?: $ex;
+                                       }
+                               } else {
+                                       $notCancelled[] = $entry;
+                               }
+                       }
+               } while ( count( $this->trxSectionCancelCallbacks ) );
+               $this->trxSectionCancelCallbacks = $notCancelled;
+
+               if ( $e !== null ) {
+                       throw $e; // re-throw any first Exception/Throwable
+               }
+       }
+
        /**
         * Actually run any "transaction listener" callbacks.
         *
@@ -3722,67 +3815,79 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
                }
 
-               $excisedFnames = [];
-               if ( $sectionId !== null ) {
-                       // Find the (last) section with the given $sectionId
-                       $pos = -1;
-                       foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
-                               if ( $asId === $sectionId ) {
-                                       $pos = $i;
+               $excisedIds = [];
+               $newTopSection = $this->currentAtomicSectionId();
+               try {
+                       $excisedFnames = [];
+                       if ( $sectionId !== null ) {
+                               // Find the (last) section with the given $sectionId
+                               $pos = -1;
+                               foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
+                                       if ( $asId === $sectionId ) {
+                                               $pos = $i;
+                                       }
                                }
+                               if ( $pos < 0 ) {
+                                       throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" );
+                               }
+                               // Remove all descendant sections and re-index the array
+                               $len = count( $this->trxAtomicLevels );
+                               for ( $i = $pos + 1; $i < $len; ++$i ) {
+                                       $excisedFnames[] = $this->trxAtomicLevels[$i][0];
+                                       $excisedIds[] = $this->trxAtomicLevels[$i][1];
+                               }
+                               $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
+                               $newTopSection = $this->currentAtomicSectionId();
                        }
-                       if ( $pos < 0 ) {
-                               throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" );
-                       }
-                       // Remove all descendant sections and re-index the array
-                       $excisedIds = [];
-                       $len = count( $this->trxAtomicLevels );
-                       for ( $i = $pos + 1; $i < $len; ++$i ) {
-                               $excisedFnames[] = $this->trxAtomicLevels[$i][0];
-                               $excisedIds[] = $this->trxAtomicLevels[$i][1];
-                       }
-                       $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
-                       $this->modifyCallbacksForCancel( $excisedIds );
-               }
 
-               // Check if the current section matches $fname
-               $pos = count( $this->trxAtomicLevels ) - 1;
-               list( $savedFname, $savedSectionId, $savepointId ) = $this->trxAtomicLevels[$pos];
+                       // Check if the current section matches $fname
+                       $pos = count( $this->trxAtomicLevels ) - 1;
+                       list( $savedFname, $savedSectionId, $savepointId ) = $this->trxAtomicLevels[$pos];
 
-               if ( $excisedFnames ) {
-                       $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname) " .
-                               "and descendants " . implode( ', ', $excisedFnames ) );
-               } else {
-                       $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname)" );
-               }
+                       if ( $excisedFnames ) {
+                               $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname) " .
+                                       "and descendants " . implode( ', ', $excisedFnames ) );
+                       } else {
+                               $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname)" );
+                       }
 
-               if ( $savedFname !== $fname ) {
-                       throw new DBUnexpectedError(
-                               $this,
-                               "Invalid atomic section ended (got $fname but expected $savedFname)."
-                       );
-               }
+                       if ( $savedFname !== $fname ) {
+                               throw new DBUnexpectedError(
+                                       $this,
+                                       "Invalid atomic section ended (got $fname but expected $savedFname)."
+                               );
+                       }
 
-               // Remove the last section (no need to re-index the array)
-               array_pop( $this->trxAtomicLevels );
-               $this->modifyCallbacksForCancel( [ $savedSectionId ] );
+                       // Remove the last section (no need to re-index the array)
+                       array_pop( $this->trxAtomicLevels );
+                       $excisedIds[] = $savedSectionId;
+                       $newTopSection = $this->currentAtomicSectionId();
 
-               if ( $savepointId !== null ) {
-                       // Rollback the transaction to the state just before this atomic section
-                       if ( $savepointId === self::$NOT_APPLICABLE ) {
-                               $this->rollback( $fname, self::FLUSHING_INTERNAL );
-                       } else {
-                               $this->doRollbackToSavepoint( $savepointId, $fname );
-                               $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
-                               $this->trxStatusIgnoredCause = null;
+                       if ( $savepointId !== null ) {
+                               // Rollback the transaction to the state just before this atomic section
+                               if ( $savepointId === self::$NOT_APPLICABLE ) {
+                                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                                       // Note: rollback() will run trxSectionCancelCallbacks
+                               } else {
+                                       $this->doRollbackToSavepoint( $savepointId, $fname );
+                                       $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
+                                       $this->trxStatusIgnoredCause = null;
+
+                                       // Run trxSectionCancelCallbacks now.
+                                       $this->runOnAtomicSectionCancelCallbacks( self::TRIGGER_CANCEL, $excisedIds );
+                               }
+                       } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
+                               // Put the transaction into an error state if it's not already in one
+                               $this->trxStatus = self::STATUS_TRX_ERROR;
+                               $this->trxStatusCause = new DBUnexpectedError(
+                                       $this,
+                                       "Uncancelable atomic section canceled (got $fname)."
+                               );
                        }
-               } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
-                       // Put the transaction into an error state if it's not already in one
-                       $this->trxStatus = self::STATUS_TRX_ERROR;
-                       $this->trxStatusCause = new DBUnexpectedError(
-                               $this,
-                               "Uncancelable atomic section canceled (got $fname)."
-                       );
+               } finally {
+                       // Fix up callbacks owned by the sections that were just cancelled.
+                       // All callbacks should have an owner that is present in trxAtomicLevels.
+                       $this->modifyCallbacksForCancel( $excisedIds, $newTopSection );
                }
 
                $this->affectedRowCount = 0; // for the sake of consistency
@@ -4697,6 +4802,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        // Open a new connection resource without messing with the old one
                        $this->conn = false;
                        $this->trxEndCallbacks = []; // don't copy
+                       $this->trxSectionCancelCallbacks = []; // don't copy
                        $this->handleSessionLossPreconnect(); // no trx or locks anymore
                        $this->open(
                                $this->server,
index a462916..fca2c00 100644 (file)
@@ -42,6 +42,8 @@ interface IDatabase {
        const TRIGGER_COMMIT = 2;
        /** @var int Callback triggered by ROLLBACK */
        const TRIGGER_ROLLBACK = 3;
+       /** @var int Callback triggered by atomic section cancel (ROLLBACK TO SAVEPOINT) */
+       const TRIGGER_CANCEL = 4;
 
        /** @var string Transaction is requested by regular caller outside of the DB layer */
        const TRANSACTION_EXPLICIT = '';
@@ -1580,6 +1582,9 @@ interface IDatabase {
         *
         * This is useful for combining cooperative locks and DB transactions.
         *
+        * Note this is called when the whole transaction is resolved. To take action immediately
+        * when an atomic section is cancelled, use onAtomicSectionCancel().
+        *
         * @note do not assume that *other* IDatabase instances will be AUTOCOMMIT mode
         *
         * The callback takes the following arguments:
@@ -1661,6 +1666,31 @@ interface IDatabase {
         */
        public function onTransactionPreCommitOrIdle( callable $callback, $fname = __METHOD__ );
 
+       /**
+        * Run a callback when the atomic section is cancelled.
+        *
+        * The callback is run just after the current atomic section, any outer
+        * atomic section, or the whole transaction is rolled back.
+        *
+        * An error is thrown if no atomic section is pending. The atomic section
+        * need not have been created with the ATOMIC_CANCELABLE flag.
+        *
+        * Queries in the function may be running in the context of an outer
+        * transaction or may be running in AUTOCOMMIT mode. The callback should
+        * use atomic sections if necessary.
+        *
+        * @note do not assume that *other* IDatabase instances will be AUTOCOMMIT mode
+        *
+        * The callback takes the following arguments:
+        *   - IDatabase::TRIGGER_CANCEL or IDatabase::TRIGGER_ROLLBACK
+        *   - This IDatabase instance
+        *
+        * @param callable $callback
+        * @param string $fname Caller name
+        * @since 1.34
+        */
+       public function onAtomicSectionCancel( callable $callback, $fname = __METHOD__ );
+
        /**
         * Run a callback after each time any transaction commits or rolls back
         *
index 0e133d8..3d79afe 100644 (file)
@@ -1488,7 +1488,8 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                $triggerMap = [
                        '-' => '-',
                        IDatabase::TRIGGER_COMMIT => 'tCommit',
-                       IDatabase::TRIGGER_ROLLBACK => 'tRollback'
+                       IDatabase::TRIGGER_ROLLBACK => 'tRollback',
+                       IDatabase::TRIGGER_CANCEL => 'tCancel',
                ];
                $pcCallback = function ( IDatabase $db ) use ( $fname ) {
                        $this->database->query( "SELECT 0", $fname );
@@ -1518,6 +1519,11 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                $this->database->cancelAtomic( __METHOD__ );
                $this->assertLastSql( 'BEGIN; ROLLBACK; SELECT 1, tRollback AS t' );
 
+               $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $callback1, __METHOD__ );
+               $this->database->cancelAtomic( __METHOD__ );
+               $this->assertLastSql( 'BEGIN; ROLLBACK; SELECT 1, tRollback AS t' );
+
                $this->database->startAtomic( __METHOD__ . '_outer' );
                $this->database->onTransactionPreCommitOrIdle( $pcCallback, __METHOD__ );
                $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
@@ -1567,6 +1573,21 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                        'SELECT 3, tCommit AS t'
                ] ) );
 
+               $this->database->startAtomic( __METHOD__ . '_outer' );
+               $this->database->onAtomicSectionCancel( $callback1, __METHOD__ );
+               $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $callback2, __METHOD__ );
+               $this->database->cancelAtomic( __METHOD__ );
+               $this->database->onAtomicSectionCancel( $callback3, __METHOD__ );
+               $this->database->endAtomic( __METHOD__ . '_outer' );
+               $this->assertLastSql( implode( "; ", [
+                       'BEGIN',
+                       'SAVEPOINT wikimedia_rdbms_atomic1',
+                       'ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1',
+                       'SELECT 2, tCancel AS t',
+                       'COMMIT',
+               ] ) );
+
                $makeCallback = function ( $id ) use ( $fname, $triggerMap ) {
                        return function ( $trigger = '-' ) use ( $id, $fname, $triggerMap ) {
                                $this->database->query( "SELECT $id, {$triggerMap[$trigger]} AS t", $fname );
@@ -1609,6 +1630,29 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                        'SELECT 3, tRollback AS t',
                        'SELECT 4, tCommit AS t'
                ] ) );
+
+               $this->database->startAtomic( __METHOD__ . '_level1', IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $makeCallback( 1 ), __METHOD__ );
+               $this->database->startAtomic( __METHOD__ . '_level2' );
+               $this->database->startAtomic( __METHOD__ . '_level3', IDatabase::ATOMIC_CANCELABLE );
+               $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $makeCallback( 2 ), __METHOD__ );
+               $this->database->endAtomic( __METHOD__ );
+               $this->database->onAtomicSectionCancel( $makeCallback( 3 ), __METHOD__ );
+               $this->database->cancelAtomic( __METHOD__ . '_level3' );
+               $this->database->endAtomic( __METHOD__ . '_level2' );
+               $this->database->onAtomicSectionCancel( $makeCallback( 4 ), __METHOD__ );
+               $this->database->endAtomic( __METHOD__ . '_level1' );
+               $this->assertLastSql( implode( "; ", [
+                       'BEGIN',
+                       'SAVEPOINT wikimedia_rdbms_atomic1',
+                       'SAVEPOINT wikimedia_rdbms_atomic2',
+                       'RELEASE SAVEPOINT wikimedia_rdbms_atomic2',
+                       'ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1',
+                       'SELECT 2, tCancel AS t',
+                       'SELECT 3, tCancel AS t',
+                       'COMMIT',
+               ] ) );
        }
 
        /**
@@ -1692,6 +1736,16 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                        $callback3Called = $trigger;
                        $this->database->query( "SELECT 3", $fname );
                };
+               $callback4Called = 0;
+               $callback4 = function () use ( $fname, &$callback4Called ) {
+                       $callback4Called++;
+                       $this->database->query( "SELECT 4", $fname );
+               };
+               $callback5Called = 0;
+               $callback5 = function () use ( $fname, &$callback5Called ) {
+                       $callback5Called++;
+                       $this->database->query( "SELECT 5", $fname );
+               };
 
                $this->database->startAtomic( __METHOD__ . '_outer' );
                $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
@@ -1699,57 +1753,67 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                $this->database->onTransactionCommitOrIdle( $callback1, __METHOD__ );
                $this->database->onTransactionPreCommitOrIdle( $callback2, __METHOD__ );
                $this->database->onTransactionResolution( $callback3, __METHOD__ );
+               $this->database->onAtomicSectionCancel( $callback4, __METHOD__ );
                $this->database->endAtomic( __METHOD__ . '_inner' );
                $this->database->cancelAtomic( __METHOD__ );
                $this->database->endAtomic( __METHOD__ . '_outer' );
                $this->assertNull( $callback1Called );
                $this->assertNull( $callback2Called );
                $this->assertEquals( IDatabase::TRIGGER_ROLLBACK, $callback3Called );
+               $this->assertEquals( 1, $callback4Called );
                // phpcs:ignore Generic.Files.LineLength
-               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT; SELECT 3' );
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; SELECT 4; COMMIT; SELECT 3' );
 
                $callback1Called = null;
                $callback2Called = null;
                $callback3Called = null;
+               $callback4Called = 0;
                $this->database->startAtomic( __METHOD__ . '_outer' );
                $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
                $this->database->startAtomic( __METHOD__ . '_inner', IDatabase::ATOMIC_CANCELABLE );
                $this->database->onTransactionCommitOrIdle( $callback1, __METHOD__ );
                $this->database->onTransactionPreCommitOrIdle( $callback2, __METHOD__ );
                $this->database->onTransactionResolution( $callback3, __METHOD__ );
+               $this->database->onAtomicSectionCancel( $callback4, __METHOD__ );
                $this->database->endAtomic( __METHOD__ . '_inner' );
                $this->database->cancelAtomic( __METHOD__ );
                $this->database->endAtomic( __METHOD__ . '_outer' );
                $this->assertNull( $callback1Called );
                $this->assertNull( $callback2Called );
                $this->assertEquals( IDatabase::TRIGGER_ROLLBACK, $callback3Called );
+               $this->assertEquals( 1, $callback4Called );
                // phpcs:ignore Generic.Files.LineLength
-               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; SAVEPOINT wikimedia_rdbms_atomic2; RELEASE SAVEPOINT wikimedia_rdbms_atomic2; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT; SELECT 3' );
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; SAVEPOINT wikimedia_rdbms_atomic2; RELEASE SAVEPOINT wikimedia_rdbms_atomic2; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; SELECT 4; COMMIT; SELECT 3' );
 
                $callback1Called = null;
                $callback2Called = null;
                $callback3Called = null;
+               $callback4Called = 0;
                $this->database->startAtomic( __METHOD__ . '_outer' );
                $atomicId = $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
                $this->database->startAtomic( __METHOD__ . '_inner' );
                $this->database->onTransactionCommitOrIdle( $callback1, __METHOD__ );
                $this->database->onTransactionPreCommitOrIdle( $callback2, __METHOD__ );
                $this->database->onTransactionResolution( $callback3, __METHOD__ );
+               $this->database->onAtomicSectionCancel( $callback4, __METHOD__ );
                $this->database->cancelAtomic( __METHOD__, $atomicId );
                $this->database->endAtomic( __METHOD__ . '_outer' );
                $this->assertNull( $callback1Called );
                $this->assertNull( $callback2Called );
                $this->assertEquals( IDatabase::TRIGGER_ROLLBACK, $callback3Called );
+               $this->assertEquals( 1, $callback4Called );
 
                $callback1Called = null;
                $callback2Called = null;
                $callback3Called = null;
+               $callback4Called = 0;
                $this->database->startAtomic( __METHOD__ . '_outer' );
                $atomicId = $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
                $this->database->startAtomic( __METHOD__ . '_inner' );
                $this->database->onTransactionCommitOrIdle( $callback1, __METHOD__ );
                $this->database->onTransactionPreCommitOrIdle( $callback2, __METHOD__ );
                $this->database->onTransactionResolution( $callback3, __METHOD__ );
+               $this->database->onAtomicSectionCancel( $callback4, __METHOD__ );
                try {
                        $this->database->cancelAtomic( __METHOD__ . '_X', $atomicId );
                } catch ( DBUnexpectedError $e ) {
@@ -1764,30 +1828,65 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                $this->assertNull( $callback1Called );
                $this->assertNull( $callback2Called );
                $this->assertEquals( IDatabase::TRIGGER_ROLLBACK, $callback3Called );
+               $this->assertEquals( 1, $callback4Called );
 
+               $callback4Called = 0;
+               $callback5Called = 0;
+               $this->database->getLastSqls(); // flush
                $this->database->startAtomic( __METHOD__ . '_outer' );
                $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
-               $this->database->startAtomic( __METHOD__ . '_inner' );
-               $this->database->onTransactionCommitOrIdle( $callback1, __METHOD__ );
-               $this->database->onTransactionPreCommitOrIdle( $callback2, __METHOD__ );
-               $this->database->onTransactionResolution( $callback3, __METHOD__ );
+               $this->database->onAtomicSectionCancel( $callback5, __METHOD__ );
+               $this->database->startAtomic( __METHOD__ . '_inner', IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $callback4, __METHOD__ );
                $this->database->cancelAtomic( __METHOD__ . '_inner' );
                $this->database->cancelAtomic( __METHOD__ );
                $this->database->endAtomic( __METHOD__ . '_outer' );
-               $this->assertNull( $callback1Called );
-               $this->assertNull( $callback2Called );
-               $this->assertEquals( IDatabase::TRIGGER_ROLLBACK, $callback3Called );
+               // phpcs:ignore Generic.Files.LineLength
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; SAVEPOINT wikimedia_rdbms_atomic2; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic2; SELECT 4; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; SELECT 5; COMMIT' );
+               $this->assertEquals( 1, $callback4Called );
+               $this->assertEquals( 1, $callback5Called );
+
+               $callback4Called = 0;
+               $callback5Called = 0;
+               $this->database->startAtomic( __METHOD__ . '_outer' );
+               $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $callback5, __METHOD__ );
+               $this->database->startAtomic( __METHOD__ . '_inner', IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $callback4, __METHOD__ );
+               $this->database->endAtomic( __METHOD__ . '_inner' );
+               $this->database->cancelAtomic( __METHOD__ );
+               $this->database->endAtomic( __METHOD__ . '_outer' );
+               // phpcs:ignore Generic.Files.LineLength
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; SAVEPOINT wikimedia_rdbms_atomic2; RELEASE SAVEPOINT wikimedia_rdbms_atomic2; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; SELECT 5; SELECT 4; COMMIT' );
+               $this->assertEquals( 1, $callback4Called );
+               $this->assertEquals( 1, $callback5Called );
+
+               $callback4Called = 0;
+               $callback5Called = 0;
+               $this->database->startAtomic( __METHOD__ . '_outer' );
+               $sectionId = $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $callback5, __METHOD__ );
+               $this->database->startAtomic( __METHOD__ . '_inner', IDatabase::ATOMIC_CANCELABLE );
+               $this->database->onAtomicSectionCancel( $callback4, __METHOD__ );
+               $this->database->cancelAtomic( __METHOD__, $sectionId );
+               $this->database->endAtomic( __METHOD__ . '_outer' );
+               // phpcs:ignore Generic.Files.LineLength
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; SAVEPOINT wikimedia_rdbms_atomic2; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; SELECT 5; SELECT 4; COMMIT' );
+               $this->assertEquals( 1, $callback4Called );
+               $this->assertEquals( 1, $callback5Called );
 
                $wrapper = TestingAccessWrapper::newFromObject( $this->database );
                $callback1Called = null;
                $callback2Called = null;
                $callback3Called = null;
+               $callback4Called = 0;
                $this->database->startAtomic( __METHOD__ . '_outer' );
                $this->database->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
                $this->database->startAtomic( __METHOD__ . '_inner' );
                $this->database->onTransactionCommitOrIdle( $callback1, __METHOD__ );
                $this->database->onTransactionPreCommitOrIdle( $callback2, __METHOD__ );
                $this->database->onTransactionResolution( $callback3, __METHOD__ );
+               $this->database->onAtomicSectionCancel( $callback4, __METHOD__ );
                $wrapper->trxStatus = Database::STATUS_TRX_ERROR;
                $this->database->cancelAtomic( __METHOD__ . '_inner' );
                $this->database->cancelAtomic( __METHOD__ );
@@ -1795,6 +1894,7 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                $this->assertNull( $callback1Called );
                $this->assertNull( $callback2Called );
                $this->assertEquals( IDatabase::TRIGGER_ROLLBACK, $callback3Called );
+               $this->assertEquals( 1, $callback4Called );
        }
 
        /**
@@ -1876,6 +1976,22 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                }
        }
 
+       /**
+        * @covers \Wikimedia\Rdbms\Database::onAtomicSectionCancel
+        */
+       public function testNoAtomicSectionForCallback() {
+               try {
+                       $this->database->onAtomicSectionCancel( function () {
+                       }, __METHOD__ );
+                       $this->fail( 'Expected exception not thrown' );
+               } catch ( DBUnexpectedError $ex ) {
+                       $this->assertSame(
+                               'No atomic section is open (got ' . __METHOD__ . ').',
+                               $ex->getMessage()
+                       );
+               }
+       }
+
        /**
         * @expectedException \Wikimedia\Rdbms\DBTransactionStateError
         * @covers \Wikimedia\Rdbms\Database::assertQueryIsCurrentlyAllowed
@@ -2091,6 +2207,9 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                        $this->database->onTransactionCommitOrIdle( function () use ( $fname ) {
                                $this->database->query( 'SELECT 1', $fname );
                        } );
+                       $this->database->onAtomicSectionCancel( function () use ( $fname ) {
+                               $this->database->query( 'SELECT 2', $fname );
+                       } );
                        $this->database->delete( 'x', [ 'field' => 3 ], __METHOD__ );
                        $this->database->close();
                        $this->fail( 'Expected exception not thrown' );
@@ -2103,7 +2222,7 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                }
 
                $this->assertFalse( $this->database->isOpen() );
-               $this->assertLastSql( 'BEGIN; DELETE FROM x WHERE field = \'3\'; ROLLBACK' );
+               $this->assertLastSql( 'BEGIN; DELETE FROM x WHERE field = \'3\'; ROLLBACK; SELECT 2' );
                $this->assertEquals( 0, $this->database->trxLevel() );
        }