Added $wgJobSerialCommitThreshold setting
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 22 Apr 2015 06:13:31 +0000 (23:13 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Fri, 24 Apr 2015 18:38:16 +0000 (11:38 -0700)
* This is used to avoid lag by certain jobs

Bug: T95501
Change-Id: Id707c9a840fa23d56407e03aaae4e25149a1f906

includes/DefaultSettings.php
includes/db/Database.php
includes/db/DatabaseMysqlBase.php
includes/db/LoadBalancer.php
includes/jobqueue/JobRunner.php

index 3cfeb8c..e323ec6 100644 (file)
@@ -6451,6 +6451,21 @@ $wgJobTypesExcludedFromDefaultQueue = array( 'AssembleUploadChunks', 'PublishSta
  */
 $wgJobBackoffThrottling = array();
 
+/**
+ * Make job runners commit changes for slave-lag prone jobs one job at a time.
+ * This is useful if there are many job workers that race on slave lag checks.
+ * If set, jobs taking this many seconds of DB write time have serialized commits.
+ *
+ * Note that affected jobs may have worse lock contention. Also, if they affect
+ * several DBs at once they may have a smaller chance of being atomic due to the
+ * possibility of connection loss while queueing up to commit. Affected jobs may
+ * also fail due to the commit lock acquisition timeout.
+ *
+ * @var float|bool
+ * @since 1.26
+ */
+$wgJobSerialCommitThreshold = false;
+
 /**
  * Map of job types to configuration arrays.
  * This determines which queue class and storage system is used for each job type.
index 8c1ebf9..2c1ebea 100644 (file)
@@ -4235,7 +4235,7 @@ abstract class DatabaseBase implements IDatabase {
        }
 
        /**
-        * Check to see if a named lock is available. This is non-blocking.
+        * Check to see if a named lock is available (non-blocking)
         *
         * @param string $lockName Name of lock to poll
         * @param string $method Name of method calling us
@@ -4249,8 +4249,7 @@ abstract class DatabaseBase implements IDatabase {
        /**
         * Acquire a named lock
         *
-        * Abstracted from Filestore::lock() so child classes can implement for
-        * their own needs.
+        * Named locks are not related to transactions
         *
         * @param string $lockName Name of lock to aquire
         * @param string $method Name of method calling us
@@ -4262,7 +4261,9 @@ abstract class DatabaseBase implements IDatabase {
        }
 
        /**
-        * Release a lock.
+        * Release a lock
+        *
+        * Named locks are not related to transactions
         *
         * @param string $lockName Name of lock to release
         * @param string $method Name of method calling us
@@ -4275,6 +4276,16 @@ abstract class DatabaseBase implements IDatabase {
                return true;
        }
 
+       /**
+        * Check to see if a named lock used by lock() use blocking queues
+        *
+        * @return bool
+        * @since 1.26
+        */
+       public function namedLocksEnqueue() {
+               return false;
+       }
+
        /**
         * Lock specific tables
         *
index aac95a8..64917cc 100644 (file)
@@ -873,6 +873,10 @@ abstract class DatabaseMysqlBase extends DatabaseBase {
                return ( $row->lockstatus == 1 );
        }
 
+       public function namedLocksEnqueue() {
+               return true;
+       }
+
        /**
         * @param array $read
         * @param array $write
index 624f46b..be67d75 100644 (file)
@@ -845,8 +845,9 @@ class LoadBalancer {
 
        /**
         * @return int
+        * @since 1.26
         */
-       private function getWriterIndex() {
+       public function getWriterIndex() {
                return 0;
        }
 
index 9425423..2311ea2 100644 (file)
@@ -35,6 +35,11 @@ class JobRunner implements LoggerAwareInterface {
        /** @var callable|null Debug output handler */
        protected $debug;
 
+       /**
+        * @var LoggerInterface $logger
+        */
+       protected $logger;
+
        /**
         * @param callable $debug Optional debug output handler
         */
@@ -42,13 +47,9 @@ class JobRunner implements LoggerAwareInterface {
                $this->debug = $debug;
        }
 
-       /**
-        * @var LoggerInterface $logger
-        */
-       protected $logger;
-
        /**
         * @param LoggerInterface $logger
+        * @return void
         */
        public function setLogger( LoggerInterface $logger ) {
                $this->logger = $logger;
@@ -183,7 +184,7 @@ class JobRunner implements LoggerAwareInterface {
                                        ++$jobsRun;
                                        $status = $job->run();
                                        $error = $job->getLastError();
-                                       wfGetLBFactory()->commitMasterChanges();
+                                       $this->commitMasterChanges( $job );
                                } catch ( Exception $e ) {
                                        MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                                        $status = false;
@@ -304,7 +305,6 @@ class JobRunner implements LoggerAwareInterface {
         * @return array Map of (job type => backoff expiry timestamp)
         */
        private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
-
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
                if ( is_file( $file ) ) {
                        $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
@@ -342,7 +342,6 @@ class JobRunner implements LoggerAwareInterface {
         * @return array The new backoffs account for $backoffs and the latest file data
         */
        private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
-
                if ( !$deltas ) {
                        return $this->loadBackoffs( $backoffs, $mode );
                }
@@ -409,4 +408,64 @@ class JobRunner implements LoggerAwareInterface {
                        call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
                }
        }
+
+       /**
+        * Commit any DB master changes from a job on all load balancers
+        *
+        * @param Job $job
+        * @throws DBError
+        */
+       private function commitMasterChanges( Job $job ) {
+               global $wgJobSerialCommitThreshold;
+
+               $lb = wfGetLB( wfWikiID() );
+               if ( $wgJobSerialCommitThreshold !== false ) {
+                       // Generally, there is one master connection to the local DB
+                       $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+               } else {
+                       $dbwSerial = false;
+               }
+
+               if ( !$dbwSerial
+                       || !$dbwSerial->namedLocksEnqueue()
+                       || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold
+               ) {
+                       // Writes are all to foreign DBs, named locks don't form queues,
+                       // or $wgJobSerialCommitThreshold is not reached; commit changes now
+                       wfGetLBFactory()->commitMasterChanges();
+                       return;
+               }
+
+               $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() );
+               $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
+               $this->logger->info( $msg );
+               $this->debugCallback( $msg );
+
+               // Wait for an exclusive lock to commit
+               if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
+                       // This will trigger a rollback in the main loop
+                       throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
+               }
+               // Wait for the generic slave to catch up
+               $pos = $lb->getMasterPos();
+               if ( $pos ) {
+                       $lb->waitForOne( $pos );
+               }
+
+               // Re-ping all masters with transactions. This throws DBError if some
+               // connection died while waiting on locks/slaves, triggering a rollback.
+               wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) {
+                       $lb->forEachOpenConnection( function( DatabaseBase $conn ) {
+                               if ( $conn->writesOrCallbacksPending() ) {
+                                       $conn->query( "SELECT 1" );
+                               }
+                       } );
+               } );
+
+               // Actually commit the DB master changes
+               wfGetLBFactory()->commitMasterChanges();
+
+               // Release the lock
+               $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
+       }
 }