Merge "Change "slave" => "replica DB" in /includes"
[lhc/web/wiklou.git] / includes / jobqueue / JobRunner.php
index 4b906a7..134ba9d 100644 (file)
@@ -21,6 +21,7 @@
  * @ingroup JobQueue
  */
 
+use MediaWiki\MediaWikiServices;
 use MediaWiki\Logger\LoggerFactory;
 use Psr\Log\LoggerAwareInterface;
 use Psr\Log\LoggerInterface;
@@ -41,7 +42,7 @@ class JobRunner implements LoggerAwareInterface {
        protected $logger;
 
        const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present
-       const LAG_CHECK_PERIOD = 1.0; // check slave lag this many seconds
+       const LAG_CHECK_PERIOD = 1.0; // check replica DB lag this many seconds
        const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors
 
        /**
@@ -122,9 +123,10 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                // Flush any pending DB writes for sanity
-               wfGetLBFactory()->commitAll( __METHOD__ );
+               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               $lbFactory->commitAll( __METHOD__ );
 
-               // Catch huge single updates that lead to slave lag
+               // Catch huge single updates that lead to replica DB lag
                $trxProfiler = Profiler::instance()->getTransactionProfiler();
                $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
                $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
@@ -139,7 +141,7 @@ class JobRunner implements LoggerAwareInterface {
                $jobsPopped = 0;
                $timeMsTotal = 0;
                $startTime = microtime( true ); // time since jobs started running
-               $lastCheckTime = 1; // timestamp of last slave check
+               $lastCheckTime = 1; // timestamp of last replica DB check
                do {
                        // Sync the persistent backoffs with concurrent runners
                        $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
@@ -176,9 +178,11 @@ class JobRunner implements LoggerAwareInterface {
                                        $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
                                }
 
+                               $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
                                $info = $this->executeJob( $job, $stats, $popTime );
                                if ( $info['status'] !== false || !$job->allowRetries() ) {
                                        $group->ack( $job ); // succeeded or job cannot be retried
+                                       $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
                                }
 
                                // Back off of certain jobs for a while (for throttling and for errors)
@@ -206,13 +210,13 @@ class JobRunner implements LoggerAwareInterface {
                                        break;
                                }
 
-                               // Don't let any of the main DB slaves get backed up.
+                               // Don't let any of the main DB replica DBs get backed up.
                                // This only waits for so long before exiting and letting
                                // other wikis in the farm (on different masters) get a chance.
                                $timePassed = microtime( true ) - $lastCheckTime;
                                if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
                                        try {
-                                               wfGetLBFactory()->waitForReplication( [
+                                               $lbFactory->waitForReplication( [
                                                        'ifWritesSince' => $lastCheckTime,
                                                        'timeout' => self::MAX_ALLOWED_LAG
                                                ] );
@@ -222,7 +226,7 @@ class JobRunner implements LoggerAwareInterface {
                                        }
                                        $lastCheckTime = microtime( true );
                                }
-                               // Don't let any queue slaves/backups fall behind
+                               // Don't let any queue replica DBs/backups fall behind
                                if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
                                        $group->waitForBackups();
                                }
@@ -257,6 +261,7 @@ class JobRunner implements LoggerAwareInterface {
                $msg = $job->toString() . " STARTING";
                $this->logger->debug( $msg );
                $this->debugCallback( $msg );
+               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
 
                // Run the job...
                $rssStart = $this->getMaxRssKb();
@@ -283,8 +288,8 @@ class JobRunner implements LoggerAwareInterface {
 
                // Commit all outstanding connections that are in a transaction
                // to get a fresh repeatable read snapshot on every connection.
-               // Note that jobs are still responsible for handling slave lag.
-               wfGetLBFactory()->commitAll( __METHOD__ );
+               // Note that jobs are still responsible for handling replica DB lag.
+               $lbFactory->flushReplicaSnapshots( __METHOD__ );
                // Clear out title cache data from prior snapshots
                LinkCache::singleton()->clear();
                $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
@@ -485,7 +490,7 @@ class JobRunner implements LoggerAwareInterface {
        /**
         * Issue a commit on all masters who are currently in a transaction and have
         * made changes to the database. It also supports sometimes waiting for the
-        * local wiki's slaves to catch up. See the documentation for
+        * local wiki's replica DBs to catch up. See the documentation for
         * $wgJobSerialCommitThreshold for more.
         *
         * @param Job $job
@@ -498,16 +503,21 @@ class JobRunner implements LoggerAwareInterface {
                if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
                        // Generally, there is one master connection to the local DB
                        $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+                       // We need natively blocking fast locks
+                       if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
+                               $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
+                               if ( $time < $wgJobSerialCommitThreshold ) {
+                                       $dbwSerial = false;
+                               }
+                       } else {
+                               $dbwSerial = false;
+                       }
                } else {
+                       // There are no replica DBs or writes are all to foreign DB (we don't handle that)
                        $dbwSerial = false;
                }
 
-               if ( !$dbwSerial
-                       || !$dbwSerial->namedLocksEnqueue()
-                       || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold
-               ) {
-                       // Writes are all to foreign DBs, named locks don't form queues,
-                       // or $wgJobSerialCommitThreshold is not reached; commit changes now
+               if ( !$dbwSerial ) {
                        wfGetLBFactory()->commitMasterChanges( __METHOD__ );
                        return;
                }
@@ -522,23 +532,12 @@ class JobRunner implements LoggerAwareInterface {
                        // This will trigger a rollback in the main loop
                        throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
                }
-               // Wait for the slave DBs to catch up
+               // Wait for the replica DBs to catch up
                $pos = $lb->getMasterPos();
                if ( $pos ) {
                        $lb->waitForAll( $pos );
                }
 
-               $fname = __METHOD__;
-               // Re-ping all masters with transactions. This throws DBError if some
-               // connection died while waiting on locks/slaves, triggering a rollback.
-               wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) use ( $fname ) {
-                       $lb->forEachOpenConnection( function( IDatabase $conn ) use ( $fname ) {
-                               if ( $conn->writesOrCallbacksPending() ) {
-                                       $conn->ping();
-                               }
-                       } );
-               } );
-
                // Actually commit the DB master changes
                wfGetLBFactory()->commitMasterChanges( __METHOD__ );