rdbms: merge openConnection into getConnection in LoadBalancer
authorAaron Schulz <aschulz@wikimedia.org>
Mon, 17 Jun 2019 18:16:12 +0000 (19:16 +0100)
committerAaron Schulz <aschulz@wikimedia.org>
Wed, 19 Jun 2019 11:13:16 +0000 (11:13 +0000)
Add error suppression feature to getConnection() itself so that
callers can easily be migrated to it. Deprecate openConnection(),
which is poorly named and had a poor division of responsibility
with getConnection().

Split out some new private methods for readability:
* sanitizeConnectionFlags(), which warns about and strips unusable flags
* enforceConnectionFlags(), which handles some DBO_TRX sanity logic
* lazyLoadReplicationPositions(), which handles the chronology callback

Make sure "waitForPos" is loaded before pickReaderIndex() so that the
optimization of being extra picky about replication lag (in seconds)
actually applies.

Change-Id: I225ed6e6edf39cdf237f2322ea59f35d30f2e01a

includes/libs/rdbms/loadbalancer/ILoadBalancer.php
includes/libs/rdbms/loadbalancer/LoadBalancer.php
includes/libs/rdbms/loadmonitor/LoadMonitor.php

index 0258878..a2202b6 100644 (file)
@@ -86,6 +86,8 @@ interface ILoadBalancer {
 
        /** @var int DB handle should have DBO_TRX disabled and the caller will leave it as such */
        const CONN_TRX_AUTOCOMMIT = 1;
+       /** @var int Return null on connection failure instead of throwing an exception */
+       const CONN_SILENCE_ERRORS = 2;
 
        /** @var string Manager of ILoadBalancer instances is running post-commit callbacks */
        const STAGE_POSTCOMMIT_CALLBACKS = 'stage-postcommit-callbacks';
@@ -148,10 +150,13 @@ interface ILoadBalancer {
        /**
         * Get the index of the reader connection, which may be a replica DB
         *
-        * This takes into account load ratios and lag times. It should
-        * always return a consistent index during a given invocation.
+        * This takes into account load ratios and lag times. It should return a consistent
+        * index during the life time of the load balancer. This initially checks replica DBs
+        * for connectivity to avoid returning an unusable server. This means that connections
+        * might be attempted by calling this method (usally one at the most but possibly more).
+        * Subsequent calls with the same $group will not need to make new connection attempts
+        * since the acquired connection for each group is preserved.
         *
-        * Side effect: opens connections to databases
         * @param string|bool $group Query group, or false for the generic group
         * @param string|bool $domain Domain ID, or false for the current domain
         * @throws DBError
@@ -224,8 +229,8 @@ interface ILoadBalancer {
         *
         * @note This method throws DBAccessError if ILoadBalancer::disable() was called
         *
-        * @throws DBError
-        * @return Database
+        * @throws DBError If any error occurs that prevents the yielding of a (live) IDatabase
+        * @return IDatabase|bool This returns false on failure if CONN_SILENCE_ERRORS is set
         */
        public function getConnection( $i, $groups = [], $domain = false, $flags = 0 );
 
@@ -300,31 +305,6 @@ interface ILoadBalancer {
         */
        public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 );
 
-       /**
-        * Open a connection to the server given by the specified index
-        *
-        * The index must be an actual index into the array. If a connection to the server is
-        * already open and not considered an "in use" foreign connection, this simply returns it.
-        *
-        * Avoid using CONN_TRX_AUTOCOMMIT for databases with ATTR_DB_LEVEL_LOCKING (e.g. sqlite)
-        * in order to avoid deadlocks. ILoadBalancer::getServerAttributes() can be used to check
-        * such flags beforehand.
-        *
-        * If the caller uses $domain or sets CONN_TRX_AUTOCOMMIT in $flags, then it must
-        * also call ILoadBalancer::reuseConnection() on the handle when finished using it.
-        * In all other cases, this is not necessary, though not harmful either.
-        * Avoid the use of begin() or startAtomic() on any such connections.
-        *
-        * @note This method throws DBAccessError if ILoadBalancer::disable() was called
-        *
-        * @param int $i Server index (does not support DB_MASTER/DB_REPLICA)
-        * @param string|bool $domain Domain ID, or false for the current domain
-        * @param int $flags Bitfield of CONN_* class constants (e.g. CONN_TRX_AUTOCOMMIT)
-        * @return Database|bool Returns false on errors
-        * @throws DBAccessError
-        */
-       public function openConnection( $i, $domain = false, $flags = 0 );
-
        /**
         * @return int
         */
index ffb7a34..757e52d 100644 (file)
@@ -107,7 +107,7 @@ class LoadBalancer implements ILoadBalancer {
        private $genericReadIndex = -1;
        /** @var int[] The group replica DB indexes keyed by group */
        private $readIndexByGroup = [];
-       /** @var bool|DBMasterPos False if not set */
+       /** @var bool|DBMasterPos Replication sync position or false if not set */
        private $waitForPos;
        /** @var bool Whether the generic reader fell back to a lagged replica DB */
        private $laggedReplicaMode = false;
@@ -270,6 +270,49 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        /**
+        * @param int $flags
+        * @return bool
+        */
+       private function sanitizeConnectionFlags( $flags ) {
+               if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
+                       // Assuming all servers are of the same type (or similar), which is overwhelmingly
+                       // the case, use the master server information to get the attributes. The information
+                       // for $i cannot be used since it might be DB_REPLICA, which might require connection
+                       // attempts in order to be resolved into a real server index.
+                       $attributes = $this->getServerAttributes( $this->getWriterIndex() );
+                       if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
+                               // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking
+                               // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions
+                               // to reduce lock contention. None of these apply for sqlite and using separate
+                               // connections just causes self-deadlocks.
+                               $flags &= ~self::CONN_TRX_AUTOCOMMIT;
+                               $this->connLogger->info( __METHOD__ .
+                                       ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
+                       }
+               }
+
+               return $flags;
+       }
+
+       /**
+        * @param IDatabase $conn
+        * @param int $flags
+        * @throws DBUnexpectedError
+        */
+       private function enforceConnectionFlags( IDatabase $conn, $flags ) {
+               if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ) {
+                       if ( $conn->trxLevel() ) { // sanity
+                               throw new DBUnexpectedError(
+                                       $conn,
+                                       'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
+                               );
+                       }
+
+                       $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
+               }
+       }
+
+               /**
         * Get a LoadMonitor instance
         *
         * @return ILoadMonitor
@@ -354,7 +397,7 @@ class LoadBalancer implements ILoadBalancer {
         * @param int $i
         * @param array $groups
         * @param string|bool $domain
-        * @return int
+        * @return int The index of a specific server (replica DBs are checked for connectivity)
         */
        private function getConnectionIndex( $i, $groups, $domain ) {
                // Check one "group" per default: the generic pool
@@ -364,9 +407,9 @@ class LoadBalancer implements ILoadBalancer {
                        ? $defaultGroups
                        : (array)$groups;
 
-               if ( $i == self::DB_MASTER ) {
+               if ( $i === self::DB_MASTER ) {
                        $i = $this->getWriterIndex();
-               } elseif ( $i == self::DB_REPLICA ) {
+               } elseif ( $i === self::DB_REPLICA ) {
                        # Try to find an available server in any the query groups (in order)
                        foreach ( $groups as $group ) {
                                $groupIndex = $this->getReaderIndex( $group, $domain );
@@ -378,7 +421,7 @@ class LoadBalancer implements ILoadBalancer {
                }
 
                # Operation-based index
-               if ( $i == self::DB_REPLICA ) {
+               if ( $i === self::DB_REPLICA ) {
                        $this->lastError = 'Unknown error'; // reset error string
                        # Try the general server pool if $groups are unavailable.
                        $i = ( $groups === [ false ] )
@@ -389,7 +432,7 @@ class LoadBalancer implements ILoadBalancer {
                                $this->lastError = 'No working replica DB server: ' . $this->lastError;
                                // Throw an exception
                                $this->reportConnectionError();
-                               return null; // not reached
+                               return null; // unreachable due to exception
                        }
                }
 
@@ -427,6 +470,7 @@ class LoadBalancer implements ILoadBalancer {
                $this->getLoadMonitor()->scaleLoads( $loads, $domain );
 
                // Pick a server to use, accounting for weights, load, lag, and "waitForPos"
+               $this->lazyLoadReplicationPositions(); // optimizes server candidate selection
                list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
                if ( $i === false ) {
                        // DB connection unsuccessful
@@ -510,6 +554,7 @@ class LoadBalancer implements ILoadBalancer {
                        } else {
                                $i = false;
                                if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
+                                       $this->replLogger->debug( __METHOD__ . ": replication positions detected" );
                                        // "chronologyCallback" sets "waitForPos" for session consistency.
                                        // This triggers doWait() after connect, so it's especially good to
                                        // avoid lagged servers so as to avoid excessive delay in that method.
@@ -542,7 +587,7 @@ class LoadBalancer implements ILoadBalancer {
                        $serverName = $this->getServerName( $i );
                        $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
 
-                       $conn = $this->openConnection( $i, $domain );
+                       $conn = $this->getConnection( $i, [], $domain, self::CONN_SILENCE_ERRORS );
                        if ( !$conn ) {
                                $this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
                                unset( $currentLoads[$i] ); // avoid this server next iteration
@@ -714,20 +759,20 @@ class LoadBalancer implements ILoadBalancer {
                                );
 
                                return false;
-                       } else {
-                               $conn = $this->openConnection( $index, self::DOMAIN_ANY );
-                               if ( !$conn ) {
-                                       $this->replLogger->warning(
-                                               __METHOD__ . ': failed to connect to {dbserver}',
-                                               [ 'dbserver' => $server ]
-                                       );
+                       }
+                       // Open a temporary new connection in order to wait for replication
+                       $conn = $this->getConnection( $index, [], self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
+                       if ( !$conn ) {
+                               $this->replLogger->warning(
+                                       __METHOD__ . ': failed to connect to {dbserver}',
+                                       [ 'dbserver' => $server ]
+                               );
 
-                                       return false;
-                               }
-                               // Avoid connection spam in waitForAll() when connections
-                               // are made just for the sake of doing this lag check.
-                               $close = true;
+                               return false;
                        }
+                       // Avoid connection spam in waitForAll() when connections
+                       // are made just for the sake of doing this lag check.
+                       $close = true;
                }
 
                $this->replLogger->info(
@@ -773,39 +818,32 @@ class LoadBalancer implements ILoadBalancer {
        }
 
        public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
-               if ( $i === null || $i === false ) {
+               if ( !is_int( $i ) ) {
                        throw new InvalidArgumentException( "Cannot connect without a server index" );
+               } elseif ( $groups && $i > 0 ) {
+                       throw new InvalidArgumentException( "Got query groups with server index #$i" );
                }
 
                $domain = $this->resolveDomainID( $domain );
+               $flags = $this->sanitizeConnectionFlags( $flags );
                $masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
 
-               if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
-                       // Assuming all servers are of the same type (or similar), which is overwhelmingly
-                       // the case, use the master server information to get the attributes. The information
-                       // for $i cannot be used since it might be DB_REPLICA, which might require connection
-                       // attempts in order to be resolved into a real server index.
-                       $attributes = $this->getServerAttributes( $this->getWriterIndex() );
-                       if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
-                               // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking
-                               // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions
-                               // to reduce lock contention. None of these apply for sqlite and using separate
-                               // connections just causes self-deadlocks.
-                               $flags &= ~self::CONN_TRX_AUTOCOMMIT;
-                               $this->connLogger->info( __METHOD__ .
-                                       ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
-                       }
-               }
-
                // Number of connections made before getting the server index and handle
                $priorConnectionsMade = $this->connsOpened;
-               // Decide which server to use (might trigger new connections)
+
+               // Choose a server if $i is DB_MASTER/DB_REPLICA (might trigger new connections)
                $serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
                // Get an open connection to that server (might trigger a new connection)
-               $conn = $this->openConnection( $serverIndex, $domain, $flags );
-               if ( !$conn ) {
-                       $this->reportConnectionError();
-                       return null; // unreachable due to exception
+               $conn = $this->localDomain->equals( $domain )
+                       ? $this->getLocalConnection( $serverIndex, $flags )
+                       : $this->getForeignConnection( $serverIndex, $domain, $flags );
+               // Throw an error or bail out if the connection attempt failed
+               if ( !( $conn instanceof IDatabase ) ) {
+                       if ( ( $flags & self::CONN_SILENCE_ERRORS ) != self::CONN_SILENCE_ERRORS ) {
+                               $this->reportConnectionError();
+                       }
+
+                       return false;
                }
 
                // Profile any new connections caused by this method
@@ -815,6 +853,15 @@ class LoadBalancer implements ILoadBalancer {
                        $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
                }
 
+               if ( !$conn->isOpen() ) {
+                       // Connection was made but later unrecoverably lost for some reason.
+                       // Do not return a handle that will just throw exceptions on use,
+                       // but let the calling code (e.g. getReaderIndex) try another server.
+                       $this->errorConnection = $conn;
+                       return false;
+               }
+
+               $this->enforceConnectionFlags( $conn, $flags );
                if ( $serverIndex == $this->getWriterIndex() ) {
                        // If the load balancer is read-only, perhaps due to replication lag, then master
                        // DB handles will reflect that. Note that Database::assertIsWritableMaster() takes
@@ -917,43 +964,15 @@ class LoadBalancer implements ILoadBalancer {
                        : self::DB_REPLICA;
        }
 
+       /**
+        * @param int $i
+        * @param bool $domain
+        * @param int $flags
+        * @return Database|bool Live database handle or false on failure
+        * @deprecated Since 1.34 Use getConnection() instead
+        */
        public function openConnection( $i, $domain = false, $flags = 0 ) {
-               $domain = $this->resolveDomainID( $domain );
-
-               if ( !$this->connectionAttempted && $this->chronologyCallback ) {
-                       // Load any "waitFor" positions before connecting so that doWait() is triggered
-                       $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
-                       $this->connectionAttempted = true;
-                       ( $this->chronologyCallback )( $this );
-               }
-
-               $conn = $this->localDomain->equals( $domain )
-                       ? $this->openLocalConnection( $i, $flags )
-                       : $this->openForeignConnection( $i, $domain, $flags );
-
-               if ( $conn instanceof IDatabase && !$conn->isOpen() ) {
-                       // Connection was made but later unrecoverably lost for some reason.
-                       // Do not return a handle that will just throw exceptions on use,
-                       // but let the calling code (e.g. getReaderIndex) try another server.
-                       $this->errorConnection = $conn;
-                       $conn = false;
-               }
-
-               if (
-                       $conn instanceof IDatabase &&
-                       ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT )
-               ) {
-                       if ( $conn->trxLevel() ) { // sanity
-                               throw new DBUnexpectedError(
-                                       $conn,
-                                       'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
-                               );
-                       }
-
-                       $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
-               }
-
-               return $conn;
+               return $this->getConnection( $i, [], $domain, $flags | self::CONN_SILENCE_ERRORS );
        }
 
        /**
@@ -968,7 +987,7 @@ class LoadBalancer implements ILoadBalancer {
         * @param int $flags Class CONN_* constant bitfield
         * @return Database
         */
-       private function openLocalConnection( $i, $flags = 0 ) {
+       private function getLocalConnection( $i, $flags = 0 ) {
                // Connection handles required to be in auto-commit mode use a separate connection
                // pool since the main pool is effected by implicit and explicit transaction rounds
                $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
@@ -1033,7 +1052,7 @@ class LoadBalancer implements ILoadBalancer {
         * @return Database|bool Returns false on connection error
         * @throws DBError When database selection fails
         */
-       private function openForeignConnection( $i, $domain, $flags = 0 ) {
+       private function getForeignConnection( $i, $domain, $flags = 0 ) {
                $domainInstance = DatabaseDomain::newFromId( $domain );
                // Connection handles required to be in auto-commit mode use a separate connection
                // pool since the main pool is effected by implicit and explicit transaction rounds
@@ -1235,9 +1254,22 @@ class LoadBalancer implements ILoadBalancer {
                        }
                }
 
+               $this->lazyLoadReplicationPositions(); // session consistency
+
                return $db;
        }
 
+       /**
+        * Make sure that any "waitForPos" positions are loaded and available to doWait()
+        */
+       private function lazyLoadReplicationPositions() {
+               if ( !$this->connectionAttempted && $this->chronologyCallback ) {
+                       $this->connectionAttempted = true;
+                       ( $this->chronologyCallback )( $this ); // generally calls waitFor()
+                       $this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
+               }
+       }
+
        /**
         * @throws DBConnectionError
         */
@@ -1944,11 +1976,12 @@ class LoadBalancer implements ILoadBalancer {
 
                if ( !$pos ) {
                        // Get the current master position, opening a connection if needed
-                       $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
+                       $index = $this->getWriterIndex();
+                       $masterConn = $this->getAnyOpenConnection( $index );
                        if ( $masterConn ) {
                                $pos = $masterConn->getMasterPos();
                        } else {
-                               $masterConn = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY );
+                               $masterConn = $this->getConnection( $index, [], self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
                                if ( !$masterConn ) {
                                        throw new DBReplicationWaitError(
                                                null,
index 180baed..aa1e9b2 100644 (file)
@@ -154,12 +154,12 @@ class LoadMonitor implements ILoadMonitor {
 
                        # Handles with open transactions are avoided since they might be subject
                        # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
-                       $flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT;
+                       $flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
                        $conn = $this->parent->getAnyOpenConnection( $i, $flags );
                        if ( $conn ) {
                                $close = false; // already open
                        } else {
-                               $conn = $this->parent->openConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
+                               $conn = $this->parent->getConnection( $i, [], ILoadBalancer::DOMAIN_ANY, $flags );
                                $close = true; // new connection
                        }