[JobQueue] Job queue refactoring and generalizing.
author ASchulz <aschulz@wikimedia.org>
Wed, 29 Aug 2012 00:01:31 +0000 (17:01 -0700)
committer Tim Starling <tstarling@wikimedia.org>
Mon, 15 Oct 2012 22:39:58 +0000 (09:39 +1100)
* Added support for different queue types and methods for storing queues.
* Treat each job type as being on its own queue, at least logically.
* Added $wgJobTypeConf to configure queue types for each job type.
* Improved the job DB table so that duplicate job checks actually work
  and are faster. Also improved the method for popping rows of the table.
* Disabled duplicate job removal for everything except refreshLinks.
  The DELETE statements just add DB overhead and are not useful for cheap
  jobs, especially ones with start/end params (which are unlikely to have
  exact duplicates).

Change-Id: I49824c7fa855fea4ddcac5c9901ece8c2c0101d0

14 files changed:
includes/AutoLoader.php
includes/DefaultSettings.php
includes/Wiki.php
includes/installer/MysqlUpdater.php
includes/installer/SqliteUpdater.php
includes/job/Job.php
includes/job/JobQueue.php [new file with mode: 0644]
includes/job/JobQueueDB.php [new file with mode: 0644]
includes/job/JobQueueGroup.php [new file with mode: 0644]
includes/job/RefreshLinksJob.php
maintenance/archives/patch-job_token.sql [new file with mode: 0644]
maintenance/runJobs.php
maintenance/sqlite/archives/patch-job_token.sql [new file with mode: 0644]
maintenance/tables.sql

index 3ae5a68..338c424 100644 (file)
@@ -655,6 +655,9 @@ $wgAutoloadLocalClasses = array(
        'EmaillingJob' => 'includes/job/EmaillingJob.php',
        'EnotifNotifyJob' => 'includes/job/EnotifNotifyJob.php',
        'Job' => 'includes/job/Job.php',
+       'JobQueue' => 'includes/job/JobQueue.php',
+       'JobQueueDB' => 'includes/job/JobQueueDB.php',
+       'JobQueueGroup' => 'includes/job/JobQueueGroup.php',
        'RefreshLinksJob' => 'includes/job/RefreshLinksJob.php',
        'RefreshLinksJob2' => 'includes/job/RefreshLinksJob.php',
        'UploadFromUrlJob' => 'includes/job/UploadFromUrlJob.php',
index 0f02efc..6122af0 100644 (file)
@@ -5403,6 +5403,14 @@ $wgJobClasses = array(
  */
 $wgJobTypesExcludedFromDefaultQueue = array();
 
+/**
+ * Map of job types to configuration arrays.
+ * These settings should be global to all wikis.
+ */
+$wgJobTypeConf = array(
+       'default' => array( 'class' => 'JobQueueDB' ),
+);
+
 /**
  * Additional functions to be performed with updateSpecialPages.
  * Expensive Querypages are already updated.
index e6ccbe5..ed02a3d 100644 (file)
@@ -596,28 +596,51 @@ class MediaWiki {
                if ( $wgJobRunRate <= 0 || wfReadOnly() ) {
                        return;
                }
+
                if ( $wgJobRunRate < 1 ) {
                        $max = mt_getrandmax();
                        if ( mt_rand( 0, $max ) > $max * $wgJobRunRate ) {
-                               return;
+                               return; // the higher $wgJobRunRate, the less likely we return here
                        }
                        $n = 1;
                } else {
                        $n = intval( $wgJobRunRate );
                }
 
-               while ( $n-- && false != ( $job = Job::pop() ) ) {
-                       $output = $job->toString() . "\n";
-                       $t = - microtime( true );
-                       $success = $job->run();
-                       $t += microtime( true );
-                       $t = round( $t * 1000 );
-                       if ( !$success ) {
-                               $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
-                       } else {
-                               $output .= "Success, Time: $t ms\n";
+               $group = JobQueueGroup::singleton();
+               $types = $group->getDefaultQueueTypes();
+               shuffle( $types ); // avoid starvation
+
+               // Scan the queues for a job N times...
+               do {
+                       $jobFound = false; // found a job in any queue?
+                       // Find a queue with a job on it and run it...
+                       foreach ( $types as $i => $type ) {
+                               $queue = $group->get( $type );
+                               if ( $queue->isEmpty() ) {
+                                       unset( $types[$i] ); // don't keep checking this queue
+                                       continue;
+                               }
+                               $job = $queue->pop();
+                               if ( $job ) {
+                                       $jobFound = true;
+                                       $output = $job->toString() . "\n";
+                                       $t = - microtime( true );
+                                       $success = $job->run();
+                                       $queue->ack( $job ); // done
+                                       $t += microtime( true );
+                                       $t = round( $t * 1000 );
+                                       if ( !$success ) {
+                                               $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
+                                       } else {
+                                               $output .= "Success, Time: $t ms\n";
+                                       }
+                                       wfDebugLog( 'jobqueue', $output );
+                                       break;
+                               } else {
+                                       unset( $types[$i] ); // don't keep checking this queue
+                               }
                        }
-                       wfDebugLog( 'jobqueue', $output );
-               }
+               } while ( --$n && $jobFound );
        }
 }
index a6cb13f..82de913 100644 (file)
@@ -225,6 +225,7 @@ class MysqlUpdater extends DatabaseUpdater {
                        array( 'dropField', 'recentchanges', 'rc_moved_to_title',            'patch-rc_moved.sql' ),
                        array( 'addTable', 'sites',                            'patch-sites.sql' ),
                        array( 'addField', 'filearchive',   'fa_sha1',          'patch-fa_sha1.sql' ),
+                       array( 'addField', 'job',           'job_token',         'patch-job_token.sql' ),
                );
        }
 
index e7f3939..c3f7a81 100644 (file)
@@ -105,6 +105,7 @@ class SqliteUpdater extends DatabaseUpdater {
                        array( 'dropField', 'recentchanges', 'rc_moved_to_title', 'patch-rc_moved.sql' ),
                        array( 'addTable', 'sites',                            'patch-sites.sql' ),
                        array( 'addField', 'filearchive',   'fa_sha1',          'patch-fa_sha1.sql' ),
+                       array( 'addField', 'job',           'job_token',         'patch-job_token.sql' ),
                );
        }
 
index 270671e..d11446e 100644 (file)
 
 /**
  * Class to both describe a background job and handle jobs.
+ * The queue aspects of this class are now deprecated.
  *
  * @ingroup JobQueue
  */
 abstract class Job {
-
        /**
         * @var Title
         */
@@ -47,172 +47,12 @@ abstract class Job {
         * Run the job
         * @return boolean success
         */
-       abstract function run();
+       abstract public function run();
 
        /*-------------------------------------------------------------------------
         * Static functions
         *------------------------------------------------------------------------*/
 
-       /**
-        * Pop a job of a certain type.  This tries less hard than pop() to
-        * actually find a job; it may be adversely affected by concurrent job
-        * runners.
-        *
-        * @param $type string
-        *
-        * @return Job
-        */
-       static function pop_type( $type ) {
-               wfProfilein( __METHOD__ );
-
-               $dbw = wfGetDB( DB_MASTER );
-
-               $dbw->begin( __METHOD__ );
-
-               $row = $dbw->selectRow(
-                       'job',
-                       '*',
-                       array( 'job_cmd' => $type ),
-                       __METHOD__,
-                       array( 'LIMIT' => 1, 'FOR UPDATE' )
-               );
-
-               if ( $row === false ) {
-                       $dbw->commit( __METHOD__ );
-                       wfProfileOut( __METHOD__ );
-                       return false;
-               }
-
-               /* Ensure we "own" this row */
-               $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
-               $affected = $dbw->affectedRows();
-               $dbw->commit( __METHOD__ );
-
-               if ( $affected == 0 ) {
-                       wfProfileOut( __METHOD__ );
-                       return false;
-               }
-
-               wfIncrStats( 'job-pop' );
-               $namespace = $row->job_namespace;
-               $dbkey = $row->job_title;
-               $title = Title::makeTitleSafe( $namespace, $dbkey );
-               $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ),
-                       $row->job_id );
-
-               $job->removeDuplicates();
-
-               wfProfileOut( __METHOD__ );
-               return $job;
-       }
-
-       /**
-        * Pop a job off the front of the queue
-        *
-        * @param $offset Integer: Number of jobs to skip
-        * @return Job or false if there's no jobs
-        */
-       static function pop( $offset = 0 ) {
-               wfProfileIn( __METHOD__ );
-
-               $dbr = wfGetDB( DB_SLAVE );
-
-               /* Get a job from the slave, start with an offset,
-                       scan full set afterwards, avoid hitting purged rows
-
-                       NB: If random fetch previously was used, offset
-                               will always be ahead of few entries
-               */
-
-               $conditions = self::defaultQueueConditions();
-
-               $offset = intval( $offset );
-               $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' );
-
-               $row = $dbr->selectRow( 'job', '*',
-                       array_merge( $conditions, array( "job_id >= $offset" ) ),
-                       __METHOD__,
-                       $options
-               );
-
-               // Refetching without offset is needed as some of job IDs could have had delayed commits
-               // and have lower IDs than jobs already executed, blame concurrency :)
-               //
-               if ( $row === false ) {
-                       if ( $offset != 0 ) {
-                               $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options );
-                       }
-
-                       if ( $row === false ) {
-                               wfProfileOut( __METHOD__ );
-                               return false;
-                       }
-               }
-
-               // Try to delete it from the master
-               $dbw = wfGetDB( DB_MASTER );
-               $dbw->begin( __METHOD__ );
-               $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
-               $affected = $dbw->affectedRows();
-               $dbw->commit( __METHOD__ );
-
-               if ( !$affected ) {
-                       $dbw->begin( __METHOD__ );
-
-                       // Failed, someone else beat us to it
-                       // Try getting a random row
-                       $row = $dbw->selectRow( 'job', array( 'minjob' => 'MIN(job_id)',
-                               'maxjob' => 'MAX(job_id)' ), '1=1', __METHOD__ );
-                       if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
-                               // No jobs to get
-                               $dbw->rollback( __METHOD__ );
-                               wfProfileOut( __METHOD__ );
-                               return false;
-                       }
-                       // Get the random row
-                       $row = $dbw->selectRow( 'job', '*',
-                               'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
-                       if ( $row === false ) {
-                               // Random job gone before we got the chance to select it
-                               // Give up
-                               $dbw->rollback( __METHOD__ );
-                               wfProfileOut( __METHOD__ );
-                               return false;
-                       }
-                       // Delete the random row
-                       $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
-                       $affected = $dbw->affectedRows();
-                       $dbw->commit( __METHOD__ );
-
-                       if ( !$affected ) {
-                               // Random job gone before we exclusively deleted it
-                               // Give up
-                               wfProfileOut( __METHOD__ );
-                               return false;
-                       }
-               }
-
-               // If execution got to here, there's a row in $row that has been deleted from the database
-               // by this thread. Hence the concurrent pop was successful.
-               wfIncrStats( 'job-pop' );
-               $namespace = $row->job_namespace;
-               $dbkey = $row->job_title;
-               $title = Title::makeTitleSafe( $namespace, $dbkey );
-
-               if ( is_null( $title ) ) {
-                       wfProfileOut( __METHOD__ );
-                       return false;
-               }
-
-               $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
-
-               // Remove any duplicates it may have later in the queue
-               $job->removeDuplicates();
-
-               wfProfileOut( __METHOD__ );
-               return $job;
-       }
-
        /**
         * Create the appropriate object to handle a specific job
         *
@@ -223,7 +63,7 @@ abstract class Job {
         * @throws MWException
         * @return Job
         */
-       static function factory( $command, Title $title, $params = false, $id = 0 ) {
+       public static function factory( $command, Title $title, $params = false, $id = 0 ) {
                global $wgJobClasses;
                if( isset( $wgJobClasses[$command] ) ) {
                        $class = $wgJobClasses[$command];
@@ -232,30 +72,6 @@ abstract class Job {
                throw new MWException( "Invalid job command `{$command}`" );
        }
 
-       /**
-        * @param $params
-        * @return string
-        */
-       static function makeBlob( $params ) {
-               if ( $params !== false ) {
-                       return serialize( $params );
-               } else {
-                       return '';
-               }
-       }
-
-       /**
-        * @param $blob
-        * @return bool|mixed
-        */
-       static function extractBlob( $blob ) {
-               if ( (string)$blob !== '' ) {
-                       return unserialize( $blob );
-               } else {
-                       return false;
-               }
-       }
-
        /**
         * Batch-insert a group of jobs into the queue.
         * This will be wrapped in a transaction with a forced commit.
@@ -264,33 +80,10 @@ abstract class Job {
         * removed later on, when the first one is popped.
         *
         * @param $jobs array of Job objects
+        * @deprecated 1.20
         */
-       static function batchInsert( $jobs ) {
-               if ( !count( $jobs ) ) {
-                       return;
-               }
-               $dbw = wfGetDB( DB_MASTER );
-               $rows = array();
-
-               /**
-                * @var $job Job
-                */
-               foreach ( $jobs as $job ) {
-                       $rows[] = $job->insertFields();
-                       if ( count( $rows ) >= 50 ) {
-                               # Do a small transaction to avoid slave lag
-                               $dbw->begin( __METHOD__ );
-                               $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
-                               $dbw->commit( __METHOD__ );
-                               $rows = array();
-                       }
-               }
-               if ( $rows ) { // last chunk
-                       $dbw->begin( __METHOD__ );
-                       $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
-                       $dbw->commit( __METHOD__ );
-               }
-               wfIncrStats( 'job-insert', count( $jobs ) );
+       public static function batchInsert( $jobs ) {
+               return JobQueueGroup::singleton()->push( $jobs );
        }
 
        /**
@@ -301,45 +94,10 @@ abstract class Job {
         * large batches of jobs can cause slave lag.
         *
         * @param $jobs array of Job objects
+        * @deprecated 1.20
         */
-       static function safeBatchInsert( $jobs ) {
-               if ( !count( $jobs ) ) {
-                       return;
-               }
-               $dbw = wfGetDB( DB_MASTER );
-               $rows = array();
-               foreach ( $jobs as $job ) {
-                       $rows[] = $job->insertFields();
-                       if ( count( $rows ) >= 500 ) {
-                               $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
-                               $rows = array();
-                       }
-               }
-               if ( $rows ) { // last chunk
-                       $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
-               }
-               wfIncrStats( 'job-insert', count( $jobs ) );
-       }
-
-
-       /**
-        * SQL conditions to apply on most JobQueue queries
-        *
-        * Whenever we exclude jobs types from the default queue, we want to make
-        * sure that queries to the job queue actually ignore them.
-        *
-        * @return array SQL conditions suitable for Database:: methods
-        */
-       static function defaultQueueConditions( ) {
-               global $wgJobTypesExcludedFromDefaultQueue;
-               $conditions = array();
-               if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) {
-                       $dbr = wfGetDB( DB_SLAVE );
-                       foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) {
-                               $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType );
-                       }
-               }
-               return $conditions;
+       public static function safeBatchInsert( $jobs ) {
+               return JobQueueGroup::singleton()->push( $jobs, JobQueue::QoS_Atomic );
        }
 
        /*-------------------------------------------------------------------------
@@ -352,77 +110,63 @@ abstract class Job {
         * @param $params array|bool
         * @param $id int
         */
-       function __construct( $command, $title, $params = false, $id = 0 ) {
+       public function __construct( $command, $title, $params = false, $id = 0 ) {
                $this->command = $command;
                $this->title = $title;
                $this->params = $params;
                $this->id = $id;
 
-               // A bit of premature generalisation
-               // Oh well, the whole class is premature generalisation really
-               $this->removeDuplicates = true;
+               $this->removeDuplicates = false; // expensive jobs may set this to true
        }
 
        /**
-        * Insert a single job into the queue.
-        * @return bool true on success
+        * @return integer May be 0 for jobs stored outside the DB
         */
-       function insert() {
-               $fields = $this->insertFields();
+       public function getId() {
+               return $this->id;
+       }
 
-               $dbw = wfGetDB( DB_MASTER );
+       /**
+        * @return string
+        */
+       public function getType() {
+               return $this->command;
+       }
 
-               if ( $this->removeDuplicates ) {
-                       $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ );
-                       if ( $dbw->numRows( $res ) ) {
-                               return true;
-                       }
-               }
-               wfIncrStats( 'job-insert' );
-               return $dbw->insert( 'job', $fields, __METHOD__ );
+       /**
+        * @return Title
+        */
+       public function getTitle() {
+               return $this->title;
        }
 
        /**
         * @return array
         */
-       protected function insertFields() {
-               $dbw = wfGetDB( DB_MASTER );
-               return array(
-                       'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
-                       'job_cmd' => $this->command,
-                       'job_namespace' => $this->title->getNamespace(),
-                       'job_title' => $this->title->getDBkey(),
-                       'job_timestamp' => $dbw->timestamp(),
-                       'job_params' => Job::makeBlob( $this->params )
-               );
+       public function getParams() {
+               return $this->params;
        }
 
        /**
-        * Remove jobs in the job queue which are duplicates of this job.
-        * This is deadlock-prone and so starts its own transaction.
+        * @return bool
         */
-       function removeDuplicates() {
-               if ( !$this->removeDuplicates ) {
-                       return;
-               }
+       public function ignoreDuplicates() {
+               return $this->removeDuplicates;
+       }
 
-               $fields = $this->insertFields();
-               unset( $fields['job_id'] );
-               unset( $fields['job_timestamp'] );
-               $dbw = wfGetDB( DB_MASTER );
-               $dbw->begin( __METHOD__ );
-               $dbw->delete( 'job', $fields, __METHOD__ );
-               $affected = $dbw->affectedRows();
-               $dbw->commit( __METHOD__ );
-               if ( $affected ) {
-                       wfIncrStats( 'job-dup-delete', $affected );
-               }
+       /**
+        * Insert a single job into the queue.
+        * @return bool true on success
+        * @deprecated 1.20
+        */
+       public function insert() {
+               return JobQueueGroup::singleton()->push( $this );
        }
 
        /**
         * @return string
         */
-       function toString() {
+       public function toString() {
                $paramString = '';
                if ( $this->params ) {
                        foreach ( $this->params as $key => $value ) {
@@ -448,7 +192,7 @@ abstract class Job {
                $this->error = $error;
        }
 
-       function getLastError() {
+       public function getLastError() {
                return $this->error;
        }
 }
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
new file mode 100644 (file)
index 0000000..6409cff
--- /dev/null
@@ -0,0 +1,185 @@
+<?php
+/**
+ * Job queue base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @defgroup JobQueue JobQueue
+ * @author Aaron Schulz
+ */
+
+/**
+ * Abstract base class for a queue of background jobs of a single type on one wiki
+ *
+ * @ingroup JobQueue
+ * @since 1.20
+ */
+abstract class JobQueue {
+       protected $wiki; // string; wiki ID
+       protected $type; // string; job type
+
+       const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
+
+       /**
+        * @param $params array Must have 'wiki' and 'type' keys; both are read unconditionally
+        */
+       protected function __construct( array $params ) {
+               $this->wiki = $params['wiki'];
+               $this->type = $params['type'];
+       }
+
+       /**
+        * Get a job queue object of the specified class.
+        * $params includes:
+        *     class : name of the JobQueue subclass to instantiate (e.g. JobQueueDB)
+        *     wiki  : wiki ID of the wiki the jobs are for (NOTE(review): no default is applied in this class; presumably the caller supplies it)
+        *     type  : the name of the job type this queue handles
+        *
+        * @param $params array Passed unchanged to the constructor of the chosen class
+        * @return JobQueue
+        * @throws MWException If the class does not exist or is not a JobQueue subclass
+        */
+       final public static function factory( array $params ) {
+               $class = $params['class'];
+               if ( !MWInit::classExists( $class ) ) {
+                       throw new MWException( "Invalid job queue class '$class'." );
+               }
+               $obj = new $class( $params );
+               if ( !( $obj instanceof self ) ) {
+                       throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
+               }
+               return $obj;
+       }
+
+       /**
+        * @return string Wiki ID
+        */
+       final public function getWiki() {
+               return $this->wiki;
+       }
+
+       /**
+        * @return string Job type that this queue handles
+        */
+       final public function getType() {
+               return $this->type;
+       }
+
+       /**
+        * @return bool Quickly check if the queue is empty
+        */
+       final public function isEmpty() {
+               wfProfileIn( __METHOD__ );
+               $res = $this->doIsEmpty();
+               wfProfileOut( __METHOD__ );
+               return $res;
+       }
+
+       /**
+        * @see JobQueue::isEmpty()
+        * @return bool
+        */
+       abstract protected function doIsEmpty();
+
+       /**
+        * Push a batch of jobs into the queue; throws MWException on a job type mismatch
+        *
+        * @param $jobs array List of Jobs, all of this queue's type
+        * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
+        * @return bool Result of doBatchPush(); the 'job-insert' stat is bumped only when it is truthy
+        */
+       final public function batchPush( array $jobs, $flags = 0 ) {
+               foreach ( $jobs as $job ) {
+                       if ( $job->getType() !== $this->type ) {
+                               throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+                       }
+               }
+               wfProfileIn( __METHOD__ );
+               $ok = $this->doBatchPush( $jobs, $flags );
+               if ( $ok ) {
+                       wfIncrStats( 'job-insert', count( $jobs ) );
+               }
+               wfProfileOut( __METHOD__ );
+               return $ok;
+       }
+
+       /**
+        * @see JobQueue::batchPush()
+        * @return bool
+        */
+       abstract protected function doBatchPush( array $jobs, $flags );
+
+       /**
+        * Pop a job off of the queue; the 'job-pop' stat is bumped only when a job is returned
+        *
+        * @return Job|bool Returns false on failure
+        */
+       final public function pop() {
+               wfProfileIn( __METHOD__ );
+               $job = $this->doPop();
+               if ( $job ) {
+                       wfIncrStats( 'job-pop' );
+               }
+               wfProfileOut( __METHOD__ );
+               return $job;
+       }
+
+       /**
+        * @see JobQueue::pop()
+        * @return Job|bool False on failure, as documented for pop()
+        */
+       abstract protected function doPop();
+
+       /**
+        * Acknowledge that a job was completed; throws MWException on a job type mismatch
+        *
+        * @param $job Job Must be of this queue's type
+        * @return bool Result of doAck()
+        */
+       final public function ack( Job $job ) {
+               if ( $job->getType() !== $this->type ) {
+                       throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+               }
+               wfProfileIn( __METHOD__ );
+               $ok = $this->doAck( $job );
+               wfProfileOut( __METHOD__ );
+               return $ok;
+       }
+
+       /**
+        * @see JobQueue::ack()
+        * @return bool
+        */
+       abstract protected function doAck( Job $job );
+
+       /**
+        * Wait for any slaves or backup servers to catch up
+        *
+        * @return void
+        */
+       final public function waitForBackups() {
+               wfProfileIn( __METHOD__ );
+               $this->doWaitForBackups();
+               wfProfileOut( __METHOD__ );
+       }
+
+       /**
+        * @see JobQueue::waitForBackups()
+        * @return void
+        */
+       protected function doWaitForBackups() {} // no-op in this base class
+}
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
new file mode 100644 (file)
index 0000000..3d584ef
--- /dev/null
@@ -0,0 +1,320 @@
+<?php
+/**
+ * Database-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle job queues stored in the DB
+ *
+ * @ingroup JobQueue
+ * @since 1.20
+ */
+class JobQueueDB extends JobQueue {
+       const CACHE_TTL = 30; // integer; seconds
+       const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, largest job_random value
+
+       /**
+        * Check whether the job table has no rows for this queue's job type.
+        *
+        * The answer is cached in $wgMemc as the string 'true' or 'false' for
+        * CACHE_TTL seconds; on a cache miss the slave DB is queried.
+        *
+        * @see JobQueue::doIsEmpty()
+        * @return bool
+        */
+       protected function doIsEmpty() {
+               global $wgMemc;
+
+               $key = $this->getEmptinessCacheKey();
+
+               $isEmpty = $wgMemc->get( $key );
+               if ( $isEmpty === 'true' ) {
+                       return true;
+               } elseif ( $isEmpty === 'false' ) {
+                       return false;
+               }
+
+               $found = $this->getSlaveDB()->selectField(
+                       'job', '1', array( 'job_cmd' => $this->type ), __METHOD__
+               );
+
+               // add() rather than set(): don't clobber a value written concurrently
+               // by doBatchPush() or doPop()
+               $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL );
+
+               return !$found;
+       }
+
+       /**
+        * Insert the given jobs into the job table.
+        *
+        * The rows are written from an onTransactionIdle() callback in chunks
+        * of 50. With the QoS_Atomic flag all chunks share one transaction;
+        * otherwise DBO_TRX is cleared for the duration so that each INSERT is
+        * its own transaction. On success the emptiness cache is set to 'false'.
+        *
+        * @see JobQueue::doBatchPush()
+        * @param $jobs array List of Job objects
+        * @param $flags integer Bitfield (checked against JobQueue::QoS_Atomic)
+        * @return bool Always true; DB failures surface as DBError from the callback
+        */
+       protected function doBatchPush( array $jobs, $flags ) {
+               if ( count( $jobs ) ) {
+                       $dbw = $this->getMasterDB();
+
+                       $rows = array();
+                       foreach ( $jobs as $job ) {
+                               $rows[] = $this->insertFields( $job );
+                       }
+                       $atomic = ( $flags & self::QoS_Atomic );
+                       $key    = $this->getEmptinessCacheKey();
+                       $ttl    = self::CACHE_TTL;
+                       $method = __METHOD__; // inside the closure __METHOD__ would name the closure
+
+                       $dbw->onTransactionIdle( function() use ( $dbw, $rows, $atomic, $key, $ttl, $method ) {
+                               global $wgMemc;
+
+                               $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
+                               if ( $atomic ) {
+                                       $dbw->begin(); // wrap all the job additions in one transaction
+                               } else {
+                                       $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+                               }
+                               try {
+                                       foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { // avoid slave lag
+                                               $dbw->insert( 'job', $rowBatch, $method );
+                                       }
+                               } catch ( DBError $e ) {
+                                       if ( $atomic ) {
+                                               $dbw->rollback();
+                                       } else {
+                                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+                                       }
+                                       throw $e;
+                               }
+                               if ( $atomic ) {
+                                       $dbw->commit();
+                               } else {
+                                       $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+                               }
+
+                               $wgMemc->set( $key, 'false', $ttl );
+                       } );
+               }
+
+               return true;
+       }
+
+       /**
+        * Claim one unclaimed row of this job type and build a Job from it.
+        *
+        * A random pivot and direction are chosen for each attempt; if no row
+        * can be claimed on either side of the pivot the emptiness cache is set
+        * to 'true' and false is returned. Rows with an invalid title are
+        * deleted and the claim is retried. For jobs that ignore duplicates,
+        * other rows with the same job_sha1 are deleted.
+        *
+        * @see JobQueue::doPop()
+        * @return Job|bool
+        */
+       protected function doPop() {
+               global $wgMemc;
+
+               $uuid = wfRandomString( 32 ); // pop attempt
+
+               $dbw = $this->getMasterDB();
+               if ( $dbw->trxLevel() ) {
+                       wfWarn( "Attempted to pop a job in a transaction; committing first." );
+                       $dbw->commit(); // push existing transaction
+               }
+
+               $job = false; // job popped off
+               $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
+               $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+               try {
+                       do { // retry when our row is invalid or deleted as a duplicate
+                               $row = false; // row claimed
+                               // Stay within the [0, 2^31 - 1] range used for job_random
+                               $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+                               $gte  = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+                               // Try to reserve a DB row...
+                               if ( $this->claim( $uuid, $rand, $gte ) || $this->claim( $uuid, $rand, !$gte ) ) {
+                                       // Fetch any row that we just reserved...
+                                       $row = $dbw->selectRow( 'job', '*',
+                                               array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
+                                       // Check if another process deleted it as a duplicate
+                                       if ( !$row ) {
+                                               wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+                                               continue; // try again
+                                       }
+                                       // Get the job object from the row...
+                                       $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+                                       if ( !$title ) {
+                                               $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+                                               wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
+                                               continue; // try again
+                                       }
+                                       $job = Job::factory( $row->job_cmd, $title,
+                                               self::extractBlob( $row->job_params ), $row->job_id );
+                                       // Delete any *other* duplicate jobs in the queue...
+                                       if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
+                                               $dbw->delete( 'job',
+                                                       array( 'job_sha1' => $row->job_sha1,
+                                                               "job_id != {$dbw->addQuotes( $row->job_id )}" ),
+                                                       __METHOD__
+                                               );
+                                       }
+                               } else {
+                                       $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
+                               }
+                               break; // done
+                       } while( true );
+               } catch ( DBError $e ) {
+                       $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+                       throw $e;
+               }
+               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+
+               return $job;
+       }
+
+       /**
+        * Reserve a row with a single UPDATE without holding row locks over RTTs...
+        * @param $uuid string 32 char hex string
+        * @param $rand integer Random unsigned integer (31 bits)
+        * @param $gte bool Search for job_random >= $rand (otherwise job_random <= $rand)
+        * @return integer Number of affected rows
+        */
+       protected function claim( $uuid, $rand, $gte ) {
+               $dbw  = $this->getMasterDB();
+               $dir  = $gte ? 'ASC' : 'DESC';
+               $ineq = $gte ? '>=' : '<=';
+               if ( $dbw->getType() === 'mysql' ) {
+                       // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
+                       // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
+                       // Oracle and Postgre have no such limitation. However, MySQL offers an
+                       // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
+                       // The DB wrapper functions do not support this, so it's done manually.
+                       $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
+                               SET
+                                       job_token = {$dbw->addQuotes( $uuid ) },
+                                       job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
+                               WHERE (
+                                       job_cmd = {$dbw->addQuotes( $this->type )}
+                                       AND job_token = {$dbw->addQuotes( '' )}
+                                       AND job_random {$ineq} {$dbw->addQuotes( $rand )}
+                               ) ORDER BY job_random {$dir} LIMIT 1",
+                               __METHOD__
+                       );
+               } else {
+                       // Use a subquery to find the job, within an UPDATE to claim it.
+                       // This uses as much of the DB wrapper functions as possible.
+                       $dbw->update( 'job',
+                               array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+                               array( 'job_id = (' .
+                                       $dbw->selectSQLText( 'job', 'job_id',
+                                               array(
+                                                       'job_cmd'   => $this->type,
+                                                       'job_token' => '',
+                                                       "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
+                                               __METHOD__,
+                                               array( 'ORDER BY' => "job_random {$dir}", 'LIMIT' => 1 ) ) .
+                                       ')'
+                               ),
+                               __METHOD__
+                       );
+               }
+               return $dbw->affectedRows();
+       }
+
+       /**
+        * Delete the row of a completed job from the job table.
+        *
+        * @see JobQueue::doAck()
+        * @param $job Job
+        * @return bool Always true; DB failures are rethrown
+        */
+       protected function doAck( Job $job ) {
+               $dbw = $this->getMasterDB();
+               if ( $dbw->trxLevel() ) {
+                       wfWarn( "Attempted to ack a job in a transaction; committing first." );
+                       $dbw->commit(); // push existing transaction
+               }
+
+               $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
+               $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+               try {
+                       // Delete a row with a single DELETE without holding row locks over RTTs...
+                       $dbw->delete( 'job',
+                               array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ), __METHOD__ );
+               } catch ( Exception $e ) {
+                       $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+                       throw $e;
+               }
+               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+
+               return true;
+       }
+
+       /**
+        * @see JobQueue::doWaitForBackups()
+        * @return void
+        */
+       protected function doWaitForBackups() {
+               wfWaitForSlaves();
+       }
+
+       /**
+        * Get a slave DB connection for this queue's wiki
+        *
+        * @return DatabaseBase
+        */
+       protected function getSlaveDB() {
+               return wfGetDB( DB_SLAVE, array(), $this->wiki );
+       }
+
+       /**
+        * Get a master DB connection for this queue's wiki
+        *
+        * @return DatabaseBase
+        */
+       protected function getMasterDB() {
+               return wfGetDB( DB_MASTER, array(), $this->wiki );
+       }
+
+       /**
+        * Build the job table row for a job.
+        *
+        * job_sha1 is computed over the descriptive fields only (type, title
+        * and params), so jobs that are identical in those fields hash alike.
+        *
+        * @param $job Job
+        * @return array
+        */
+       protected function insertFields( Job $job ) {
+               // Fields that describe the nature of the job
+               $descFields = array(
+                       'job_cmd'       => $job->getType(),
+                       'job_namespace' => $job->getTitle()->getNamespace(),
+                       'job_title'     => $job->getTitle()->getDBkey(),
+                       'job_params'    => self::makeBlob( $job->getParams() ),
+               );
+               // Additional job metadata
+               $dbw = $this->getMasterDB();
+               $metaFields = array(
+                       'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+                       'job_timestamp' => $dbw->timestamp(),
+                       'job_sha1'      => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ),
+                       'job_random'    => mt_rand( 0, self::MAX_JOB_RANDOM ) // [0, 2^31 - 1]
+               );
+               return ( $descFields + $metaFields );
+       }
+
+       /**
+        * Get the cache key under which this queue's emptiness flag is stored
+        *
+        * @return string
+        */
+       private function getEmptinessCacheKey() {
+               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'isempty' );
+       }
+
+       /**
+        * Serialize job parameters for storage; false becomes the empty string
+        *
+        * @param $params
+        * @return string
+        */
+       protected static function makeBlob( $params ) {
+               if ( $params !== false ) {
+                       return serialize( $params );
+               } else {
+                       return '';
+               }
+       }
+
+       /**
+        * Inverse of makeBlob(); the empty string becomes false
+        *
+        * @param $blob
+        * @return bool|mixed
+        */
+       protected static function extractBlob( $blob ) {
+               if ( (string)$blob !== '' ) {
+                       return unserialize( $blob );
+               } else {
+                       return false;
+               }
+       }
+}
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
new file mode 100644 (file)
index 0000000..7d01a29
--- /dev/null
@@ -0,0 +1,156 @@
+<?php
+/**
+ * Job queue group code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle enqueueing of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.20
+ */
+class JobQueueGroup {
+       /** @var Array Map of (wiki ID => JobQueueGroup) */
+       protected static $instances = array();
+
+       protected $wiki; // string; wiki ID
+
+       const TYPE_DEFAULT = 1; // integer; job not in $wgJobTypesExcludedFromDefaultQueue
+       const TYPE_ANY     = 2; // integer; any job
+
+       /**
+        * @param $wiki string Wiki ID
+        */
+       protected function __construct( $wiki ) {
+               $this->wiki = $wiki;
+       }
+
+       /**
+        * Get the (per-request cached) group instance for a wiki
+        *
+        * @param $wiki string|bool Wiki ID, or false for the current wiki
+        * @return JobQueueGroup
+        */
+       public static function singleton( $wiki = false ) {
+               $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+               if ( !isset( self::$instances[$wiki] ) ) {
+                       self::$instances[$wiki] = new self( $wiki );
+               }
+               return self::$instances[$wiki];
+       }
+
+       /**
+        * Get the queue for a job type, using the 'default' entry of
+        * $wgJobTypeConf when the type has no entry of its own
+        *
+        * @param $type string
+        * @return JobQueue Job queue object for a given queue type
+        */
+       public function get( $type ) {
+               global $wgJobTypeConf;
+
+               $conf = isset( $wgJobTypeConf[$type] )
+                       ? $wgJobTypeConf[$type]
+                       : $wgJobTypeConf['default'];
+
+               return JobQueue::factory( array(
+                       'class' => $conf['class'],
+                       'wiki'  => $this->wiki,
+                       'type'  => $type,
+               ) );
+       }
+
+       /**
+        * Insert jobs into the respective queues to which they belong
+        *
+        * @param $jobs Job|array A single Job or a list of Jobs
+        * @return bool
+        */
+       public function push( $jobs ) {
+               // Note: an (array) cast would turn a single Job object into an array
+               // of its properties rather than a one-element list, so wrap explicitly.
+               $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+               $jobsByType = array(); // (job type => list of jobs)
+               foreach ( $jobs as $job ) {
+                       $jobsByType[$job->getType()][] = $job;
+               }
+
+               $ok = true;
+               foreach ( $jobsByType as $type => $jobsOfType ) {
+                       if ( !$this->get( $type )->batchPush( $jobsOfType ) ) {
+                               $ok = false;
+                       }
+               }
+
+               return $ok;
+       }
+
+       /**
+        * Pop a job off one of the job queues
+        *
+        * The candidate queues are tried in random order and the first job
+        * found is returned.
+        *
+        * @param $type integer JobQueueGroup::TYPE_* constant
+        * @return Job|bool Returns false on failure
+        */
+       public function pop( $type = self::TYPE_DEFAULT ) {
+               $types = ( $type == self::TYPE_DEFAULT )
+                       ? $this->getDefaultQueueTypes()
+                       : $this->getQueueTypes();
+               shuffle( $types ); // avoid starvation
+
+               foreach ( $types as $queueType ) { // for each queue...
+                       $job = $this->get( $queueType )->pop();
+                       if ( $job ) {
+                               return $job; // found
+                       }
+               }
+
+               return false; // no jobs found
+       }
+
+       /**
+        * Acknowledge that a job was completed
+        *
+        * @param $job Job
+        * @return bool
+        */
+       public function ack( Job $job ) {
+               return $this->get( $job->getType() )->ack( $job );
+       }
+
+       /**
+        * Get the list of queue types (the keys of $wgJobClasses)
+        *
+        * @return array List of strings
+        */
+       public function getQueueTypes() {
+               global $wgJobClasses;
+
+               return array_keys( $wgJobClasses );
+       }
+
+       /**
+        * Get the list of default queue types, i.e. all queue types except
+        * those listed in $wgJobTypesExcludedFromDefaultQueue
+        *
+        * @return array List of strings
+        */
+       public function getDefaultQueueTypes() {
+               global $wgJobTypesExcludedFromDefaultQueue;
+
+               return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
+       }
+}
index c4370f4..a29f29f 100644 (file)
@@ -27,9 +27,9 @@
  * @ingroup JobQueue
  */
 class RefreshLinksJob extends Job {
-
        function __construct( $title, $params = '', $id = 0 ) {
                parent::__construct( 'refreshLinks', $title, $params, $id );
+               $this->removeDuplicates = true; // job is expensive
        }
 
        /**
diff --git a/maintenance/archives/patch-job_token.sql b/maintenance/archives/patch-job_token.sql
new file mode 100644 (file)
index 0000000..080fa97
--- /dev/null
@@ -0,0 +1,9 @@
+-- Add the job table columns and indexes used for claiming jobs and
+-- for detecting duplicate jobs:
+--   job_random          random, non-unique number used for concurrent job acquisition
+--   job_token           conveys process locks on rows via process UUIDs ('' = unclaimed)
+--   job_token_timestamp timestamp when the job was locked
+--   job_sha1            base 36 SHA1 of the job parameters relevant to detecting duplicates
+ALTER TABLE /*_*/job
+    ADD COLUMN job_random integer unsigned NOT NULL default 0,
+    ADD COLUMN job_token varbinary(32) NOT NULL default '',
+    ADD COLUMN job_token_timestamp varbinary(14) NULL default NULL,
+    ADD COLUMN job_sha1 varbinary(32) NOT NULL default '';
+
+CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1);
+CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random);
+
index e909bc0..80a2278 100644 (file)
@@ -52,6 +52,7 @@ class RunJobs extends Maintenance {
 
        public function execute() {
                global $wgTitle;
+
                if ( $this->hasOption( 'procs' ) ) {
                        $procs = intval( $this->getOption( 'procs' ) );
                        if ( $procs < 1 || $procs > 1000 ) {
@@ -70,26 +71,17 @@ class RunJobs extends Maintenance {
                $dbw = wfGetDB( DB_MASTER );
                $n = 0;
 
-               if ( $type === false ) {
-                       $conds = Job::defaultQueueConditions( );
-               } else {
-                       $conds = array( 'job_cmd' => $type );
-               }
-
-               while ( $dbw->selectField( 'job', 'job_id', $conds, 'runJobs.php' ) ) {
-                       $offset = 0;
-                       for ( ; ; ) {
-                               $job = !$type ? Job::pop( $offset ) : Job::pop_type( $type );
-
-                               if ( !$job ) {
-                                       break;
-                               }
-
-                               wfWaitForSlaves();
+               $group = JobQueueGroup::singleton();
+               do {
+                       $job = ( $type === false )
+                               ? $group->pop() // job from any queue
+                               : $group->get( $type )->pop(); // job from a single queue
+                       if ( $job ) { // found a job
+                               // Perform the job (logging success/failure and runtime)...
                                $t = microtime( true );
-                               $offset = $job->id;
                                $this->runJobsLog( $job->toString() . " STARTING" );
                                $status = $job->run();
+                               $group->ack( $job ); // done
                                $t = microtime( true ) - $t;
                                $timeMs = intval( $t * 1000 );
                                if ( !$status ) {
@@ -97,15 +89,17 @@ class RunJobs extends Maintenance {
                                } else {
                                        $this->runJobsLog( $job->toString() . " t=$timeMs good" );
                                }
-
-                               if ( $maxJobs && ++$n > $maxJobs ) {
+                               // Break out if we hit the job count or wall time limits...
+                               if ( $maxJobs && ++$n >= $maxJobs ) {
                                        break 2;
                                }
-                               if ( $maxTime && time() - $startTime > $maxTime ) {
+                               if ( $maxTime && ( time() - $startTime ) > $maxTime ) {
                                        break 2;
                                }
+                               // Don't let any slaves/backups fall behind...
+                               $group->get( $type )->waitForBackups();
                        }
-               }
+               } while ( $job ); // stop when there are no jobs
        }
 
        /**
diff --git a/maintenance/sqlite/archives/patch-job_token.sql b/maintenance/sqlite/archives/patch-job_token.sql
new file mode 100644 (file)
index 0000000..4e4d28f
--- /dev/null
@@ -0,0 +1,8 @@
+-- Add the job table columns and indexes used for claiming jobs and
+-- for detecting duplicate jobs (one ALTER TABLE statement per column):
+--   job_random          random, non-unique number used for concurrent job acquisition
+--   job_token           conveys process locks on rows via process UUIDs ('' = unclaimed)
+--   job_sha1            base 36 SHA1 of the job parameters relevant to detecting duplicates
+--   job_token_timestamp timestamp when the job was locked
+ALTER TABLE /*_*/job ADD COLUMN job_random integer unsigned NOT NULL default 0;
+ALTER TABLE /*_*/job ADD COLUMN job_token varbinary(32) NOT NULL default '';
+ALTER TABLE /*_*/job ADD COLUMN job_sha1 varbinary(32) NOT NULL default '';
+ALTER TABLE /*_*/job ADD COLUMN job_token_timestamp varbinary(14) NULL default NULL;
+
+CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1);
+CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random);
+
index a06c21c..197151b 100644 (file)
@@ -1289,9 +1289,23 @@ CREATE TABLE /*_*/job (
 
   -- Any other parameters to the command
   -- Stored as a PHP serialized array, or an empty string if there are no parameters
-  job_params blob NOT NULL
+  job_params blob NOT NULL,
+
+  -- Random, non-unique, number used for concurrent job acquisition
+  job_random integer unsigned NOT NULL default 0,
+
+  -- Field that conveys process locks on rows via process UUIDs
+  job_token varbinary(32) NOT NULL default '',
+
+  -- Timestamp when the job was locked
+  job_token_timestamp varbinary(14) NULL default NULL,
+
+  -- Base 36 SHA1 of the job parameters relevant to detecting duplicates
+  job_sha1 varbinary(32) NOT NULL default ''
 ) /*$wgDBTableOptions*/;
 
+CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1);
+CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random);
 CREATE INDEX /*i*/job_cmd ON /*_*/job (job_cmd, job_namespace, job_title, job_params(128));
 CREATE INDEX /*i*/job_timestamp ON /*_*/job (job_timestamp);