[JobQueue] Added support for approximate FIFO job queues.
[lhc/web/wiklou.git] / includes / job / JobQueueDB.php
index bea4a6f..f6003b2 100644 (file)
@@ -121,38 +121,44 @@ class JobQueueDB extends JobQueue {
                $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
                try {
                        do { // retry when our row is invalid or deleted as a duplicate
-                               $row = false; // row claimed
-                               $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
-                               $gte  = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
-                               // Try to reserve a DB row...
-                               if ( $this->claim( $uuid, $rand, $gte ) || $this->claim( $uuid, $rand, !$gte ) ) {
-                                       // Fetch any row that we just reserved...
-                                       $row = $dbw->selectRow( 'job', '*',
-                                               array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
-                                       // Check if another process deleted it as a duplicate
-                                       if ( !$row ) {
-                                               wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
-                                               continue; // try again
-                                       }
-                                       // Get the job object from the row...
-                                       $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
-                                       if ( !$title ) {
-                                               $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
-                                               wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
-                                               continue; // try again
-                                       }
-                                       $job = Job::factory( $row->job_cmd, $title,
-                                               self::extractBlob( $row->job_params ), $row->job_id );
-                                       // Delete any *other* duplicate jobs in the queue...
-                                       if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
-                                               $dbw->delete( 'job',
-                                                       array( 'job_sha1' => $row->job_sha1,
-                                                               "job_id != {$dbw->addQuotes( $row->job_id )}" ),
-                                                       __METHOD__
-                                               );
-                                       }
-                               } else {
+                               // Try to reserve a row in the DB...
+                               if ( $this->order === 'timestamp' ) { // oldest first
+                                       $found = $this->claim( $uuid, 0, true );
+                               } else { // random first
+                                       $rand  = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+                                       $gte   = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+                                       $found = $this->claim( $uuid, $rand, $gte )
+                                               || $this->claim( $uuid, $rand, !$gte ); // try both directions
+                               }
+                               // Check if we found a row to reserve...
+                               if ( !$found ) {
                                        $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
+                                       break; // nothing to do
+                               }
+                               // Fetch any row that we just reserved...
+                               $row = $dbw->selectRow( 'job', '*',
+                                       array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
+                               // Check if another process deleted it as a duplicate
+                               if ( !$row ) {
+                                       wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+                                       continue; // try again
+                               }
+                               // Get the job object from the row...
+                               $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+                               if ( !$title ) {
+                                       $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+                                       wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
+                                       continue; // try again
+                               }
+                               $job = Job::factory( $row->job_cmd, $title,
+                                       self::extractBlob( $row->job_params ), $row->job_id );
+                               // Delete any *other* duplicate jobs in the queue...
+                               if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
+                                       $dbw->delete( 'job',
+                                               array( 'job_sha1' => $row->job_sha1,
+                                                       "job_id != {$dbw->addQuotes( $row->job_id )}" ),
+                                               __METHOD__
+                                       );
                                }
                                break; // done
                        } while( true );
@@ -274,6 +280,11 @@ class JobQueueDB extends JobQueue {
                        'job_params'    => self::makeBlob( $job->getParams() ),
                );
                // Additional job metadata
+               if ( $this->order === 'timestamp' ) { // oldest first
+                       $random = time() - 1325376000; // seconds since "January 1, 2012"
+               } else { // random first
+                       $random = mt_rand( 0, self::MAX_JOB_RANDOM );
+               }
                $dbw = $this->getMasterDB();
                $metaFields = array(
                        'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),