Merge "Use {{int:}} on MediaWiki:Blockedtext and MediaWiki:Autoblockedtext"
[lhc/web/wiklou.git] / includes / libs / rdbms / loadbalancer / LoadBalancer.php
index 8de6064..360be42 100644 (file)
@@ -120,6 +120,8 @@ class LoadBalancer implements ILoadBalancer {
        private $connectionAttempted = false;
        /** @var int */
        private $maxLag = self::MAX_LAG_DEFAULT;
+       /** @var string Stage of the current transaction round in the transaction round life-cycle */
+       private $trxRoundStage = self::ROUND_CURSORY;
 
        /** @var int Warn when this many connection are held */
        const CONN_HELD_WARN_THRESHOLD = 10;
@@ -139,6 +141,19 @@ class LoadBalancer implements ILoadBalancer {
        const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
        const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
 
+       /** @var string Transaction round, explicit or implicit, has not finished writing */
+       const ROUND_CURSORY = 'cursory';
+       /** @var string Transaction round writes are complete and ready for pre-commit checks */
+       const ROUND_FINALIZED = 'finalized';
+       /** @var string Transaction round passed final pre-commit checks */
+       const ROUND_APPROVED = 'approved';
+       /** @var string Transaction round was committed and post-commit callbacks must be run */
+       const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
+       /** @var string Transaction round was rolled back and post-rollback callbacks must be run */
+       const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
+       /** @var string Transaction round encountered an error */
+       const ROUND_ERROR = 'error';
+
        public function __construct( array $params ) {
                if ( !isset( $params['servers'] ) ) {
                        throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
@@ -246,6 +261,14 @@ class LoadBalancer implements ILoadBalancer {
                if ( isset( $params['chronologyCallback'] ) ) {
                        $this->chronologyCallback = $params['chronologyCallback'];
                }
+
+               if ( isset( $params['roundStage'] ) ) {
+                       if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
+                               $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
+                       } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
+                               $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
+                       }
+               }
        }
 
        /**
@@ -1242,44 +1265,41 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function commitAll( $fname = __METHOD__ ) {
-               $failures = [];
-
-               $restore = ( $this->trxRoundId !== false );
-               $this->trxRoundId = false;
-               $this->forEachOpenConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
-                               try {
-                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-                               } catch ( DBError $e ) {
-                                       call_user_func( $this->errorLogger, $e );
-                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
-                               }
-                               if ( $restore && $conn->getLBInfo( 'master' ) ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
-                       }
-               );
-
-               if ( $failures ) {
-                       throw new DBExpectedError(
-                               null,
-                               "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
-                       );
-               }
+               $this->commitMasterChanges( $fname );
+               $this->flushMasterSnapshots( $fname );
+               $this->flushReplicaSnapshots( $fname );
        }
 
        public function finalizeMasterChanges() {
+               $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Loop until callbacks stop adding callbacks on other connections
+               $total = 0;
+               do {
+                       $count = 0; // callbacks execution attempts
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) {
+                               // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
+                               // Any error should cause all (peer) transactions to be rolled back together.
+                               $count += $conn->runOnTransactionPreCommitCallbacks();
+                       } );
+                       $total += $count;
+               } while ( $count > 0 );
+               // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
                $this->forEachOpenMasterConnection( function ( Database $conn ) {
-                       // Any error should cause all DB transactions to be rolled back together
-                       $conn->setTrxEndCallbackSuppression( false );
-                       $conn->runOnTransactionPreCommitCallbacks();
-                       // Defer post-commit callbacks until COMMIT finishes for all DBs
                        $conn->setTrxEndCallbackSuppression( true );
                } );
+               $this->trxRoundStage = self::ROUND_FINALIZED;
+
+               return $total;
        }
 
        public function approveMasterChanges( array $options ) {
+               $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
+
                $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
                $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
                        // If atomic sections or explicit transactions are still open, some caller must have
                        // caught an exception but failed to properly rollback any changes. Detect that and
@@ -1309,6 +1329,7 @@ class LoadBalancer implements ILoadBalancer {
                                );
                        }
                } );
+               $this->trxRoundStage = self::ROUND_APPROVED;
        }
 
        public function beginMasterChanges( $fname = __METHOD__ ) {
@@ -1318,32 +1339,26 @@ class LoadBalancer implements ILoadBalancer {
                                "$fname: Transaction round '{$this->trxRoundId}' already started."
                        );
                }
-               $this->trxRoundId = $fname;
+               $this->assertTransactionRoundStage( self::ROUND_CURSORY );
 
-               $failures = [];
-               $this->forEachOpenMasterConnection(
-                       function ( Database $conn ) use ( $fname, &$failures ) {
-                               $conn->setTrxEndCallbackSuppression( true );
-                               try {
-                                       $conn->flushSnapshot( $fname );
-                               } catch ( DBError $e ) {
-                                       call_user_func( $this->errorLogger, $e );
-                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
-                               }
-                               $conn->setTrxEndCallbackSuppression( false );
-                               $this->applyTransactionRoundFlags( $conn );
-                       }
-               );
+               // Clear any empty transactions (no writes/callbacks) from the implicit round
+               $this->flushMasterSnapshots( $fname );
 
-               if ( $failures ) {
-                       throw new DBExpectedError(
-                               null,
-                               "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
-                       );
-               }
+               $this->trxRoundId = $fname;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Mark applicable handles as participating in this explicit transaction round.
+               // For each of these handles, any writes and callbacks will be tied to a single
+               // transaction. The (peer) handles will reject begin()/commit() calls unless they
+               // are part of an en masse commit or an en masse rollback.
+               $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                       $this->applyTransactionRoundFlags( $conn );
+               } );
+               $this->trxRoundStage = self::ROUND_CURSORY;
        }
 
        public function commitMasterChanges( $fname = __METHOD__ ) {
+               $this->assertTransactionRoundStage( self::ROUND_APPROVED );
+
                $failures = [];
 
                /** @noinspection PhpUnusedLocalVariableInspection */
@@ -1351,62 +1366,125 @@ class LoadBalancer implements ILoadBalancer {
 
                $restore = ( $this->trxRoundId !== false );
                $this->trxRoundId = false;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
+               // Note that callbacks should already be suppressed due to finalizeMasterChanges().
                $this->forEachOpenMasterConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
+                       function ( IDatabase $conn ) use ( $fname, &$failures ) {
                                try {
-                                       if ( $conn->writesOrCallbacksPending() ) {
-                                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-                                       } elseif ( $restore ) {
-                                               $conn->flushSnapshot( $fname );
-                                       }
+                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
                                } catch ( DBError $e ) {
                                        call_user_func( $this->errorLogger, $e );
                                        $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
                                }
-                               if ( $restore ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
                        }
                );
-
                if ( $failures ) {
-                       throw new DBExpectedError(
+                       throw new DBTransactionError(
                                null,
                                "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
                        );
                }
+               if ( $restore ) {
+                       // Unmark handles as participating in this explicit transaction round
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                               $this->undoTransactionRoundFlags( $conn );
+                       } );
+               }
+               $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
        }
 
-       public function runMasterPostTrxCallbacks( $type ) {
+       public function runMasterTransactionIdleCallbacks() {
+               if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_COMMIT;
+               } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_ROLLBACK;
+               } else {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+                       );
+               }
+
+               $oldStage = $this->trxRoundStage;
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+
+               // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
+               $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                       $conn->setTrxEndCallbackSuppression( false );
+               } );
+
                $e = null; // first exception
+               // Loop until callbacks stop adding callbacks on other connections
+               do {
+                       // Run any pending callbacks for each connection...
+                       $count = 0; // callback execution attempts
+                       $this->forEachOpenMasterConnection(
+                               function ( Database $conn ) use ( $type, &$e, &$count ) {
+                                       if ( $conn->trxLevel() ) {
+                                               return; // retry in the next iteration, after commit() is called
+                                       }
+                                       try {
+                                               $count += $conn->runOnTransactionIdleCallbacks( $type );
+                                       } catch ( Exception $ex ) {
+                                               $e = $e ?: $ex;
+                                       }
+                               }
+                       );
+                       // Clear out any active transactions left over from callbacks...
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e ) {
+                               if ( $conn->writesPending() ) {
+                                       // A callback from another handle wrote to this one and DBO_TRX is set
+                                       $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
+                                       $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
+                                       $this->queryLogger->warning(
+                                               __METHOD__ . ": found writes pending ($fnames).",
+                                               [
+                                                       'db_server' => $conn->getServer(),
+                                                       'db_name' => $conn->getDBname()
+                                               ]
+                                       );
+                               } elseif ( $conn->trxLevel() ) {
+                                       // A callback from another handle read from this one and DBO_TRX is set,
+                                       // which can easily happen if there is only one DB (no replicas)
+                                       $this->queryLogger->debug( __METHOD__ . ": found empty transaction." );
+                               }
+                               try {
+                                       $conn->commit( __METHOD__, $conn::FLUSHING_ALL_PEERS );
+                               } catch ( Exception $ex ) {
+                                       $e = $e ?: $ex;
+                               }
+                       } );
+               } while ( $count > 0 );
+
+               $this->trxRoundStage = $oldStage;
+
+               return $e;
+       }
+
+       public function runMasterTransactionListenerCallbacks() {
+               if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_COMMIT;
+               } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+                       $type = IDatabase::TRIGGER_ROLLBACK;
+               } else {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+                       );
+               }
+
+               $e = null;
+
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
                $this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
-                       $conn->setTrxEndCallbackSuppression( false );
-                       // Callbacks run in AUTO-COMMIT mode, so make sure no transactions are pending...
-                       if ( $conn->writesPending() ) {
-                               // This happens if onTransactionIdle() callbacks write to *other* handles
-                               // (which already finished their callbacks). Let any callbacks run in the final
-                               // commitMasterChanges() in LBFactory::shutdown(), when the transaction is gone.
-                               $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
-                               return;
-                       } elseif ( $conn->trxLevel() ) {
-                               // This happens for single-DB setups where DB_REPLICA uses the master DB,
-                               // thus leaving an implicit read-only transaction open at this point. It
-                               // also happens if onTransactionIdle() callbacks leave implicit transactions
-                               // open on *other* DBs (which is slightly improper). Let these COMMIT on the
-                               // next call to commitMasterChanges(), possibly in LBFactory::shutdown().
-                               return;
-                       }
-                       try {
-                               $conn->runOnTransactionIdleCallbacks( $type );
-                       } catch ( Exception $ex ) {
-                               $e = $e ?: $ex;
-                       }
                        try {
                                $conn->runTransactionListenerCallbacks( $type );
                        } catch ( Exception $ex ) {
                                $e = $e ?: $ex;
                        }
                } );
+               $this->trxRoundStage = self::ROUND_CURSORY;
 
                return $e;
        }
@@ -1414,20 +1492,37 @@ class LoadBalancer implements ILoadBalancer {
        public function rollbackMasterChanges( $fname = __METHOD__ ) {
                $restore = ( $this->trxRoundId !== false );
                $this->trxRoundId = false;
-               $this->forEachOpenMasterConnection(
-                       function ( IDatabase $conn ) use ( $fname, $restore ) {
-                               $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
-                               if ( $restore ) {
-                                       $this->undoTransactionRoundFlags( $conn );
-                               }
-                       }
-               );
+               $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+               $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+               } );
+               if ( $restore ) {
+                       // Unmark handles as participating in this explicit transaction round
+                       $this->forEachOpenMasterConnection( function ( Database $conn ) {
+                               $this->undoTransactionRoundFlags( $conn );
+                       } );
+               }
+               $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
        }
 
-       public function suppressTransactionEndCallbacks() {
-               $this->forEachOpenMasterConnection( function ( Database $conn ) {
-                       $conn->setTrxEndCallbackSuppression( true );
-               } );
+       /**
+        * @param string|string[] $stage
+        */
+       private function assertTransactionRoundStage( $stage ) {
+               $stages = (array)$stage;
+
+               if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
+                       $stageList = implode(
+                               '/',
+                               array_map( function ( $v ) {
+                                       return "'$v'";
+                               }, $stages )
+                       );
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
+                       );
+               }
        }
 
        /**
@@ -1437,9 +1532,9 @@ class LoadBalancer implements ILoadBalancer {
         * transaction rounds and remain in auto-commit mode. Such behavior might be desired
         * when a DB server is used for something like simple key/value storage.
         *
-        * @param IDatabase $conn
+        * @param Database $conn
         */
-       private function applyTransactionRoundFlags( IDatabase $conn ) {
+       private function applyTransactionRoundFlags( Database $conn ) {
                if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
                        return; // transaction rounds do not apply to these connections
                }
@@ -1456,9 +1551,9 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        /**
-        * @param IDatabase $conn
+        * @param Database $conn
         */
-       private function undoTransactionRoundFlags( IDatabase $conn ) {
+       private function undoTransactionRoundFlags( Database $conn ) {
                if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
                        return; // transaction rounds do not apply to these connections
                }
@@ -1473,11 +1568,25 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function flushReplicaSnapshots( $fname = __METHOD__ ) {
-               $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) {
-                       $conn->flushSnapshot( __METHOD__ );
+               $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->flushSnapshot( $fname );
                } );
        }
 
+       public function flushMasterSnapshots( $fname = __METHOD__ ) {
+               $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+                       $conn->flushSnapshot( $fname );
+               } );
+       }
+
+       /**
+        * @return string
+        * @since 1.32
+        */
+       public function getTransactionRoundStage() {
+               return $this->trxRoundStage;
+       }
+
        public function hasMasterConnection() {
                return $this->isOpen( $this->getWriterIndex() );
        }