rdbms: avoid LoadBalancer::getConnection waste when using $groups
[lhc/web/wiklou.git] / includes / libs / rdbms / loadbalancer / LoadBalancer.php
index d78f0d7..ffb7a34 100644 (file)
@@ -73,7 +73,7 @@ class LoadBalancer implements ILoadBalancer {
        /** @var array[] Map of (server index => server config array) */
        private $servers;
        /** @var float[] Map of (server index => weight) */
-       private $loads;
+       private $genericLoads;
        /** @var array[] Map of (group => server index => weight) */
        private $groupLoads;
        /** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
@@ -103,8 +103,10 @@ class LoadBalancer implements ILoadBalancer {
 
        /** @var Database DB connection object that caused a problem */
        private $errorConnection;
-       /** @var int The generic (not query grouped) replica DB index (of $mServers) */
-       private $readIndex;
+       /** @var int The generic (not query grouped) replica DB index */
+       private $genericReadIndex = -1;
+       /** @var int[] The group replica DB indexes keyed by group */
+       private $readIndexByGroup = [];
        /** @var bool|DBMasterPos False if not set */
        private $waitForPos;
        /** @var bool Whether the generic reader fell back to a lagged replica DB */
@@ -122,6 +124,8 @@ class LoadBalancer implements ILoadBalancer {
        /** @var bool Whether any connection has been attempted yet */
        private $connectionAttempted = false;
 
+       /** @var int|null An integer ID of the managing LBFactory instance or null */
+       private $ownerId;
        /** @var string|bool String if a requested DBO_TRX transaction round is active */
        private $trxRoundId = false;
        /** @var string Stage of the current transaction round in the transaction round life-cycle */
@@ -181,7 +185,6 @@ class LoadBalancer implements ILoadBalancer {
 
                $this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
 
-               $this->readIndex = -1;
                $this->conns = [
                        // Connection were transaction rounds may be applied
                        self::KEY_LOCAL => [],
@@ -192,7 +195,7 @@ class LoadBalancer implements ILoadBalancer {
                        self::KEY_FOREIGN_INUSE_NOROUND => [],
                        self::KEY_FOREIGN_FREE_NOROUND => []
                ];
-               $this->loads = [];
+               $this->genericLoads = [];
                $this->waitForPos = false;
                $this->allowLagged = false;
 
@@ -206,7 +209,7 @@ class LoadBalancer implements ILoadBalancer {
                $this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
 
                foreach ( $params['servers'] as $i => $server ) {
-                       $this->loads[$i] = $server['load'];
+                       $this->genericLoads[$i] = $server['load'];
                        if ( isset( $server['groupLoads'] ) ) {
                                foreach ( $server['groupLoads'] as $group => $ratio ) {
                                        if ( !isset( $this->groupLoads[$group] ) ) {
@@ -250,6 +253,7 @@ class LoadBalancer implements ILoadBalancer {
                }
 
                $this->defaultGroup = $params['defaultGroup'] ?? null;
+               $this->ownerId = $params['ownerId'] ?? null;
        }
 
        public function getLocalDomainID() {
@@ -396,9 +400,12 @@ class LoadBalancer implements ILoadBalancer {
                if ( count( $this->servers ) == 1 ) {
                        // Skip the load balancing if there's only one server
                        return $this->getWriterIndex();
-               } elseif ( $group === false && $this->readIndex >= 0 ) {
-                       // Shortcut if the generic reader index was already cached
-                       return $this->readIndex;
+               }
+
+               $index = $this->getExistingReaderIndex( $group );
+               if ( $index >= 0 ) {
+                       // A reader index was already selected and "waitForPos" was handled
+                       return $index;
                }
 
                if ( $group !== false ) {
@@ -413,7 +420,7 @@ class LoadBalancer implements ILoadBalancer {
                        }
                } else {
                        // Use the generic load group
-                       $loads = $this->loads;
+                       $loads = $this->genericLoads;
                }
 
                // Scale the configured load ratios according to each server's load and state
@@ -422,27 +429,22 @@ class LoadBalancer implements ILoadBalancer {
                // Pick a server to use, accounting for weights, load, lag, and "waitForPos"
                list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
                if ( $i === false ) {
-                       // Replica DB connection unsuccessful
+                       // DB connection unsuccessful
                        return false;
                }
 
-               if ( $this->waitForPos && $i != $this->getWriterIndex() ) {
-                       // Before any data queries are run, wait for the server to catch up to the
-                       // specified position. This is used to improve session consistency. Note that
-                       // when LoadBalancer::waitFor() sets "waitForPos", the waiting triggers here,
-                       // so update laggedReplicaMode as needed for consistency.
-                       if ( !$this->doWait( $i ) ) {
-                               $laggedReplicaMode = true;
-                       }
+               // If data seen by queries is expected to reflect the transactions committed as of
+               // or after a given replication position then wait for the DB to apply those changes
+               if ( $this->waitForPos && $i != $this->getWriterIndex() && !$this->doWait( $i ) ) {
+                       // Data will be outdated compared to what was expected
+                       $laggedReplicaMode = true;
                }
 
-               if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group === false ) {
-                       // Cache the generic reader index for future ungrouped DB_REPLICA handles
-                       $this->readIndex = $i;
-                       // Record if the generic reader index is in "lagged replica DB" mode
-                       if ( $laggedReplicaMode ) {
-                               $this->laggedReplicaMode = true;
-                       }
+               // Cache the reader index for future DB_REPLICA handles
+               $this->setExistingReaderIndex( $group, $i );
+               // Record whether the generic reader index is in "lagged replica DB" mode
+               if ( $group === false && $laggedReplicaMode ) {
+                       $this->laggedReplicaMode = true;
                }
 
                $serverName = $this->getServerName( $i );
@@ -451,6 +453,40 @@ class LoadBalancer implements ILoadBalancer {
                return $i;
        }
 
+       /**
+        * Get the server index chosen by the load balancer for use with the given query group
+        *
+        * @param string|bool $group Query group; use false for the generic group
+        * @return int Server index or -1 if none was chosen
+        */
+       protected function getExistingReaderIndex( $group ) {
+               if ( $group === false ) {
+                       $index = $this->genericReadIndex;
+               } else {
+                       $index = $this->readIndexByGroup[$group] ?? -1;
+               }
+
+               return $index;
+       }
+
+       /**
+        * Set the server index chosen by the load balancer for use with the given query group
+        *
+        * @param string|bool $group Query group; use false for the generic group
+        * @param int $index The index of a specific server
+        */
+       private function setExistingReaderIndex( $group, $index ) {
+               if ( $index < 0 ) {
+                       throw new UnexpectedValueException( "Cannot set a negative read server index" );
+               }
+
+               if ( $group === false ) {
+                       $this->genericReadIndex = $index;
+               } else {
+                       $this->readIndexByGroup[$group] = $index;
+               }
+       }
+
        /**
         * @param array $loads List of server weights
         * @param string|bool $domain
@@ -537,10 +573,10 @@ class LoadBalancer implements ILoadBalancer {
                try {
                        $this->waitForPos = $pos;
                        // If a generic reader connection was already established, then wait now
-                       $i = $this->readIndex;
-                       if ( ( $i > 0 ) && !$this->doWait( $i ) ) {
+                       if ( $this->genericReadIndex > 0 && !$this->doWait( $this->genericReadIndex ) ) {
                                $this->laggedReplicaMode = true;
                        }
+                       // Otherwise, wait until a connection is established in getReaderIndex()
                } finally {
                        // Restore the older position if it was higher since this is used for lag-protection
                        $this->setWaitForPositionIfHigher( $oldPos );
@@ -552,10 +588,10 @@ class LoadBalancer implements ILoadBalancer {
                try {
                        $this->waitForPos = $pos;
 
-                       $i = $this->readIndex;
+                       $i = $this->genericReadIndex;
                        if ( $i <= 0 ) {
                                // Pick a generic replica DB if there isn't one yet
-                               $readLoads = $this->loads;
+                               $readLoads = $this->genericLoads;
                                unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
                                $readLoads = array_filter( $readLoads ); // with non-zero load
                                $i = ArrayUtils::pickRandom( $readLoads );
@@ -584,7 +620,7 @@ class LoadBalancer implements ILoadBalancer {
 
                        $ok = true;
                        for ( $i = 1; $i < $serverCount; $i++ ) {
-                               if ( $this->loads[$i] > 0 ) {
+                               if ( $this->genericLoads[$i] > 0 ) {
                                        $start = microtime( true );
                                        $ok = $this->doWait( $i, true, $timeout ) && $ok;
                                        $timeout -= intval( microtime( true ) - $start );
@@ -779,7 +815,7 @@ class LoadBalancer implements ILoadBalancer {
                        $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
                }
 
-               if ( $masterOnly ) {
+               if ( $serverIndex == $this->getWriterIndex() ) {
                        // If the load balancer is read-only, perhaps due to replication lag, then master
                        // DB handles will reflect that. Note that Database::assertIsWritableMaster() takes
                        // care of replica DB handles whereas getReadOnlyReason() would cause infinite loops.
@@ -1242,7 +1278,7 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function isNonZeroLoad( $i ) {
-               return array_key_exists( $i, $this->servers ) && $this->loads[$i] != 0;
+               return array_key_exists( $i, $this->servers ) && $this->genericLoads[$i] != 0;
        }
 
        public function getServerCount() {
@@ -1334,13 +1370,14 @@ class LoadBalancer implements ILoadBalancer {
                $conn->close();
        }
 
-       public function commitAll( $fname = __METHOD__ ) {
-               $this->commitMasterChanges( $fname );
+       public function commitAll( $fname = __METHOD__, $owner = null ) {
+               $this->commitMasterChanges( $fname, $owner );
                $this->flushMasterSnapshots( $fname );
                $this->flushReplicaSnapshots( $fname );
        }
 
-       public function finalizeMasterChanges() {
+       public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
+               $this->assertOwnership( $fname, $owner );
                $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
 
                $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
@@ -1364,7 +1401,8 @@ class LoadBalancer implements ILoadBalancer {
                return $total;
        }
 
-       public function approveMasterChanges( array $options ) {
+       public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
+               $this->assertOwnership( $fname, $owner );
                $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
 
                $limit = $options['maxWriteDuration'] ?? 0;
@@ -1397,7 +1435,8 @@ class LoadBalancer implements ILoadBalancer {
                $this->trxRoundStage = self::ROUND_APPROVED;
        }
 
-       public function beginMasterChanges( $fname = __METHOD__ ) {
+       public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
+               $this->assertOwnership( $fname, $owner );
                if ( $this->trxRoundId !== false ) {
                        throw new DBTransactionError(
                                null,
@@ -1421,7 +1460,8 @@ class LoadBalancer implements ILoadBalancer {
                $this->trxRoundStage = self::ROUND_CURSORY;
        }
 
-       public function commitMasterChanges( $fname = __METHOD__ ) {
+       public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
+               $this->assertOwnership( $fname, $owner );
                $this->assertTransactionRoundStage( self::ROUND_APPROVED );
 
                $failures = [];
@@ -1459,7 +1499,8 @@ class LoadBalancer implements ILoadBalancer {
                $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
        }
 
-       public function runMasterTransactionIdleCallbacks() {
+       public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
+               $this->assertOwnership( $fname, $owner );
                if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
                        $type = IDatabase::TRIGGER_COMMIT;
                } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
@@ -1528,7 +1569,8 @@ class LoadBalancer implements ILoadBalancer {
                return $e;
        }
 
-       public function runMasterTransactionListenerCallbacks() {
+       public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
+               $this->assertOwnership( $fname, $owner );
                if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
                        $type = IDatabase::TRIGGER_COMMIT;
                } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
@@ -1555,7 +1597,9 @@ class LoadBalancer implements ILoadBalancer {
                return $e;
        }
 
-       public function rollbackMasterChanges( $fname = __METHOD__ ) {
+       public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
+               $this->assertOwnership( $fname, $owner );
+
                $restore = ( $this->trxRoundId !== false );
                $this->trxRoundId = false;
                $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
@@ -1573,6 +1617,7 @@ class LoadBalancer implements ILoadBalancer {
 
        /**
         * @param string|string[] $stage
+        * @throws DBTransactionError
         */
        private function assertTransactionRoundStage( $stage ) {
                $stages = (array)$stage;
@@ -1591,6 +1636,20 @@ class LoadBalancer implements ILoadBalancer {
                }
        }
 
+       /**
+        * @param string $fname
+        * @param int|null $owner Owner ID of the caller
+        * @throws DBTransactionError
+        */
+       private function assertOwnership( $fname, $owner ) {
+               if ( $this->ownerId !== null && $owner !== $this->ownerId ) {
+                       throw new DBTransactionError(
+                               null,
+                               "$fname: LoadBalancer is owned by LBFactory #{$this->ownerId} (got '$owner')."
+                       );
+               }
+       }
+
        /**
         * Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round
         *
@@ -1695,13 +1754,14 @@ class LoadBalancer implements ILoadBalancer {
                if (
                        // Avoid recursion if there is only one DB
                        $this->getServerCount() > 1 &&
+                       // Avoid recursion if the (non-zero load) master DB was picked for generic reads
+                       $this->genericReadIndex !== $this->getWriterIndex() &&
                        // Stay in lagged replica mode during the load balancer instance lifetime
                        !$this->laggedReplicaMode
                ) {
                        try {
-                               // See if laggedReplicaMode gets set
-                               $conn = $this->getConnection( self::DB_REPLICA, false, $domain );
-                               $this->reuseConnection( $conn );
+                               // Calling this method will set "laggedReplicaMode" as needed
+                               $this->getReaderIndex( false, $domain );
                        } catch ( DBConnectionError $e ) {
                                // Avoid expensive re-connect attempts and failures
                                $this->allReplicasDownMode = true;
@@ -1839,7 +1899,7 @@ class LoadBalancer implements ILoadBalancer {
 
                $lagTimes = $this->getLagTimes( $domain );
                foreach ( $lagTimes as $i => $lag ) {
-                       if ( $this->loads[$i] > 0 && $lag > $maxLag ) {
+                       if ( $this->genericLoads[$i] > 0 && $lag > $maxLag ) {
                                $maxLag = $lag;
                                $host = $this->servers[$i]['host'];
                                $maxIndex = $i;