Merge "[JobQueue] Added support for approximate FIFO job queues."
authorBrion VIBBER <brion@wikimedia.org>
Thu, 25 Oct 2012 21:07:07 +0000 (21:07 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Thu, 25 Oct 2012 21:07:07 +0000 (21:07 +0000)
includes/DefaultSettings.php
includes/job/JobQueue.php
includes/job/JobQueueDB.php
includes/job/JobQueueGroup.php
maintenance/tables.sql

index c485b16..dd9d73b 100644 (file)
@@ -5440,7 +5440,7 @@ $wgJobTypesExcludedFromDefaultQueue = array();
  * These settings should be global to all wikis.
  */
 $wgJobTypeConf = array(
-       'default' => array( 'class' => 'JobQueueDB' ),
+       'default' => array( 'class' => 'JobQueueDB', 'order' => 'random' ),
 );
 
 /**
index 6409cff..4637bd2 100644 (file)
@@ -31,6 +31,7 @@
 abstract class JobQueue {
        protected $wiki; // string; wiki ID
        protected $type; // string; job type
+       protected $order; // string; job priority for pop()
 
        const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
 
@@ -38,8 +39,9 @@ abstract class JobQueue {
         * @param $params array
         */
        protected function __construct( array $params ) {
-               $this->wiki = $params['wiki'];
-               $this->type = $params['type'];
+               $this->wiki  = $params['wiki'];
+               $this->type  = $params['type'];
+               $this->order = isset( $params['order'] ) ? $params['order'] : 'random';
        }
 
        /**
@@ -48,6 +50,12 @@ abstract class JobQueue {
         *     class : what job class to use (determines job type)
         *     wiki  : wiki ID of the wiki the jobs are for (defaults to current wiki)
         *     type  : The name of the job types this queue handles
+        *     order : Order that pop() selects jobs, either "timestamp" or "random".
+        *             If "timestamp" is used, the queue will effectively be FIFO. Note that
+        *             pop() will not be exactly FIFO, and even if it was, job completion would
+        *             not appear to be exactly FIFO since jobs can take different times to finish.
+        *             If "random" is used, pop() will pick jobs in random order. This might be
+        *             useful for improving concurrency depending on the queue storage medium.
         *
         * @param $params array
         * @return JobQueue
index bea4a6f..f6003b2 100644 (file)
@@ -121,38 +121,44 @@ class JobQueueDB extends JobQueue {
                $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
                try {
                        do { // retry when our row is invalid or deleted as a duplicate
-                               $row = false; // row claimed
-                               $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
-                               $gte  = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
-                               // Try to reserve a DB row...
-                               if ( $this->claim( $uuid, $rand, $gte ) || $this->claim( $uuid, $rand, !$gte ) ) {
-                                       // Fetch any row that we just reserved...
-                                       $row = $dbw->selectRow( 'job', '*',
-                                               array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
-                                       // Check if another process deleted it as a duplicate
-                                       if ( !$row ) {
-                                               wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
-                                               continue; // try again
-                                       }
-                                       // Get the job object from the row...
-                                       $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
-                                       if ( !$title ) {
-                                               $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
-                                               wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
-                                               continue; // try again
-                                       }
-                                       $job = Job::factory( $row->job_cmd, $title,
-                                               self::extractBlob( $row->job_params ), $row->job_id );
-                                       // Delete any *other* duplicate jobs in the queue...
-                                       if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
-                                               $dbw->delete( 'job',
-                                                       array( 'job_sha1' => $row->job_sha1,
-                                                               "job_id != {$dbw->addQuotes( $row->job_id )}" ),
-                                                       __METHOD__
-                                               );
-                                       }
-                               } else {
+                               // Try to reserve a row in the DB...
+                               if ( $this->order === 'timestamp' ) { // oldest first
+                                       $found = $this->claim( $uuid, 0, true );
+                               } else { // random first
+                                       $rand  = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+                                       $gte   = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+                                       $found = $this->claim( $uuid, $rand, $gte )
+                                               || $this->claim( $uuid, $rand, !$gte ); // try both directions
+                               }
+                               // Check if we found a row to reserve...
+                               if ( !$found ) {
                                        $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
+                                       break; // nothing to do
+                               }
+                               // Fetch any row that we just reserved...
+                               $row = $dbw->selectRow( 'job', '*',
+                                       array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
+                               // Check if another process deleted it as a duplicate
+                               if ( !$row ) {
+                                       wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+                                       continue; // try again
+                               }
+                               // Get the job object from the row...
+                               $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+                               if ( !$title ) {
+                                       $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+                                       wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
+                                       continue; // try again
+                               }
+                               $job = Job::factory( $row->job_cmd, $title,
+                                       self::extractBlob( $row->job_params ), $row->job_id );
+                               // Delete any *other* duplicate jobs in the queue...
+                               if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
+                                       $dbw->delete( 'job',
+                                               array( 'job_sha1' => $row->job_sha1,
+                                                       "job_id != {$dbw->addQuotes( $row->job_id )}" ),
+                                               __METHOD__
+                                       );
                                }
                                break; // done
                        } while( true );
@@ -274,6 +280,11 @@ class JobQueueDB extends JobQueue {
                        'job_params'    => self::makeBlob( $job->getParams() ),
                );
                // Additional job metadata
+               if ( $this->order === 'timestamp' ) { // oldest first
+                       $random = time() - 1325376000; // seconds since "January 1, 2012"
+               } else { // random first
+                       $random = mt_rand( 0, self::MAX_JOB_RANDOM );
+               }
                $dbw = $this->getMasterDB();
                $metaFields = array(
                        'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
index 69bcf01..4ebd531 100644 (file)
@@ -62,18 +62,14 @@ class JobQueueGroup {
        public function get( $type ) {
                global $wgJobTypeConf;
 
-               $conf = false;
+               $conf = array( 'wiki' => $this->wiki, 'type' => $type );
                if ( isset( $wgJobTypeConf[$type] ) ) {
-                       $conf = $wgJobTypeConf[$type];
+                       $conf = $conf + $wgJobTypeConf[$type];
                } else {
-                       $conf = $wgJobTypeConf['default'];
+                       $conf = $conf + $wgJobTypeConf['default'];
                }
 
-               return JobQueue::factory( array(
-                       'class' => $conf['class'],
-                       'wiki'  => $this->wiki,
-                       'type'  => $type,
-               ) );
+               return JobQueue::factory( $conf );
        }
 
        /**
index 197151b..a4cdefd 100644 (file)
@@ -1291,7 +1291,8 @@ CREATE TABLE /*_*/job (
   -- Stored as a PHP serialized array, or an empty string if there are no parameters
   job_params blob NOT NULL,
 
-  -- Random, non-unique, number used for concurrent job acquisition
+  -- Random, non-unique, number used for job acquisition
+  -- Either a simple timestamp or a totally random number (for lock concurrency)
   job_random integer unsigned NOT NULL default 0,
 
   -- Field that conveys process locks on rows via process UUIDs