Convert JobRunner to using beginMasterChanges()
authorAaron Schulz <aschulz@wikimedia.org>
Tue, 6 Sep 2016 22:25:53 +0000 (15:25 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Wed, 7 Sep 2016 03:56:37 +0000 (03:56 +0000)
This lets the runJobs.php $wgCommandLineMode hack be removed.

Some fixes based on unit tests:
* Only call applyTransactionRoundFlags() for master connections
  for transaction rounds from beginMasterChanges().
* Also cleaned up the commitAndWaitForReplication() reset logic.
* Removed deprecated DataUpdate::doUpdate() calls from jobs
  since they cannot nest in a transaction round.

Change-Id: Ia9b91f539dc11a5c05bdac4bcd99d6615c4dc48d

includes/db/loadbalancer/LBFactory.php
includes/db/loadbalancer/LoadBalancer.php
includes/jobqueue/JobRunner.php
includes/jobqueue/jobs/DeleteLinksJob.php
includes/jobqueue/jobs/RefreshLinksJob.php
maintenance/runJobs.php

index 2d91bb7..aba3fb8 100644 (file)
@@ -537,12 +537,13 @@ abstract class LBFactory implements DestructibleService {
                        return;
                }
 
-               $fnameEffective = $fname;
                // The transaction owner and any caller with the empty transaction ticket can commit
                // so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError.
                if ( $this->trxRoundId !== false && $fname !== $this->trxRoundId ) {
                        $this->trxLogger->info( "$fname: committing on behalf of {$this->trxRoundId}." );
                        $fnameEffective = $this->trxRoundId;
+               } else {
+                       $fnameEffective = $fname;
                }
 
                $this->commitMasterChanges( $fnameEffective );
@@ -550,7 +551,7 @@ abstract class LBFactory implements DestructibleService {
                // If a nested caller committed on behalf of $fname, start another empty $fname
                // transaction, leaving the caller with the same empty transaction state as before.
                if ( $fnameEffective !== $fname ) {
-                       $this->beginMasterChanges( $fname );
+                       $this->beginMasterChanges( $fnameEffective );
                }
        }
 
index 71286a9..cb22c36 100644 (file)
@@ -864,11 +864,11 @@ class LoadBalancer {
                        $this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
                );
                $db->setTransactionProfiler( $this->trxProfiler );
-               if ( $this->trxRoundId !== false ) {
-                       $this->applyTransactionRoundFlags( $db );
-               }
 
                if ( $server['serverIndex'] === $this->getWriterIndex() ) {
+                       if ( $this->trxRoundId !== false ) {
+                               $this->applyTransactionRoundFlags( $db );
+                       }
                        foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
                                $db->setTransactionListener( $name, $callback );
                        }
index dfaf972..7a6b5fc 100644 (file)
@@ -181,7 +181,7 @@ class JobRunner implements LoggerAwareInterface {
                                        $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
                                }
 
-                               $info = $this->executeJob( $job, $stats, $popTime );
+                               $info = $this->executeJob( $job, $lbFactory, $stats, $popTime );
                                if ( $info['status'] !== false || !$job->allowRetries() ) {
                                        $group->ack( $job ); // succeeded or job cannot be retried
                                        $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
@@ -254,27 +254,28 @@ class JobRunner implements LoggerAwareInterface {
 
        /**
         * @param Job $job
+        * @param LBFactory $lbFactory
         * @param StatsdDataFactory $stats
         * @param float $popTime
         * @return array Map of status/error/timeMs
         */
-       private function executeJob( Job $job, $stats, $popTime ) {
+       private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) {
                $jType = $job->getType();
                $msg = $job->toString() . " STARTING";
                $this->logger->debug( $msg );
                $this->debugCallback( $msg );
-               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
 
                // Run the job...
                $rssStart = $this->getMaxRssKb();
                $jobStartTime = microtime( true );
                try {
+                       $fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
+                       $lbFactory->beginMasterChanges( $fnameTrxOwner );
                        $status = $job->run();
                        $error = $job->getLastError();
-                       $this->commitMasterChanges( $lbFactory, $job );
-
+                       $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
+                       // Run any deferred update tasks; doUpdates() manages transactions itself
                        DeferredUpdates::doUpdates();
-                       $this->commitMasterChanges( $lbFactory, $job );
                } catch ( Exception $e ) {
                        MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                        $status = false;
@@ -497,9 +498,10 @@ class JobRunner implements LoggerAwareInterface {
         *
         * @param LBFactory $lbFactory
         * @param Job $job
+        * @param string $fnameTrxOwner
         * @throws DBError
         */
-       private function commitMasterChanges( LBFactory $lbFactory, Job $job ) {
+       private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
                global $wgJobSerialCommitThreshold;
 
                $lb = $lbFactory->getMainLB( wfWikiID() );
@@ -521,7 +523,7 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                if ( !$dbwSerial ) {
-                       $lbFactory->commitMasterChanges( __METHOD__ );
+                       $lbFactory->commitMasterChanges( $fnameTrxOwner );
                        return;
                }
 
@@ -542,7 +544,7 @@ class JobRunner implements LoggerAwareInterface {
                }
 
                // Actually commit the DB master changes
-               $lbFactory->commitMasterChanges( __METHOD__ );
+               $lbFactory->commitMasterChanges( $fnameTrxOwner );
 
                // Release the lock
                $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
index 8d565bd..5c0f89f 100644 (file)
@@ -59,7 +59,7 @@ class DeleteLinksJob extends Job {
 
                $update = new LinksDeletionUpdate( $page, $pageId, $timestamp );
                $update->setTransactionTicket( $factory->getEmptyTransactionTicket( __METHOD__ ) );
-               DataUpdate::runUpdates( [ $update ] );
+               $update->doUpdate();
 
                return true;
        }
index b0dcd57..a337da4 100644 (file)
@@ -263,7 +263,9 @@ class RefreshLinksJob extends Job {
                        }
                }
 
-               DataUpdate::runUpdates( $updates );
+               foreach ( $updates as $update ) {
+                       $update->doUpdate();
+               }
 
                InfoAction::invalidateCache( $title );
 
index c3c2391..2e011fe 100644 (file)
@@ -53,8 +53,6 @@ class RunJobs extends Maintenance {
        }
 
        public function execute() {
-               global $wgCommandLineMode;
-
                if ( $this->hasOption( 'procs' ) ) {
                        $procs = intval( $this->getOption( 'procs' ) );
                        if ( $procs < 1 || $procs > 1000 ) {
@@ -70,10 +68,6 @@ class RunJobs extends Maintenance {
                $outputJSON = ( $this->getOption( 'result' ) === 'json' );
                $wait = $this->hasOption( 'wait' );
 
-               // Enable DBO_TRX for atomicity; JobRunner manages transactions
-               // and works well in web server mode already (@TODO: this is a hack)
-               $wgCommandLineMode = false;
-
                $runner = new JobRunner( LoggerFactory::getInstance( 'runJobs' ) );
                if ( !$outputJSON ) {
                        $runner->setDebugHandler( [ $this, 'debugInternal' ] );
@@ -111,8 +105,6 @@ class RunJobs extends Maintenance {
 
                        sleep( 1 );
                }
-
-               $wgCommandLineMode = true;
        }
 
        /**