rdbms: allow cancelation of dangling nested atomic sections
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 29 Mar 2018 07:23:10 +0000 (00:23 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 10 Apr 2018 23:34:31 +0000 (16:34 -0700)
* Make startAtomic() return a token that can be used with cancelAtomic()
  cancel any nested atomic sections that have not yet been ended.
* Make doAtomicSection() clear dangling nested sections by default.
* Also give doAtomicSection() a $cancelable parameter, having the
  same default as startAtomic().

Change-Id: I75fa234cb1dcfef17dc9a973a3b02d2607efa98e

autoload.php
includes/libs/rdbms/database/AtomicSectionIdentifier.php [new file with mode: 0644]
includes/libs/rdbms/database/DBConnRef.php
includes/libs/rdbms/database/Database.php
includes/libs/rdbms/database/IDatabase.php
tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php

index 4076620..fc610cf 100644 (file)
@@ -1675,6 +1675,7 @@ $wgAutoloadLocalClasses = [
        'WikiTextStructure' => __DIR__ . '/includes/content/WikiTextStructure.php',
        'Wikimedia\\Http\\HttpAcceptNegotiator' => __DIR__ . '/includes/libs/http/HttpAcceptNegotiator.php',
        'Wikimedia\\Http\\HttpAcceptParser' => __DIR__ . '/includes/libs/http/HttpAcceptParser.php',
        'WikiTextStructure' => __DIR__ . '/includes/content/WikiTextStructure.php',
        'Wikimedia\\Http\\HttpAcceptNegotiator' => __DIR__ . '/includes/libs/http/HttpAcceptNegotiator.php',
        'Wikimedia\\Http\\HttpAcceptParser' => __DIR__ . '/includes/libs/http/HttpAcceptParser.php',
+       'Wikimedia\\Rdbms\\AtomicSectionIdentifier' => __DIR__ . '/includes/libs/rdbms/database/AtomicSectionIdentifier.php',
        'Wikimedia\\Rdbms\\Blob' => __DIR__ . '/includes/libs/rdbms/encasing/Blob.php',
        'Wikimedia\\Rdbms\\ChronologyProtector' => __DIR__ . '/includes/libs/rdbms/ChronologyProtector.php',
        'Wikimedia\\Rdbms\\ConnectionManager' => __DIR__ . '/includes/libs/rdbms/connectionmanager/ConnectionManager.php',
        'Wikimedia\\Rdbms\\Blob' => __DIR__ . '/includes/libs/rdbms/encasing/Blob.php',
        'Wikimedia\\Rdbms\\ChronologyProtector' => __DIR__ . '/includes/libs/rdbms/ChronologyProtector.php',
        'Wikimedia\\Rdbms\\ConnectionManager' => __DIR__ . '/includes/libs/rdbms/connectionmanager/ConnectionManager.php',
diff --git a/includes/libs/rdbms/database/AtomicSectionIdentifier.php b/includes/libs/rdbms/database/AtomicSectionIdentifier.php
new file mode 100644 (file)
index 0000000..c6e3d44
--- /dev/null
@@ -0,0 +1,27 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Database
+ */
+namespace Wikimedia\Rdbms;
+
+/**
+ * Class used for token representing identifiers for atomic sections from IDatabase instances
+ */
+class AtomicSectionIdentifier {
+}
index 11ce957..c94f62f 100644 (file)
@@ -505,11 +505,13 @@ class DBConnRef implements IDatabase {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
-       public function cancelAtomic( $fname = __METHOD__ ) {
+       public function cancelAtomic( $fname = __METHOD__, AtomicSectionIdentifier $sectionId = null ) {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
-       public function doAtomicSection( $fname, callable $callback ) {
+       public function doAtomicSection(
+               $fname, callable $callback, $cancelable = self::ATOMIC_NOT_CANCELABLE
+       ) {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
index 5b259bd..452b4f8 100644 (file)
@@ -212,7 +212,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        /**
         * Array of levels of atomicity within transactions
         *
        /**
         * Array of levels of atomicity within transactions
         *
-        * @var array
+        * @var array List of (name, unique ID, savepoint ID)
         */
        private $trxAtomicLevels = [];
        /**
         */
        private $trxAtomicLevels = [];
        /**
@@ -1293,7 +1293,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                if ( $this->trxStatus < self::STATUS_TRX_OK ) {
                        throw new DBTransactionStateError(
                                $this,
                if ( $this->trxStatus < self::STATUS_TRX_OK ) {
                        throw new DBTransactionStateError(
                                $this,
-                               "Cannot execute query from $fname while transaction status is ERROR. ",
+                               "Cannot execute query from $fname while transaction status is ERROR.",
                                [],
                                $this->trxStatusCause
                        );
                                [],
                                $this->trxStatusCause
                        );
@@ -3453,58 +3453,104 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->doSavepoint( $savepointId, $fname );
                }
 
                        $this->doSavepoint( $savepointId, $fname );
                }
 
-               $this->trxAtomicLevels[] = [ $fname, $savepointId ];
+               $sectionId = new AtomicSectionIdentifier;
+               $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ];
+
+               return $sectionId;
        }
 
        final public function endAtomic( $fname = __METHOD__ ) {
        }
 
        final public function endAtomic( $fname = __METHOD__ ) {
-               if ( !$this->trxLevel ) {
-                       throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
+               if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+                       throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
                }
 
                }
 
-               list( $savedFname, $savepointId ) = $this->trxAtomicLevels
-                       ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
+               // Check if the current section matches $fname
+               $pos = count( $this->trxAtomicLevels ) - 1;
+               list( $savedFname, , $savepointId ) = $this->trxAtomicLevels[$pos];
+
                if ( $savedFname !== $fname ) {
                if ( $savedFname !== $fname ) {
-                       throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
+                       throw new DBUnexpectedError(
+                               $this,
+                               "Invalid atomic section ended (got $fname but expected $savedFname)."
+                       );
                }
 
                }
 
+               // Remove the last section and re-index the array
+               $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos );
+
                if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
                        $this->commit( $fname, self::FLUSHING_INTERNAL );
                if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
                        $this->commit( $fname, self::FLUSHING_INTERNAL );
-               } elseif ( $savepointId && $savepointId !== 'n/a' ) {
+               } elseif ( $savepointId !== null && $savepointId !== 'n/a' ) {
                        $this->doReleaseSavepoint( $savepointId, $fname );
                }
        }
 
                        $this->doReleaseSavepoint( $savepointId, $fname );
                }
        }
 
-       final public function cancelAtomic( $fname = __METHOD__ ) {
-               if ( !$this->trxLevel ) {
-                       throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
+       final public function cancelAtomic(
+               $fname = __METHOD__, AtomicSectionIdentifier $sectionId = null
+       ) {
+               if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+                       throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
                }
 
                }
 
-               list( $savedFname, $savepointId ) = $this->trxAtomicLevels
-                       ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
-               if ( $savedFname !== $fname ) {
-                       throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
+               if ( $sectionId !== null ) {
+                       // Find the (last) section with the given $sectionId
+                       $pos = -1;
+                       foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
+                               if ( $asId === $sectionId ) {
+                                       $pos = $i;
+                               }
+                       }
+                       if ( $pos < 0 ) {
+                               throw new DBUnexpectedError( "Atomic section not found (for $fname)" );
+                       }
+                       // Remove all descendant sections and re-index the array
+                       $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
                }
                }
-               if ( !$savepointId ) {
-                       throw new DBUnexpectedError( $this, "Uncancelable atomic section canceled (got $fname)." );
+
+               // Check if the current section matches $fname
+               $pos = count( $this->trxAtomicLevels ) - 1;
+               list( $savedFname, , $savepointId ) = $this->trxAtomicLevels[$pos];
+
+               if ( $savedFname !== $fname ) {
+                       throw new DBUnexpectedError(
+                               $this,
+                               "Invalid atomic section ended (got $fname but expected $savedFname)."
+                       );
                }
 
                }
 
-               if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
-                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
-               } elseif ( $savepointId !== 'n/a' ) {
-                       $this->doRollbackToSavepoint( $savepointId, $fname );
-                       $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
-                       $this->trxStatusIgnoredCause = null;
+               // Remove the last section and re-index the array
+               $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos );
+
+               if ( $savepointId !== null ) {
+                       // Rollback the transaction to the state just before this atomic section
+                       if ( $savepointId === 'n/a' ) {
+                               $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                       } else {
+                               $this->doRollbackToSavepoint( $savepointId, $fname );
+                               $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
+                               $this->trxStatusIgnoredCause = null;
+                       }
+               } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
+                       // Put the transaction into an error state if it's not already in one
+                       $this->trxStatus = self::STATUS_TRX_ERROR;
+                       $this->trxStatusCause = new DBUnexpectedError(
+                               $this,
+                               "Uncancelable atomic section canceled (got $fname)."
+                       );
                }
 
                $this->affectedRowCount = 0; // for the sake of consistency
        }
 
                }
 
                $this->affectedRowCount = 0; // for the sake of consistency
        }
 
-       final public function doAtomicSection( $fname, callable $callback ) {
-               $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
+       final public function doAtomicSection(
+               $fname, callable $callback, $cancelable = self::ATOMIC_NOT_CANCELABLE
+       ) {
+               $sectionId = $this->startAtomic( $fname, $cancelable );
                try {
                        $res = call_user_func_array( $callback, [ $this, $fname ] );
                } catch ( Exception $e ) {
                try {
                        $res = call_user_func_array( $callback, [ $this, $fname ] );
                } catch ( Exception $e ) {
-                       $this->cancelAtomic( $fname );
+                       $this->cancelAtomic( $fname, $sectionId );
+
                        throw $e;
                }
                $this->endAtomic( $fname );
                        throw $e;
                }
                $this->endAtomic( $fname );
index 5876d6b..9d33cf4 100644 (file)
@@ -1575,6 +1575,7 @@ interface IDatabase {
         * @param string $fname
         * @param string $cancelable Pass self::ATOMIC_CANCELABLE to use a
         *  savepoint and enable self::cancelAtomic() for this section.
         * @param string $fname
         * @param string $cancelable Pass self::ATOMIC_CANCELABLE to use a
         *  savepoint and enable self::cancelAtomic() for this section.
+        * @return AtomicSectionIdentifier section ID token
         * @throws DBError
         */
        public function startAtomic( $fname = __METHOD__, $cancelable = self::ATOMIC_NOT_CANCELABLE );
         * @throws DBError
         */
        public function startAtomic( $fname = __METHOD__, $cancelable = self::ATOMIC_NOT_CANCELABLE );
@@ -1610,9 +1611,11 @@ interface IDatabase {
         * @since 1.31
         * @see IDatabase::startAtomic
         * @param string $fname
         * @since 1.31
         * @see IDatabase::startAtomic
         * @param string $fname
+        * @param AtomicSectionIdentifier $sectionId Section ID from startAtomic();
+        *   passing this enables cancellation of unclosed nested sections [optional]
         * @throws DBError
         */
         * @throws DBError
         */
-       public function cancelAtomic( $fname = __METHOD__ );
+       public function cancelAtomic( $fname = __METHOD__, AtomicSectionIdentifier $sectionId = null );
 
        /**
         * Run a callback to do an atomic set of updates for this database
 
        /**
         * Run a callback to do an atomic set of updates for this database
@@ -1636,6 +1639,8 @@ interface IDatabase {
         *
         * @param string $fname Caller name (usually __METHOD__)
         * @param callable $callback Callback that issues DB updates
         *
         * @param string $fname Caller name (usually __METHOD__)
         * @param callable $callback Callback that issues DB updates
+        * @param string $cancelable Pass self::ATOMIC_CANCELABLE to use a
+        *  savepoint and enable self::cancelAtomic() for this section.
         * @return mixed $res Result of the callback (since 1.28)
         * @throws DBError
         * @throws RuntimeException
         * @return mixed $res Result of the callback (since 1.28)
         * @throws DBError
         * @throws RuntimeException
@@ -1644,7 +1649,9 @@ interface IDatabase {
         *  cancelAtomic(), and assumed no callers up the stack would ever try to
         *  catch the exception.
         */
         *  cancelAtomic(), and assumed no callers up the stack would ever try to
         *  catch the exception.
         */
-       public function doAtomicSection( $fname, callable $callback );
+       public function doAtomicSection(
+               $fname, callable $callback, $cancelable = self::ATOMIC_NOT_CANCELABLE
+       );
 
        /**
         * Begin a transaction. If a transaction is already in progress,
 
        /**
         * Begin a transaction. If a transaction is already in progress,
index 665e953..e4f4f7b 100644 (file)
@@ -6,6 +6,7 @@ use Wikimedia\Rdbms\Database;
 use Wikimedia\TestingAccessWrapper;
 use Wikimedia\Rdbms\DBTransactionStateError;
 use Wikimedia\Rdbms\DBUnexpectedError;
 use Wikimedia\TestingAccessWrapper;
 use Wikimedia\Rdbms\DBTransactionStateError;
 use Wikimedia\Rdbms\DBUnexpectedError;
+use Wikimedia\Rdbms\DBTransactionError;
 
 /**
  * Test the parts of the Database abstract class that deal
 
 /**
  * Test the parts of the Database abstract class that deal
@@ -1402,22 +1403,33 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                // phpcs:ignore Generic.Files.LineLength
                $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
 
                // phpcs:ignore Generic.Files.LineLength
                $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
 
-               $this->database->doAtomicSection( __METHOD__, function () {
-               } );
+               $noOpCallack = function () {
+               };
+
+               $this->database->doAtomicSection( __METHOD__, $noOpCallack, IDatabase::ATOMIC_CANCELABLE );
+               $this->assertLastSql( 'BEGIN; COMMIT' );
+
+               $this->database->doAtomicSection( __METHOD__, $noOpCallack );
                $this->assertLastSql( 'BEGIN; COMMIT' );
 
                $this->database->begin( __METHOD__ );
                $this->assertLastSql( 'BEGIN; COMMIT' );
 
                $this->database->begin( __METHOD__ );
-               $this->database->doAtomicSection( __METHOD__, function () {
-               } );
+               $this->database->doAtomicSection( __METHOD__, $noOpCallack, IDatabase::ATOMIC_CANCELABLE );
                $this->database->rollback( __METHOD__ );
                // phpcs:ignore Generic.Files.LineLength
                $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; RELEASE SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK' );
 
                $this->database->begin( __METHOD__ );
                try {
                $this->database->rollback( __METHOD__ );
                // phpcs:ignore Generic.Files.LineLength
                $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; RELEASE SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK' );
 
                $this->database->begin( __METHOD__ );
                try {
-                       $this->database->doAtomicSection( __METHOD__, function () {
-                               throw new RuntimeException( 'Test exception' );
-                       } );
+                       $this->database->doAtomicSection(
+                               __METHOD__,
+                               function () {
+                                       $this->database->startAtomic( 'inner_func1' );
+                                       $this->database->startAtomic( 'inner_func2' );
+
+                                       throw new RuntimeException( 'Test exception' );
+                               },
+                               IDatabase::ATOMIC_CANCELABLE
+                       );
                        $this->fail( 'Expected exception not thrown' );
                } catch ( RuntimeException $ex ) {
                        $this->assertSame( 'Test exception', $ex->getMessage() );
                        $this->fail( 'Expected exception not thrown' );
                } catch ( RuntimeException $ex ) {
                        $this->assertSame( 'Test exception', $ex->getMessage() );
@@ -1425,6 +1437,21 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                $this->database->commit( __METHOD__ );
                // phpcs:ignore Generic.Files.LineLength
                $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
                $this->database->commit( __METHOD__ );
                // phpcs:ignore Generic.Files.LineLength
                $this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
+
+               $this->database->begin( __METHOD__ );
+               try {
+                       $this->database->doAtomicSection(
+                               __METHOD__,
+                               function () {
+                                       throw new RuntimeException( 'Test exception' );
+                               }
+                       );
+                       $this->fail( 'Test exception not thrown' );
+               } catch ( RuntimeException $ex ) {
+                       $this->assertSame( 'Test exception', $ex->getMessage() );
+               }
+               $this->database->rollback( __METHOD__ );
+               $this->assertLastSql( 'BEGIN; ROLLBACK' );
        }
 
        public static function provideAtomicSectionMethodsForErrors() {
        }
 
        public static function provideAtomicSectionMethodsForErrors() {
@@ -1445,7 +1472,7 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                        $this->fail( 'Expected exception not thrown' );
                } catch ( DBUnexpectedError $ex ) {
                        $this->assertSame(
                        $this->fail( 'Expected exception not thrown' );
                } catch ( DBUnexpectedError $ex ) {
                        $this->assertSame(
-                               'No atomic transaction is open (got ' . __METHOD__ . ').',
+                               'No atomic section is open (got ' . __METHOD__ . ').',
                                $ex->getMessage()
                        );
                }
                                $ex->getMessage()
                        );
                }
@@ -1463,7 +1490,8 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                        $this->fail( 'Expected exception not thrown' );
                } catch ( DBUnexpectedError $ex ) {
                        $this->assertSame(
                        $this->fail( 'Expected exception not thrown' );
                } catch ( DBUnexpectedError $ex ) {
                        $this->assertSame(
-                               'Invalid atomic section ended (got ' . __METHOD__ . ').',
+                               'Invalid atomic section ended (got ' . __METHOD__ . ' but expected ' .
+                                       __METHOD__ . 'X' . ').',
                                $ex->getMessage()
                        );
                }
                                $ex->getMessage()
                        );
                }
@@ -1476,10 +1504,11 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                $this->database->startAtomic( __METHOD__ );
                try {
                        $this->database->cancelAtomic( __METHOD__ );
                $this->database->startAtomic( __METHOD__ );
                try {
                        $this->database->cancelAtomic( __METHOD__ );
+                       $this->database->select( 'test', '1', [], __METHOD__ );
                        $this->fail( 'Expected exception not thrown' );
                        $this->fail( 'Expected exception not thrown' );
-               } catch ( DBUnexpectedError $ex ) {
+               } catch ( DBTransactionError $ex ) {
                        $this->assertSame(
                        $this->assertSame(
-                               'Uncancelable atomic section canceled (got ' . __METHOD__ . ').',
+                               'Cannot execute query from ' . __METHOD__ . ' while transaction status is ERROR.',
                                $ex->getMessage()
                        );
                }
                                $ex->getMessage()
                        );
                }