rdbms: allow cancelation of dangling nested atomic sections
[lhc/web/wiklou.git] / includes / libs / rdbms / database / Database.php
index 5bffcea..452b4f8 100644 (file)
@@ -101,6 +101,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        protected $queryLogger;
        /** @var callback Error logging callback */
        protected $errorLogger;
+       /** @var callback Deprecation logging callback */
+       protected $deprecationLogger;
 
        /** @var resource|null Database connection */
        protected $conn = null;
@@ -149,6 +151,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * @var Exception|null The last error that caused the status to become STATUS_TRX_ERROR
         */
        protected $trxStatusCause;
+       /**
+        * @var array|null If wasKnownStatementRollbackError() prevented trxStatus from being set,
+        *  the relevant details are stored here.
+        */
+       protected $trxStatusIgnoredCause;
        /**
         * Either 1 if a transaction is active or 0 otherwise.
         * The other Trx fields may not be meaningfull if this is 0.
@@ -205,7 +212,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        /**
         * Array of levels of atomicity within transactions
         *
-        * @var array
+        * @var array List of (name, unique ID, savepoint ID)
         */
        private $trxAtomicLevels = [];
        /**
@@ -312,6 +319,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $this->connLogger = $params['connLogger'];
                $this->queryLogger = $params['queryLogger'];
                $this->errorLogger = $params['errorLogger'];
+               $this->deprecationLogger = $params['deprecationLogger'];
 
                if ( isset( $params['nonNativeInsertSelectBatchSize'] ) ) {
                        $this->nonNativeInsertSelectBatchSize = $params['nonNativeInsertSelectBatchSize'];
@@ -396,6 +404,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         *      includes the agent as a SQL comment.
         *   - trxProfiler: Optional TransactionProfiler instance.
         *   - errorLogger: Optional callback that takes an Exception and logs it.
+        *   - deprecationLogger: Optional callback that takes a string and logs it.
         *   - cliMode: Whether to consider the execution context that of a CLI script.
         *   - agent: Optional name used to identify the end-user in query profiling/logging.
         *   - srvCache: Optional BagOStuff instance to an APC-style cache.
@@ -437,6 +446,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                                        trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
                                };
                        }
+                       if ( !isset( $p['deprecationLogger'] ) ) {
+                               $p['deprecationLogger'] = function ( $msg ) {
+                                       trigger_error( $msg, E_USER_DEPRECATED );
+                               };
+                       }
 
                        /** @var Database $conn */
                        $conn = new $class( $p );
@@ -658,6 +672,20 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                );
        }
 
+       /**
+        * @return string|null
+        */
+       final protected function getTransactionRoundId() {
+               // If transaction round participation is enabled, see if one is active
+               if ( $this->getFlag( self::DBO_TRX ) ) {
+                       $id = $this->getLBInfo( 'trxRoundId' );
+
+                       return is_string( $id ) ? $id : null;
+               }
+
+               return null;
+       }
+
        public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
                if ( !$this->trxLevel ) {
                        return false;
@@ -1047,6 +1075,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        public function query( $sql, $fname = __METHOD__, $tempIgnore = false ) {
                $this->assertTransactionStatus( $sql, $fname );
 
+               # Avoid fatals if close() was called
+               $this->assertOpen();
+
                $priorWritesPending = $this->writesOrCallbacksPending();
                $this->lastQuery = $sql;
 
@@ -1097,9 +1128,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->queryLogger->debug( "{$this->dbName} {$commentedSql}" );
                }
 
-               # Avoid fatals if close() was called
-               $this->assertOpen();
-
                # Send the query to the server and fetch any corresponding errors
                $ret = $this->doProfiledQuery( $sql, $commentedSql, $isNonTempWrite, $fname );
                $lastError = $this->lastError();
@@ -1125,24 +1153,29 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
 
                if ( $ret === false ) {
-                       if ( $this->trxLevel && !$this->wasKnownStatementRollbackError() ) {
-                               # Either the query was aborted or all queries after BEGIN where aborted.
-                               if ( $this->explicitTrxActive() || $priorWritesPending ) {
-                                       # In the first case, the only options going forward are (a) ROLLBACK, or
-                                       # (b) ROLLBACK TO SAVEPOINT (if one was set). If the later case, the only
-                                       # option is ROLLBACK, since the snapshots would have been released.
-                                       if ( is_object( $tempIgnore ) ) {
-                                               // Ugly hack to know that savepoints are in use for postgres
-                                               // FIXME: remove this and make DatabasePostgres use ATOMIC_CANCELABLE
-                                       } else {
+                       if ( $this->trxLevel ) {
+                               if ( !$this->wasKnownStatementRollbackError() ) {
+                                       # Either the query was aborted or all queries after BEGIN where aborted.
+                                       if ( $this->explicitTrxActive() || $priorWritesPending ) {
+                                               # In the first case, the only options going forward are (a) ROLLBACK, or
+                                               # (b) ROLLBACK TO SAVEPOINT (if one was set). If the later case, the only
+                                               # option is ROLLBACK, since the snapshots would have been released.
                                                $this->trxStatus = self::STATUS_TRX_ERROR;
                                                $this->trxStatusCause =
                                                        $this->makeQueryException( $lastError, $lastErrno, $sql, $fname );
                                                $tempIgnore = false; // cannot recover
+                                       } else {
+                                               # Nothing prior was there to lose from the transaction,
+                                               # so just roll it back.
+                                               $this->doRollback( __METHOD__ . " ($fname)" );
+                                               $this->trxStatus = self::STATUS_TRX_OK;
                                        }
+                                       $this->trxStatusIgnoredCause = null;
                                } else {
-                                       # Nothing prior was there to lose from the transaction
-                                       $this->trxStatus = self::STATUS_TRX_OK;
+                                       # We're ignoring an error that caused just the current query to be aborted.
+                                       # But log the cause so we can log a deprecation notice if a
+                                       # caller actually does ignore it.
+                                       $this->trxStatusIgnoredCause = [ $lastError, $lastErrno, $fname ];
                                }
                        }
 
@@ -1253,16 +1286,24 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * @throws DBTransactionStateError
         */
        private function assertTransactionStatus( $sql, $fname ) {
-               if (
-                       $this->trxStatus < self::STATUS_TRX_OK &&
-                       $this->getQueryVerb( $sql ) !== 'ROLLBACK' // transaction/savepoint
-               ) {
+               if ( $this->getQueryVerb( $sql ) === 'ROLLBACK' ) { // transaction/savepoint
+                       return;
+               }
+
+               if ( $this->trxStatus < self::STATUS_TRX_OK ) {
                        throw new DBTransactionStateError(
                                $this,
-                               "Cannot execute query from $fname while transaction status is ERROR. ",
+                               "Cannot execute query from $fname while transaction status is ERROR.",
                                [],
                                $this->trxStatusCause
                        );
+               } elseif ( $this->trxStatus === self::STATUS_TRX_OK && $this->trxStatusIgnoredCause ) {
+                       list( $iLastError, $iLastErrno, $iFname ) = $this->trxStatusIgnoredCause;
+                       call_user_func( $this->deprecationLogger,
+                               "Caller from $fname ignored an error originally raised from $iFname: " .
+                               "[$iLastErrno] $iLastError"
+                       );
+                       $this->trxStatusIgnoredCause = null;
                }
        }
 
@@ -1304,7 +1345,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        private function handleSessionLoss() {
                // Clean up tracking of session-level things...
                // https://dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
-               // https://www.postgresql.org/docs/9.1/static/sql-createtable.html (ignoring ON COMMIT)
+               // https://www.postgresql.org/docs/9.2/static/sql-createtable.html (ignoring ON COMMIT)
                $this->sessionTempTables = [];
                // https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock
                // https://www.postgresql.org/docs/9.4/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
@@ -3180,6 +3221,12 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        final public function onTransactionIdle( callable $callback, $fname = __METHOD__ ) {
+               if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+                       // Start an implicit transaction similar to how query() does
+                       $this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
+                       $this->trxAutomatic = true;
+               }
+
                $this->trxIdleCallbacks[] = [ $callback, $fname ];
                if ( !$this->trxLevel ) {
                        $this->runOnTransactionIdleCallbacks( self::TRIGGER_IDLE );
@@ -3187,10 +3234,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        final public function onTransactionPreCommitOrIdle( callable $callback, $fname = __METHOD__ ) {
-               if ( $this->trxLevel || $this->getFlag( self::DBO_TRX ) ) {
-                       // As long as DBO_TRX is set, writes will accumulate until the load balancer issues
-                       // an implicit commit of all peer databases. This is true even if a transaction has
-                       // not yet been triggered by writes; make sure $callback runs *after* any such writes.
+               if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+                       // Start an implicit transaction similar to how query() does
+                       $this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
+                       $this->trxAutomatic = true;
+               }
+
+               if ( $this->trxLevel ) {
                        $this->trxPreCommitCallbacks[] = [ $callback, $fname ];
                } else {
                        // No transaction is active nor will start implicitly, so make one for this callback
@@ -3403,57 +3453,104 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->doSavepoint( $savepointId, $fname );
                }
 
-               $this->trxAtomicLevels[] = [ $fname, $savepointId ];
+               $sectionId = new AtomicSectionIdentifier;
+               $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ];
+
+               return $sectionId;
        }
 
        final public function endAtomic( $fname = __METHOD__ ) {
-               if ( !$this->trxLevel ) {
-                       throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
+               if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+                       throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
                }
 
-               list( $savedFname, $savepointId ) = $this->trxAtomicLevels
-                       ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
+               // Check if the current section matches $fname
+               $pos = count( $this->trxAtomicLevels ) - 1;
+               list( $savedFname, , $savepointId ) = $this->trxAtomicLevels[$pos];
+
                if ( $savedFname !== $fname ) {
-                       throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
+                       throw new DBUnexpectedError(
+                               $this,
+                               "Invalid atomic section ended (got $fname but expected $savedFname)."
+                       );
                }
 
+               // Remove the last section and re-index the array
+               $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos );
+
                if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
                        $this->commit( $fname, self::FLUSHING_INTERNAL );
-               } elseif ( $savepointId && $savepointId !== 'n/a' ) {
+               } elseif ( $savepointId !== null && $savepointId !== 'n/a' ) {
                        $this->doReleaseSavepoint( $savepointId, $fname );
                }
        }
 
-       final public function cancelAtomic( $fname = __METHOD__ ) {
-               if ( !$this->trxLevel ) {
-                       throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
+       final public function cancelAtomic(
+               $fname = __METHOD__, AtomicSectionIdentifier $sectionId = null
+       ) {
+               if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+                       throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
                }
 
-               list( $savedFname, $savepointId ) = $this->trxAtomicLevels
-                       ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
-               if ( $savedFname !== $fname ) {
-                       throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
+               if ( $sectionId !== null ) {
+                       // Find the (last) section with the given $sectionId
+                       $pos = -1;
+                       foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
+                               if ( $asId === $sectionId ) {
+                                       $pos = $i;
+                               }
+                       }
+                       if ( $pos < 0 ) {
+                               throw new DBUnexpectedError( "Atomic section not found (for $fname)" );
+                       }
+                       // Remove all descendant sections and re-index the array
+                       $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
                }
-               if ( !$savepointId ) {
-                       throw new DBUnexpectedError( $this, "Uncancelable atomic section canceled (got $fname)." );
+
+               // Check if the current section matches $fname
+               $pos = count( $this->trxAtomicLevels ) - 1;
+               list( $savedFname, , $savepointId ) = $this->trxAtomicLevels[$pos];
+
+               if ( $savedFname !== $fname ) {
+                       throw new DBUnexpectedError(
+                               $this,
+                               "Invalid atomic section ended (got $fname but expected $savedFname)."
+                       );
                }
 
-               if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
-                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
-               } elseif ( $savepointId !== 'n/a' ) {
-                       $this->doRollbackToSavepoint( $savepointId, $fname );
-                       $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
+               // Remove the last section and re-index the array
+               $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos );
+
+               if ( $savepointId !== null ) {
+                       // Rollback the transaction to the state just before this atomic section
+                       if ( $savepointId === 'n/a' ) {
+                               $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                       } else {
+                               $this->doRollbackToSavepoint( $savepointId, $fname );
+                               $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
+                               $this->trxStatusIgnoredCause = null;
+                       }
+               } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
+                       // Put the transaction into an error state if it's not already in one
+                       $this->trxStatus = self::STATUS_TRX_ERROR;
+                       $this->trxStatusCause = new DBUnexpectedError(
+                               $this,
+                               "Uncancelable atomic section canceled (got $fname)."
+                       );
                }
 
                $this->affectedRowCount = 0; // for the sake of consistency
        }
 
-       final public function doAtomicSection( $fname, callable $callback ) {
-               $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
+       final public function doAtomicSection(
+               $fname, callable $callback, $cancelable = self::ATOMIC_NOT_CANCELABLE
+       ) {
+               $sectionId = $this->startAtomic( $fname, $cancelable );
                try {
                        $res = call_user_func_array( $callback, [ $this, $fname ] );
                } catch ( Exception $e ) {
-                       $this->cancelAtomic( $fname );
+                       $this->cancelAtomic( $fname, $sectionId );
+
                        throw $e;
                }
                $this->endAtomic( $fname );
@@ -3485,6 +3582,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
 
                $this->doBegin( $fname );
                $this->trxStatus = self::STATUS_TRX_OK;
+               $this->trxStatusIgnoredCause = null;
                $this->trxAtomicCounter = 0;
                $this->trxTimestamp = microtime( true );
                $this->trxFname = $fname;
@@ -3500,6 +3598,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $this->trxWriteCallers = [];
                // First SELECT after BEGIN will establish the snapshot in REPEATABLE-READ.
                // Get an estimate of the replication lag before any such queries.
+               $this->trxReplicaLag = null; // clear cached value first
                $this->trxReplicaLag = $this->getApproximateLagStatus()['lag'];
                // T147697: make explicitTrxActive() return true until begin() finishes. This way, no
                // caller will think its OK to muck around with the transaction just because startAtomic()
@@ -3789,7 +3888,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        public function getSessionLagStatus() {
-               return $this->getTransactionLagStatus() ?: $this->getApproximateLagStatus();
+               return $this->getRecordedTransactionLagStatus() ?: $this->getApproximateLagStatus();
        }
 
        /**
@@ -3800,11 +3899,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         * is this lag plus transaction duration. If they don't, it is still
         * safe to be pessimistic. This returns null if there is no transaction.
         *
+        * This returns null if the lag status for this transaction was not yet recorded.
+        *
         * @return array|null ('lag': seconds or false on error, 'since': UNIX timestamp of BEGIN)
         * @since 1.27
         */
-       final protected function getTransactionLagStatus() {
-               return $this->trxLevel
+       final protected function getRecordedTransactionLagStatus() {
+               return ( $this->trxLevel && $this->trxReplicaLag !== null )
                        ? [ 'lag' => $this->trxReplicaLag, 'since' => $this->trxTimestamp() ]
                        : null;
        }