rdbms: Add IDatabase::cancelAtomic()
authorBrad Jorsch <bjorsch@wikimedia.org>
Sat, 17 Mar 2018 21:59:56 +0000 (17:59 -0400)
committerAaron Schulz <aschulz@wikimedia.org>
Thu, 22 Mar 2018 05:57:42 +0000 (05:57 +0000)
Atomic sections are currently useful if you want to wrap some SQL
statements in a transaction when you might be called from inside someone
else's transaction, and you expect the caller to roll back everything if
you fail.

But there are some cases where you want to allow the caller to recover
from errors, in which case you need to roll back just the atomic
section. Savepoints are supported by all our databases and can be used
for this purpose, so let's do so.

Bug: T188660
Change-Id: Iee548619df89fd7fbd581b01106b8b41d3df71cc

RELEASE-NOTES-1.31
includes/libs/rdbms/database/DBConnRef.php
includes/libs/rdbms/database/Database.php
includes/libs/rdbms/database/DatabaseMssql.php
includes/libs/rdbms/database/IDatabase.php
tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php

index 48b0500..cd0fd4c 100644 (file)
@@ -66,6 +66,11 @@ production.
     the SQL query. The ActorMigration class may also be used to get feature-flagged
     information needed to access actor-related fields during the migration
     period.
+* Added Wikimedia\Rdbms\IDatabase::cancelAtomic(), to roll back an atomic
+  section without having to roll back the whole transaction.
+* Wikimedia\Rdbms\IDatabase::doAtomicSection(), non-native ::insertSelect(),
+  and non-MySQL ::replace() and ::upsert() no longer roll back the whole
+  transaction on failure.
 
 === External library changes in 1.31 ===
 
index 1f8e56c..c0855df 100644 (file)
@@ -519,6 +519,10 @@ class DBConnRef implements IDatabase {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
+       public function cancelAtomic( $fname = __METHOD__ ) {
+               return $this->__call( __FUNCTION__, func_get_args() );
+       }
+
        public function doAtomicSection( $fname, callable $callback ) {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
index 417f64c..c3e36da 100644 (file)
@@ -187,6 +187,12 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * @see Database::trxLevel
         */
        private $trxAutomatic = false;
+       /**
+        * Counter for atomic savepoint identifiers. Reset when a new transaction begins.
+        *
+        * @var int
+        */
+       private $trxAtomicCounter = 0;
        /**
         * Array of levels of atomicity within transactions
         *
@@ -1241,6 +1247,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         */
        private function handleSessionLoss() {
                $this->trxLevel = 0;
+               $this->trxAtomicCounter = 0;
                $this->trxIdleCallbacks = []; // T67263; transaction already lost
                $this->trxPreCommitCallbacks = []; // T67263; transaction already lost
                $this->sessionTempTables = [];
@@ -2547,7 +2554,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->endAtomic( $fname );
                        $this->affectedRowCount = $affectedRowCount;
                } catch ( Exception $e ) {
-                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                       $this->cancelAtomic( $fname );
                        throw $e;
                }
        }
@@ -2630,7 +2637,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->endAtomic( $fname );
                        $this->affectedRowCount = $affectedRowCount;
                } catch ( Exception $e ) {
-                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                       $this->cancelAtomic( $fname );
                        throw $e;
                }
 
@@ -2798,11 +2805,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                                $this->endAtomic( $fname );
                                $this->affectedRowCount = $affectedRowCount;
                        } else {
-                               $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                               $this->cancelAtomic( $fname );
                        }
                        return $ok;
                } catch ( Exception $e ) {
-                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                       $this->cancelAtomic( $fname );
                        throw $e;
                }
        }
@@ -3093,7 +3100,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                                call_user_func( $callback );
                                $this->endAtomic( __METHOD__ );
                        } catch ( Exception $e ) {
-                               $this->rollback( __METHOD__, self::FLUSHING_INTERNAL );
+                               $this->cancelAtomic( __METHOD__ );
                                throw $e;
                        }
                }
@@ -3230,6 +3237,48 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
        }
 
+       /**
+        * Create a savepoint
+        *
+        * This is used internally to implement atomic sections. It should not be
+        * used otherwise.
+        *
+        * @since 1.31
+        * @param string $identifier Identifier for the savepoint
+        * @param string $fname Calling function name
+        */
+       protected function doSavepoint( $identifier, $fname ) {
+               $this->query( 'SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+       }
+
+       /**
+        * Release a savepoint
+        *
+        * This is used internally to implement atomic sections. It should not be
+        * used otherwise.
+        *
+        * @since 1.31
+        * @param string $identifier Identifier for the savepoint
+        * @param string $fname Calling function name
+        */
+       protected function doReleaseSavepoint( $identifier, $fname ) {
+               $this->query( 'RELEASE SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+       }
+
+       /**
+        * Rollback to a savepoint
+        *
+        * This is used internally to implement atomic sections. It should not be
+        * used otherwise.
+        *
+        * @since 1.31
+        * @param string $identifier Identifier for the savepoint
+        * @param string $fname Calling function name
+        */
+       protected function doRollbackToSavepoint( $identifier, $fname ) {
+               $this->query( 'ROLLBACK TO SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+       }
+
        final public function startAtomic( $fname = __METHOD__ ) {
                if ( !$this->trxLevel ) {
                        $this->begin( $fname, self::TRANSACTION_INTERNAL );
@@ -3238,32 +3287,68 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        if ( !$this->getFlag( self::DBO_TRX ) ) {
                                $this->trxAutomaticAtomic = true;
                        }
+                       $savepointId = null;
+               } else {
+                       $savepointId = 'wikimedia_rdbms_atomic' . ++$this->trxAtomicCounter;
+                       if ( strlen( $savepointId ) > 30 ) { // 30 == Oracle's identifier length limit (pre 12c)
+                               $this->queryLogger->warning(
+                                       'There have been an excessively large number of atomic sections in a transaction'
+                                       . " started by $this->trxFname, reusing IDs (at $fname)",
+                                       [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+                               );
+                               $this->trxAtomicCounter = 0;
+                               $savepointId = 'wikimedia_rdbms_atomic' . ++$this->trxAtomicCounter;
+                       }
+                       $this->doSavepoint( $savepointId, $fname );
                }
 
-               $this->trxAtomicLevels[] = $fname;
+               $this->trxAtomicLevels[] = [ $fname, $savepointId ];
        }
 
        final public function endAtomic( $fname = __METHOD__ ) {
                if ( !$this->trxLevel ) {
                        throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
                }
-               if ( !$this->trxAtomicLevels ||
-                       array_pop( $this->trxAtomicLevels ) !== $fname
-               ) {
+
+               list( $savedFname, $savepointId ) = $this->trxAtomicLevels
+                       ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
+               if ( $savedFname !== $fname ) {
                        throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
                }
 
-               if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
+               if ( !$savepointId ) {
                        $this->commit( $fname, self::FLUSHING_INTERNAL );
+               } else {
+                       $this->doReleaseSavepoint( $savepointId, $fname );
                }
        }
 
+       final public function cancelAtomic( $fname = __METHOD__ ) {
+               if ( !$this->trxLevel ) {
+                       throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
+               }
+
+               list( $savedFname, $savepointId ) = $this->trxAtomicLevels
+                       ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
+               if ( $savedFname !== $fname ) {
+                       throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
+               }
+
+               if ( !$savepointId ) {
+                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+               } else {
+                       $this->doRollbackToSavepoint( $savepointId, $fname );
+               }
+
+               $this->affectedRowCount = 0; // for the sake of consistency
+       }
+
        final public function doAtomicSection( $fname, callable $callback ) {
                $this->startAtomic( $fname );
                try {
                        $res = call_user_func_array( $callback, [ $this, $fname ] );
                } catch ( Exception $e ) {
-                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                       $this->cancelAtomic( $fname );
                        throw $e;
                }
                $this->endAtomic( $fname );
@@ -3275,7 +3360,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                // Protect against mismatched atomic section, transaction nesting, and snapshot loss
                if ( $this->trxLevel ) {
                        if ( $this->trxAtomicLevels ) {
-                               $levels = implode( ', ', $this->trxAtomicLevels );
+                               $levels = array_reduce( $this->trxAtomicLevels, function ( $accum, $v ) {
+                                       return $accum === null ? $v[0] : "$accum, " . $v[0];
+                               } );
                                $msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open.";
                                throw new DBUnexpectedError( $this, $msg );
                        } elseif ( !$this->trxAutomatic ) {
@@ -3294,6 +3381,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $this->assertOpen();
 
                $this->doBegin( $fname );
+               $this->trxAtomicCounter = 0;
                $this->trxTimestamp = microtime( true );
                $this->trxFname = $fname;
                $this->trxDoneWrites = false;
@@ -3331,7 +3419,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        final public function commit( $fname = __METHOD__, $flush = '' ) {
                if ( $this->trxLevel && $this->trxAtomicLevels ) {
                        // There are still atomic sections open. This cannot be ignored
-                       $levels = implode( ', ', $this->trxAtomicLevels );
+                       $levels = array_reduce( $this->trxAtomicLevels, function ( $accum, $v ) {
+                               return $accum === null ? $v[0] : "$accum, " . $v[0];
+                       } );
                        throw new DBUnexpectedError(
                                $this,
                                "$fname: Got COMMIT while atomic sections $levels are still open."
index 1f6132b..773e548 100644 (file)
@@ -1053,6 +1053,19 @@ class DatabaseMssql extends Database {
                return false;
        }
 
+       protected function doSavepoint( $identifier, $fname ) {
+               $this->query( 'SAVE TRANSACTION ' . $this->addIdentifierQuotes( $identifier ), $fname );
+       }
+
+       protected function doReleaseSavepoint( $identifier, $fname ) {
+               // Not supported. Also not really needed, a new doSavepoint() for the
+               // same identifier will overwrite the old.
+       }
+
+       protected function doRollbackToSavepoint( $identifier, $fname ) {
+               $this->query( 'ROLLBACK TRANSACTION ' . $this->addIdentifierQuotes( $identifier ), $fname );
+       }
+
        /**
         * Begin a transaction, committing any previously open transaction
         * @param string $fname
index a5392c8..47954b6 100644 (file)
@@ -1580,19 +1580,19 @@ interface IDatabase {
        /**
         * Begin an atomic section of statements
         *
-        * If a transaction has been started already, just keep track of the given
-        * section name to make sure the transaction is not committed pre-maturely.
-        * This function can be used in layers (with sub-sections), so use a stack
-        * to keep track of the different atomic sections. If there is no transaction,
-        * start one implicitly.
+        * If a transaction has been started already, sets a savepoint and tracks
+        * the given section name to make sure the transaction is not committed
+        * pre-maturely. This function can be used in layers (with sub-sections),
+        * so use a stack to keep track of the different atomic sections. If there
+        * is no transaction, one is started implicitly.
         *
         * The goal of this function is to create an atomic section of SQL queries
         * without having to start a new transaction if it already exists.
         *
-        * All atomic levels *must* be explicitly closed using IDatabase::endAtomic(),
-        * and any database transactions cannot be began or committed until all atomic
-        * levels are closed. There is no such thing as implicitly opening or closing
-        * an atomic section.
+        * All atomic levels *must* be explicitly closed using IDatabase::endAtomic()
+        * or IDatabase::cancelAtomic(), and any database transactions cannot be
+        * began or committed until all atomic levels are closed. There is no such
+        * thing as implicitly opening or closing an atomic section.
         *
         * @since 1.23
         * @param string $fname
@@ -1613,6 +1613,26 @@ interface IDatabase {
         */
        public function endAtomic( $fname = __METHOD__ );
 
+       /**
+        * Cancel an atomic section of SQL statements
+        *
+        * This will roll back only the statements executed since the start of the
+        * most recent atomic section, and close that section. If a transaction was
+        * open before the corresponding startAtomic() call, any statements before
+        * that call are *not* rolled back and the transaction remains open. If the
+        * corresponding startAtomic() implicitly started a transaction, that
+        * transaction is rolled back.
+        *
+        * Note that a call to IDatabase::rollback() will also roll back any open
+        * atomic sections.
+        *
+        * @since 1.31
+        * @see IDatabase::startAtomic
+        * @param string $fname
+        * @throws DBError
+        */
+       public function cancelAtomic( $fname = __METHOD__ );
+
        /**
         * Run a callback to do an atomic set of updates for this database
         *
@@ -1620,17 +1640,18 @@ interface IDatabase {
         *   - This database object
         *   - The value of $fname
         *
-        * If any exception occurs in the callback, then rollback() will be called and the error will
-        * be re-thrown. It may also be that the rollback itself fails with an exception before then.
-        * In any case, such errors are expected to terminate the request, without any outside caller
-        * attempting to catch errors and commit anyway. Note that any rollback undoes all prior
-        * atomic section and uncommitted updates, which trashes the current request, requiring an
-        * error to be displayed.
+        * If any exception occurs in the callback, then cancelAtomic() will be
+        * called to back out any statements executed by the callback and the error
+        * will be re-thrown. It may also be that the cancel itself fails with an
+        * exception before then. In any case, such errors are expected to
+        * terminate the request, without any outside caller attempting to catch
+        * errors and commit anyway.
         *
-        * This can be an alternative to explicit startAtomic()/endAtomic() calls.
+        * This can be an alternative to explicit startAtomic()/endAtomic()/cancelAtomic() calls.
         *
         * @see Database::startAtomic
         * @see Database::endAtomic
+        * @see Database::cancelAtomic
         *
         * @param string $fname Caller name (usually __METHOD__)
         * @param callable $callback Callback that issues DB updates
@@ -1638,7 +1659,9 @@ interface IDatabase {
         * @throws DBError
         * @throws RuntimeException
         * @throws UnexpectedValueException
-        * @since 1.27
+        * @since 1.27; prior to 1.31 this did a rollback() instead of
+        *  cancelAtomic(), and assumed no callers up the stack would ever try to
+        *  catch the exception.
         */
        public function doAtomicSection( $fname, callable $callback );
 
index b883c11..badba96 100644 (file)
@@ -1352,4 +1352,110 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                $this->assertSame( 'CAST( fieldName AS INTEGER )', $output );
        }
 
+       /**
+        * @covers \Wikimedia\Rdbms\Database::doSavepoint
+        * @covers \Wikimedia\Rdbms\Database::doReleaseSavepoint
+        * @covers \Wikimedia\Rdbms\Database::doRollbackToSavepoint
+        * @covers \Wikimedia\Rdbms\Database::startAtomic
+        * @covers \Wikimedia\Rdbms\Database::endAtomic
+        * @covers \Wikimedia\Rdbms\Database::cancelAtomic
+        * @covers \Wikimedia\Rdbms\Database::doAtomicSection
+        */
+       public function testAtomicSections() {
+               $this->database->startAtomic( __METHOD__ );
+               $this->database->endAtomic( __METHOD__ );
+               $this->assertLastSql( 'BEGIN; COMMIT' );
+
+               $this->database->startAtomic( __METHOD__ );
+               $this->database->cancelAtomic( __METHOD__ );
+               $this->assertLastSql( 'BEGIN; ROLLBACK' );
+
+               $this->database->begin( __METHOD__ );
+               $this->database->startAtomic( __METHOD__ );
+               $this->database->endAtomic( __METHOD__ );
+               $this->database->commit( __METHOD__ );
+               // phpcs:ignore Generic.Files.LineLength
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; RELEASE SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
+
+               $this->database->begin( __METHOD__ );
+               $this->database->startAtomic( __METHOD__ );
+               $this->database->cancelAtomic( __METHOD__ );
+               $this->database->commit( __METHOD__ );
+               // phpcs:ignore Generic.Files.LineLength
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
+
+               $this->database->startAtomic( __METHOD__ );
+               $this->database->startAtomic( __METHOD__ );
+               $this->database->cancelAtomic( __METHOD__ );
+               $this->database->endAtomic( __METHOD__ );
+               // phpcs:ignore Generic.Files.LineLength
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
+
+               $this->database->doAtomicSection( __METHOD__, function () {
+               } );
+               $this->assertLastSql( 'BEGIN; COMMIT' );
+
+               $this->database->begin( __METHOD__ );
+               $this->database->doAtomicSection( __METHOD__, function () {
+               } );
+               $this->database->rollback( __METHOD__ );
+               // phpcs:ignore Generic.Files.LineLength
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; RELEASE SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK' );
+
+               $this->database->begin( __METHOD__ );
+               try {
+                       $this->database->doAtomicSection( __METHOD__, function () {
+                               throw new RuntimeException( 'Test exception' );
+                       } );
+                       $this->fail( 'Expected exception not thrown' );
+               } catch ( RuntimeException $ex ) {
+                       $this->assertSame( 'Test exception', $ex->getMessage() );
+               }
+               $this->database->commit( __METHOD__ );
+               // phpcs:ignore Generic.Files.LineLength
+               $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
+       }
+
+       public static function provideAtomicSectionMethodsForErrors() {
+               return [
+                       [ 'endAtomic' ],
+                       [ 'cancelAtomic' ],
+               ];
+       }
+
+       /**
+        * @dataProvider provideAtomicSectionMethodsForErrors
+        * @covers \Wikimedia\Rdbms\Database::endAtomic
+        * @covers \Wikimedia\Rdbms\Database::cancelAtomic
+        */
+       public function testNoAtomicSection( $method ) {
+               try {
+                       $this->database->$method( __METHOD__ );
+                       $this->fail( 'Expected exception not thrown' );
+               } catch ( DBUnexpectedError $ex ) {
+                       $this->assertSame(
+                               'No atomic transaction is open (got ' . __METHOD__ . ').',
+                               $ex->getMessage()
+                       );
+               }
+       }
+
+       /**
+        * @dataProvider provideAtomicSectionMethodsForErrors
+        * @covers \Wikimedia\Rdbms\Database::endAtomic
+        * @covers \Wikimedia\Rdbms\Database::cancelAtomic
+        */
+       public function testInvalidAtomicSectionEnded( $method ) {
+               $this->database->startAtomic( __METHOD__ . 'X' );
+               try {
+                       $this->database->$method( __METHOD__ );
+                       $this->fail( 'Expected exception not thrown' );
+               } catch ( DBUnexpectedError $ex ) {
+                       $this->assertSame(
+                               'Invalid atomic section ended (got ' . __METHOD__ . ').',
+                               $ex->getMessage()
+                       );
+               }
+       }
+
 }