Make JobRunner flush DeferredUpdates after each job
authorAaron Schulz <aschulz@wikimedia.org>
Mon, 18 May 2015 21:20:35 +0000 (14:20 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 19 May 2015 01:52:34 +0000 (18:52 -0700)
Change-Id: Iff6625ddc04a15751d2bb07dc6558145e7ceb14a

includes/deferred/DeferredUpdates.php
includes/jobqueue/JobRunner.php
tests/phpunit/includes/deferred/DeferredUpdatesTest.php

index 42816dd..29bb8d7 100644 (file)
@@ -34,13 +34,17 @@ interface DeferrableUpdate {
 }
 
 /**
- * Class for managing the deferred updates.
+ * Class for managing the deferred updates
+ *
+ * Deferred updates can be run at the end of the request,
+ * after the HTTP response has been sent. In CLI mode, updates
+ * are only deferred until there is no local master DB transaction.
  *
  * @since 1.19
  */
 class DeferredUpdates {
        /**
-        * Store of updates to be deferred until the end of the request.
+        * @var array Updates to be deferred until the end of the request.
         */
        private static $updates = array();
 
@@ -49,7 +53,28 @@ class DeferredUpdates {
         * @param DeferrableUpdate $update Some object that implements doUpdate()
         */
        public static function addUpdate( DeferrableUpdate $update ) {
+               global $wgCommandLineMode;
+
                array_push( self::$updates, $update );
+
+               // CLI scripts may forget to periodically flush these updates,
+               // so try to handle that rather than OOMing and losing them.
+               // Try to run the updates as soon as there is no local transaction.
+               static $waitingOnTrx = false; // de-duplicate callback
+               if ( $wgCommandLineMode && !$waitingOnTrx ) {
+                       $lb = wfGetLB();
+                       $dbw = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+                       // Do the update as soon as there is no transaction
+                       if ( $dbw->trxLevel() ) {
+                               $waitingOnTrx = true;
+                               $dbw->onTransactionIdle( function() use ( &$waitingOnTrx ) {
+                                       DeferredUpdates::doUpdates();
+                                       $waitingOnTrx = false;
+                               } );
+                       } else {
+                               self::doUpdates();
+                       }
+               }
        }
 
        /**
@@ -84,19 +109,7 @@ class DeferredUpdates {
 
                $updates = array_merge( $wgDeferredUpdateList, self::$updates );
 
-               // No need to get master connections in case of empty updates array
-               if ( !count( $updates ) ) {
-
-                       return;
-               }
-
-               $dbw = false;
-               $doCommit = $commit == 'commit';
-               if ( $doCommit ) {
-                       $dbw = wfGetDB( DB_MASTER );
-               }
-
-               while ( $updates ) {
+               while ( count( $updates ) ) {
                        self::clearPendingUpdates();
 
                        /** @var DeferrableUpdate $update */
@@ -104,8 +117,8 @@ class DeferredUpdates {
                                try {
                                        $update->doUpdate();
 
-                                       if ( $doCommit && $dbw->trxLevel() ) {
-                                               $dbw->commit( __METHOD__, 'flush' );
+                                       if ( $commit === 'commit' ) {
+                                               wfGetLBFactory()->commitMasterChanges();
                                        }
                                } catch ( Exception $e ) {
                                        // We don't want exceptions thrown during deferred updates to
@@ -116,9 +129,9 @@ class DeferredUpdates {
                                        }
                                }
                        }
+
                        $updates = array_merge( $wgDeferredUpdateList, self::$updates );
                }
-
        }
 
        /**
index 6bf33aa..b04ab28 100644 (file)
@@ -181,6 +181,9 @@ class JobRunner implements LoggerAwareInterface {
                                        $status = $job->run();
                                        $error = $job->getLastError();
                                        $this->commitMasterChanges( $job );
+
+                                       DeferredUpdates::doUpdates();
+                                       $this->commitMasterChanges( $job );
                                } catch ( Exception $e ) {
                                        MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                                        $status = false;
index 5348c85..df4213a 100644 (file)
@@ -1,8 +1,9 @@
 <?php
 
 class DeferredUpdatesTest extends MediaWikiTestCase {
+       public function testDoUpdatesWeb() {
+               $this->setMwGlobals( 'wgCommandLineMode', false );
 
-       public function testDoUpdates() {
                $updates = array(
                        '1' => 'deferred update 1',
                        '2' => 'deferred update 2',
@@ -35,4 +36,38 @@ class DeferredUpdatesTest extends MediaWikiTestCase {
                DeferredUpdates::doUpdates();
        }
 
+       public function testDoUpdatesCLI() {
+               $this->setMwGlobals( 'wgCommandLineMode', true );
+
+               $updates = array(
+                       '1' => 'deferred update 1',
+                       '2' => 'deferred update 2',
+                       '2-1' => 'deferred update 1 within deferred update 2',
+                       '3' => 'deferred update 3',
+               );
+               DeferredUpdates::addCallableUpdate(
+                       function () use ( $updates ) {
+                               echo $updates['1'];
+                       }
+               );
+               DeferredUpdates::addCallableUpdate(
+                       function () use ( $updates ) {
+                               echo $updates['2'];
+                               DeferredUpdates::addCallableUpdate(
+                                       function () use ( $updates ) {
+                                               echo $updates['2-1'];
+                                       }
+                               );
+                       }
+               );
+               DeferredUpdates::addCallableUpdate(
+                       function () use ( $updates ) {
+                               echo $updates[3];
+                       }
+               );
+
+               $this->expectOutputString( implode( '', $updates ) );
+
+               DeferredUpdates::doUpdates();
+       }
 }