* Submit jobs in batches and wait for each batch, to avoid overflowing the queue
authorTim Starling <tstarling@users.mediawiki.org>
Thu, 26 Feb 2009 08:04:28 +0000 (08:04 +0000)
committerTim Starling <tstarling@users.mediawiki.org>
Thu, 26 Feb 2009 08:04:28 +0000 (08:04 +0000)
* Report success before running exec() so that the client doesn't wait forever

maintenance/gearman/gearman.inc
maintenance/gearman/gearmanRefreshLinks.php
maintenance/gearman/gearmanWorker.php

index cafef90..0678b82 100644 (file)
@@ -4,12 +4,20 @@ require( 'Net/Gearman/Client.php' );
 require( 'Net/Gearman/Worker.php' );
 
 class MWGearmanJob extends Net_Gearman_Job_Common {
-       function switchWiki( $wiki, $jobParams ) {
+       function switchWiki( $wiki, $params ) {
                echo "Switching to $wiki\n";
+
+               # Pretend that we have completed it right now, because the new process won't do it
+               $this->complete( array( 'result' => true ) );
+               socket_close( $this->conn );
+
+               # Find PHP
                $php = readlink( '/proc/' . posix_getpid() . '/exe' );
+               
+               # Run the worker script
                $args = array( $_SERVER['PHP_SELF'], 
                        '--wiki', $wiki,
-                       '--fake-job', serialize( $jobParams ) );
+                       '--fake-job', serialize( $params ) );
                $args = array_merge( $args, $GLOBALS['args'] );
                pcntl_exec( $php, $args, $_ENV );
                echo "Error running exec\n";
@@ -19,7 +27,7 @@ class MWGearmanJob extends Net_Gearman_Job_Common {
                if ( wfWikiID() !== $params['wiki'] ) {
                        $this->switchWiki( $params['wiki'], $params );
                }
-               self::runNoSwitch( $params );
+               return self::runNoSwitch( $params );
        }
 
        static function runNoSwitch( $params ) {
index 7f93eca..eb3104e 100644 (file)
@@ -9,17 +9,37 @@ if ( !$args ) {
        $args = array( 'localhost' );
 }
 $client = new Net_Gearman_Client( $args );
+$batchSize = 1000;
 
 $dbr = wfGetDB( DB_SLAVE );
-$res = $dbr->select( 'page', array( 'page_namespace', 'page_title' ), false, __METHOD__ );
-foreach ( $res as $row ) {
-       $title = Title::makeTitle( $row->page_namespace, $row->page_title );
-       $params = array(
-               'wiki' => wfWikiID(),
-               'title' => $title->getPrefixedDBkey(),
-               'command' => 'refreshLinks',
-               'params' => false,
+$startId = 0;
+$endId = $dbr->selectField( 'page', 'MAX(page_id)', false, __METHOD__ );
+while ( true ) {
+       $res = $dbr->select( 
+               'page', 
+               array( 'page_namespace', 'page_title', 'page_id' ),
+               array( 'page_id > ' . intval( $startId ) ), 
+               __METHOD__,
+               array( 'LIMIT' => $batchSize ) 
        );
-       $client->mw_job( $params );
+
+       if ( $res->numRows() == 0 ) {
+               break;
+       }
+       $set = new Net_Gearman_Set;
+       foreach ( $res as $row ) {
+               $startId = $row->page_id;
+               $title = Title::makeTitle( $row->page_namespace, $row->page_title );
+               $params = array(
+                       'wiki' => wfWikiID(),
+                       'title' => $title->getPrefixedDBkey(),
+                       'command' => 'refreshLinks',
+                       'params' => false,
+               );
+               $task = new Net_Gearman_Task( 'mw_job', $params );
+               $set->addTask( $task );
+       }
+       $client->runSet( $set );
+       print "$startId / $endId\n";
 }
 
index 711ec4a..f800b67 100644 (file)
@@ -28,5 +28,15 @@ if ( isset( $options['fake-job'] ) ) {
 
 $worker = new NonScaryGearmanWorker( $args );
 $worker->addAbility( 'mw_job' );
-$worker->beginWork();
+$worker->beginWork( 'wfGearmanMonitor' );
 
+function wfGearmanMonitor( $idle, $lastJob ) {
+       static $lastSleep = 0;
+       $interval = 5;
+       $now = time();
+       if ( $now - $lastSleep >= $interval ) {
+               wfWaitForSlaves( $interval );
+               $lastSleep = $now;
+       }
+       return false;
+}