Add more estimation modes to pendingWriteQueryDuration()
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 27 Aug 2016 10:57:26 +0000 (03:57 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 30 Aug 2016 01:36:17 +0000 (18:36 -0700)
* Use this to exclude some common cases of harmless queries that
  happen to block on row-level locks for a long time. This does
  not apply to UPDATE/DELETE however, due to the ambiguity of
  time spent scanning vs locking.
* Update commitMasterChanges() and JobRunner to use the new
  mode to avoid pointless rollback or lag checks.

Change-Id: Ifc2743f2d8cd109840c45cda5028fbb4df55d231

includes/db/DBConnRef.php
includes/db/Database.php
includes/db/IDatabase.php
includes/db/loadbalancer/LoadBalancer.php
includes/jobqueue/JobRunner.php

index 790a073..86d40f4 100644 (file)
@@ -103,7 +103,7 @@ class DBConnRef implements IDatabase {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
-       public function pendingWriteQueryDuration() {
+       public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
@@ -477,8 +477,10 @@ class DBConnRef implements IDatabase {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
 
-       public function ping() {
-               return $this->__call( __FUNCTION__, func_get_args() );
+       public function ping( &$rtt = null ) {
+               return func_num_args()
+                       ? $this->__call( __FUNCTION__, [ &$rtt ] )
+                       : $this->__call( __FUNCTION__, [] ); // method cares about null vs missing
        }
 
        public function getLag() {
index 874e9c4..1adf73d 100644 (file)
@@ -39,6 +39,11 @@ abstract class DatabaseBase implements IDatabase {
 
        /** How long before it is worth doing a dummy query to test the connection */
        const PING_TTL = 1.0;
+       const PING_QUERY = 'SELECT 1 AS ping';
+
+       const TINY_WRITE_SEC = .010;
+       const SLOW_WRITE_SEC = .500;
+       const SMALL_WRITE_ROWS = 100;
 
        /** @var string SQL query */
        protected $mLastQuery = '';
@@ -102,7 +107,6 @@ abstract class DatabaseBase implements IDatabase {
         * @var int
         */
        protected $mTrxLevel = 0;
-
        /**
         * Either a short hexidecimal string if a transaction is active or ""
         *
@@ -110,7 +114,6 @@ abstract class DatabaseBase implements IDatabase {
         * @see DatabaseBase::mTrxLevel
         */
        protected $mTrxShortId = '';
-
        /**
         * The UNIX time that the transaction started. Callers can assume that if
         * snapshot isolation is used, then the data is *at least* up to date to that
@@ -120,10 +123,8 @@ abstract class DatabaseBase implements IDatabase {
         * @see DatabaseBase::mTrxLevel
         */
        private $mTrxTimestamp = null;
-
        /** @var float Lag estimate at the time of BEGIN */
        private $mTrxSlaveLag = null;
-
        /**
         * Remembers the function name given for starting the most recent transaction via begin().
         * Used to provide additional context for error reporting.
@@ -132,7 +133,6 @@ abstract class DatabaseBase implements IDatabase {
         * @see DatabaseBase::mTrxLevel
         */
        private $mTrxFname = null;
-
        /**
         * Record if possible write queries were done in the last transaction started
         *
@@ -140,7 +140,6 @@ abstract class DatabaseBase implements IDatabase {
         * @see DatabaseBase::mTrxLevel
         */
        private $mTrxDoneWrites = false;
-
        /**
         * Record if the current transaction was started implicitly due to DBO_TRX being set.
         *
@@ -148,34 +147,44 @@ abstract class DatabaseBase implements IDatabase {
         * @see DatabaseBase::mTrxLevel
         */
        private $mTrxAutomatic = false;
-
        /**
         * Array of levels of atomicity within transactions
         *
         * @var array
         */
        private $mTrxAtomicLevels = [];
-
        /**
         * Record if the current transaction was started implicitly by DatabaseBase::startAtomic
         *
         * @var bool
         */
        private $mTrxAutomaticAtomic = false;
-
        /**
         * Track the write query callers of the current transaction
         *
         * @var string[]
         */
        private $mTrxWriteCallers = [];
-
        /**
-        * Track the seconds spent in write queries for the current transaction
-        *
-        * @var float
+        * @var float Seconds spent in write queries for the current transaction
         */
        private $mTrxWriteDuration = 0.0;
+       /**
+        * @var integer Number of write queries for the current transaction
+        */
+       private $mTrxWriteQueryCount = 0;
+       /**
+        * @var float Like mTrxWriteQueryCount but excludes lock-bound, easy to replicate, queries
+        */
+       private $mTrxWriteAdjDuration = 0.0;
+       /**
+        * @var integer Number of write queries counted in mTrxWriteAdjDuration
+        */
+       private $mTrxWriteAdjQueryCount = 0;
+       /**
+        * @var float RTT time estimate
+        */
+       private $mRTTEstimate = 0.0;
 
        /** @var array Map of (name => 1) for locks obtained via lock() */
        private $mNamedLocksHeld = [];
@@ -419,8 +428,26 @@ abstract class DatabaseBase implements IDatabase {
                );
        }
 
-       public function pendingWriteQueryDuration() {
-               return $this->mTrxLevel ? $this->mTrxWriteDuration : false;
+       public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
+               if ( !$this->mTrxLevel ) {
+                       return false;
+               } elseif ( !$this->mTrxDoneWrites ) {
+                       return 0.0;
+               }
+
+               switch ( $type ) {
+                       case self::ESTIMATE_DB_APPLY:
+                               $this->ping( $rtt );
+                               $rttAdjTotal = $this->mTrxWriteAdjQueryCount * $rtt;
+                               $applyTime = max( $this->mTrxWriteAdjDuration - $rttAdjTotal, 0 );
+                               // For omitted queries, make them count as something at least
+                               $omitted = $this->mTrxWriteQueryCount - $this->mTrxWriteAdjQueryCount;
+                               $applyTime += self::TINY_WRITE_SEC * $omitted;
+
+                               return $applyTime;
+                       default: // everything
+                               return $this->mTrxWriteDuration;
+               }
        }
 
        public function pendingWriteCallers() {
@@ -808,7 +835,16 @@ abstract class DatabaseBase implements IDatabase {
         * @return bool
         */
        protected function isWriteQuery( $sql ) {
-               return !preg_match( '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql );
+               return !preg_match(
+                       '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql );
+       }
+
+       /**
+        * @param $sql
+        * @return string|null
+        */
+       protected function getQueryVerb( $sql ) {
+               return preg_match( '/^\s*([a-z]+)/i', $sql, $m ) ? strtoupper( $m[1] ) : null;
        }
 
        /**
@@ -821,8 +857,8 @@ abstract class DatabaseBase implements IDatabase {
         * @return bool
         */
        protected function isTransactableQuery( $sql ) {
-               $verb = substr( $sql, 0, strcspn( $sql, " \t\r\n" ) );
-               return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ] );
+               $verb = $this->getQueryVerb( $sql );
+               return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ], true );
        }
 
        public function query( $sql, $fname = __METHOD__, $tempIgnore = false ) {
@@ -945,18 +981,22 @@ abstract class DatabaseBase implements IDatabase {
                $this->profiler->profileIn( $queryProf );
                $ret = $this->doQuery( $commentedSql );
                $this->profiler->profileOut( $queryProf );
-               $queryRuntime = microtime( true ) - $startTime;
+               $queryRuntime = max( microtime( true ) - $startTime, 0.0 );
 
                unset( $queryProfSection ); // profile out (if set)
 
                if ( $ret !== false ) {
                        $this->lastPing = $startTime;
                        if ( $isWrite && $this->mTrxLevel ) {
-                               $this->mTrxWriteDuration += $queryRuntime;
+                               $this->updateTrxWriteQueryTime( $sql, $queryRuntime );
                                $this->mTrxWriteCallers[] = $fname;
                        }
                }
 
+               if ( $sql === self::PING_QUERY ) {
+                       $this->mRTTEstimate = $queryRuntime;
+               }
+
                $this->getTransactionProfiler()->recordQueryCompletion(
                        $queryProf, $startTime, $isWrite, $this->affectedRows()
                );
@@ -965,6 +1005,37 @@ abstract class DatabaseBase implements IDatabase {
                return $ret;
        }
 
+       /**
+        * Update the estimated run-time of a query, not counting large row lock times
+        *
+        * LoadBalancer can be set to rollback transactions that will create huge replication
+        * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
+        * queries, like inserting a row can take a long time due to row locking. This method
+        * uses some simple heuristics to discount those cases.
+        *
+        * @param string $sql
+        * @param float $runtime Total runtime, including RTT
+        */
+       private function updateTrxWriteQueryTime( $sql, $runtime ) {
+               $indicativeOfSlaveRuntime = true;
+               if ( $runtime > self::SLOW_WRITE_SEC ) {
+                       $verb = $this->getQueryVerb( $sql );
+                       // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
+                       if ( $verb === 'INSERT' ) {
+                               $indicativeOfSlaveRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS;
+                       } elseif ( $verb === 'REPLACE' ) {
+                               $indicativeOfSlaveRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS / 2;
+                       }
+               }
+
+               $this->mTrxWriteDuration += $runtime;
+               $this->mTrxWriteQueryCount += 1;
+               if ( $indicativeOfSlaveRuntime ) {
+                       $this->mTrxWriteAdjDuration += $runtime;
+                       $this->mTrxWriteAdjQueryCount += 1;
+               }
+       }
+
        private function canRecoverFromDisconnect( $sql, $priorWritesPending ) {
                # Transaction dropped; this can mean lost writes, or REPEATABLE-READ snapshots.
                # Dropped connections also mean that named locks are automatically released.
@@ -2739,6 +2810,9 @@ abstract class DatabaseBase implements IDatabase {
                $this->mTrxAtomicLevels = [];
                $this->mTrxShortId = wfRandomString( 12 );
                $this->mTrxWriteDuration = 0.0;
+               $this->mTrxWriteQueryCount = 0;
+               $this->mTrxWriteAdjDuration = 0.0;
+               $this->mTrxWriteAdjQueryCount = 0;
                $this->mTrxWriteCallers = [];
                // First SELECT after BEGIN will establish the snapshot in REPEATABLE-READ.
                // Get an estimate of the slave lag before then, treating estimate staleness
@@ -2967,17 +3041,24 @@ abstract class DatabaseBase implements IDatabase {
                }
        }
 
-       public function ping() {
+       public function ping( &$rtt = null ) {
+               // Avoid hitting the server if it was hit recently
                if ( $this->isOpen() && ( microtime( true ) - $this->lastPing ) < self::PING_TTL ) {
-                       return true;
+                       if ( !func_num_args() || $this->mRTTEstimate > 0 ) {
+                               $rtt = $this->mRTTEstimate;
+                               return true; // don't care about $rtt
+                       }
                }
 
-               $ignoreErrors = true;
-               $this->clearFlag( DBO_TRX, self::REMEMBER_PRIOR );
                // This will reconnect if possible or return false if not
-               $ok = (bool)$this->query( "SELECT 1 AS ping", __METHOD__, $ignoreErrors );
+               $this->clearFlag( DBO_TRX, self::REMEMBER_PRIOR );
+               $ok = ( $this->query( self::PING_QUERY, __METHOD__, true ) !== false );
                $this->restoreFlags( self::RESTORE_PRIOR );
 
+               if ( $ok ) {
+                       $rtt = $this->mRTTEstimate;
+               }
+
                return $ok;
        }
 
index b689e82..5c632ca 100644 (file)
@@ -59,6 +59,11 @@ interface IDatabase {
        /** @var string Restore to the initial flag state */
        const RESTORE_INITIAL = 'initial';
 
+       /** @var string Estimate total time (RTT, scanning, waiting on locks, applying) */
+       const ESTIMATE_TOTAL = 'total';
+       /** @var string Estimate time to apply (scanning, applying) */
+       const ESTIMATE_DB_APPLY = 'apply';
+
        /**
         * A string describing the current software version, and possibly
         * other details in a user-friendly way. Will be listed on Special:Version, etc.
@@ -210,10 +215,11 @@ interface IDatabase {
         *
         * High times could be due to scanning, updates, locking, and such
         *
+        * @param string $type IDatabase::ESTIMATE_* constant [default: ESTIMATE_ALL]
         * @return float|bool Returns false if not transaction is active
         * @since 1.26
         */
-       public function pendingWriteQueryDuration();
+       public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL );
 
        /**
         * Get the list of method names that did write queries for this transaction
@@ -1485,9 +1491,10 @@ interface IDatabase {
        /**
         * Ping the server and try to reconnect if it there is no connection
         *
+        * @param float|null &$rtt Value to store the estimated RTT [optional]
         * @return bool Success or failure
         */
-       public function ping();
+       public function ping( &$rtt = null );
 
        /**
         * Get slave lag. Currently supported only by MySQL.
index 65cd3b3..32729dd 100644 (file)
@@ -1101,7 +1101,7 @@ class LoadBalancer {
                        }
                        // Assert that the time to replicate the transaction will be sane.
                        // If this fails, then all DB transactions will be rollback back together.
-                       $time = $conn->pendingWriteQueryDuration();
+                       $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
                        if ( $limit > 0 && $time > $limit ) {
                                throw new DBTransactionError(
                                        $conn,
index 5f48dca..f25c1ba 100644 (file)
@@ -503,16 +503,21 @@ class JobRunner implements LoggerAwareInterface {
                if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
                        // Generally, there is one master connection to the local DB
                        $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+                       // We need natively blocking fast locks
+                       if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
+                               $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
+                               if ( $time < $wgJobSerialCommitThreshold ) {
+                                       $dbwSerial = false;
+                               }
+                       } else {
+                               $dbwSerial = false;
+                       }
                } else {
+                       // There are no slaves or writes are all to foreign DB (we don't handle that)
                        $dbwSerial = false;
                }
 
-               if ( !$dbwSerial
-                       || !$dbwSerial->namedLocksEnqueue()
-                       || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold
-               ) {
-                       // Writes are all to foreign DBs, named locks don't form queues,
-                       // or $wgJobSerialCommitThreshold is not reached; commit changes now
+               if ( !$dbwSerial ) {
                        wfGetLBFactory()->commitMasterChanges( __METHOD__ );
                        return;
                }