Merge "Normalize as and or language names"
[lhc/web/wiklou.git] / includes / job / JobQueueDB.php
index fee1aaf..ae4576c 100644 (file)
  * @since 1.21
  */
 class JobQueueDB extends JobQueue {
-       const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
        const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
        const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
        const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
        const MAX_OFFSET = 255; // integer; maximum number of rows to skip
 
+       /** @var BagOStuff */
+       protected $cache;
+
        protected $cluster = false; // string; name of an external DB cluster
 
        /**
@@ -46,8 +48,21 @@ class JobQueueDB extends JobQueue {
         * @param $params array
         */
        protected function __construct( array $params ) {
+               global $wgMemc;
+
                parent::__construct( $params );
+
                $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
+               // Make sure that we don't use the SQL cache, which would be harmful
+               $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc;
+       }
+
+       protected function supportedOrders() {
+               return array( 'random', 'timestamp', 'fifo' );
+       }
+
+       protected function optimalOrder() {
+               return 'random';
        }
 
        /**
@@ -55,11 +70,9 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doIsEmpty() {
-               global $wgMemc;
-
                $key = $this->getCacheKey( 'empty' );
 
-               $isEmpty = $wgMemc->get( $key );
+               $isEmpty = $this->cache->get( $key );
                if ( $isEmpty === 'true' ) {
                        return true;
                } elseif ( $isEmpty === 'false' ) {
@@ -70,7 +83,7 @@ class JobQueueDB extends JobQueue {
                $found = $dbr->selectField( // unclaimed job
                        'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
                );
-               $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
+               $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
 
                return !$found;
        }
@@ -80,11 +93,9 @@ class JobQueueDB extends JobQueue {
         * @return integer
         */
        protected function doGetSize() {
-               global $wgMemc;
-
                $key = $this->getCacheKey( 'size' );
 
-               $size = $wgMemc->get( $key );
+               $size = $this->cache->get( $key );
                if ( is_int( $size ) ) {
                        return $size;
                }
@@ -94,7 +105,7 @@ class JobQueueDB extends JobQueue {
                        array( 'job_cmd' => $this->type, 'job_token' => '' ),
                        __METHOD__
                );
-               $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT );
+               $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
 
                return $size;
        }
@@ -104,13 +115,40 @@ class JobQueueDB extends JobQueue {
         * @return integer
         */
        protected function doGetAcquiredCount() {
+               if ( $this->claimTTL <= 0 ) {
+                       return 0; // no acknowledgements
+               }
+
+               $key = $this->getCacheKey( 'acquiredcount' );
+
+               $count = $this->cache->get( $key );
+               if ( is_int( $count ) ) {
+                       return $count;
+               }
+
+               list( $dbr, $scope ) = $this->getSlaveDB();
+               $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                       array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
+                       __METHOD__
+               );
+               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+
+               return $count;
+       }
+
+       /**
+        * @see JobQueue::doGetAbandonedCount()
+        * @return integer
+        * @throws MWException
+        */
+       protected function doGetAbandonedCount() {
                global $wgMemc;
 
                if ( $this->claimTTL <= 0 ) {
                        return 0; // no acknowledgements
                }
 
-               $key = $this->getCacheKey( 'acquiredcount' );
+               $key = $this->getCacheKey( 'abandonedcount' );
 
                $count = $wgMemc->get( $key );
                if ( is_int( $count ) ) {
@@ -119,7 +157,11 @@ class JobQueueDB extends JobQueue {
 
                list( $dbr, $scope ) = $this->getSlaveDB();
                $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
-                       array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
+                       array(
+                               'job_cmd' => $this->type,
+                               "job_token != {$dbr->addQuotes( '' )}",
+                               "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
+                       ),
                        __METHOD__
                );
                $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
@@ -151,13 +193,12 @@ class JobQueueDB extends JobQueue {
                        }
 
                        $key = $this->getCacheKey( 'empty' );
-                       $atomic = ( $flags & self::QoS_Atomic );
+                       $atomic = ( $flags & self::QOS_ATOMIC );
+                       $cache = $this->cache;
 
                        $dbw->onTransactionIdle(
-                               function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $scope
+                               function() use ( $dbw, $cache, $rowSet, $rowList, $atomic, $key, $scope
                        ) {
-                               global $wgMemc;
-
                                if ( $atomic ) {
                                        $dbw->begin( __METHOD__ ); // wrap all the job additions in one transaction
                                }
@@ -196,7 +237,7 @@ class JobQueueDB extends JobQueue {
                                        $dbw->commit( __METHOD__ );
                                }
 
-                               $wgMemc->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
+                               $cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } );
                }
 
@@ -208,9 +249,7 @@ class JobQueueDB extends JobQueue {
         * @return Job|bool
         */
        protected function doPop() {
-               global $wgMemc;
-
-               if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
+               if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
                        return false; // queue is empty
                }
 
@@ -230,7 +269,7 @@ class JobQueueDB extends JobQueue {
                        }
                        // Check if we found a row to reserve...
                        if ( !$row ) {
-                               $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
+                               $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
                                break; // nothing to do
                        }
                        wfIncrStats( 'job-pop' );
@@ -244,11 +283,6 @@ class JobQueueDB extends JobQueue {
                        $job = Job::factory( $row->job_cmd, $title,
                                self::extractBlob( $row->job_params ), $row->job_id );
                        $job->id = $row->job_id; // XXX: work around broken subclasses
-                       // Flag this job as an old duplicate based on its "root" job...
-                       if ( $this->isRootJobOldDuplicate( $job ) ) {
-                               wfIncrStats( 'job-pop-duplicate' );
-                               $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
-                       }
                        break; // done
                } while( true );
 
@@ -258,17 +292,15 @@ class JobQueueDB extends JobQueue {
        /**
         * Reserve a row with a single UPDATE without holding row locks over RTTs...
         *
-        * @param $uuid string 32 char hex string
+        * @param string $uuid 32 char hex string
         * @param $rand integer Random unsigned integer (31 bits)
-        * @param $gte bool Search for job_random >= $random (otherwise job_random <= $random)
+        * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
         * @return Row|false
         */
        protected function claimRandom( $uuid, $rand, $gte ) {
-               global $wgMemc;
-
                list( $dbw, $scope ) = $this->getMasterDB();
                // Check cache to see if the queue has <= OFFSET items
-               $tinyQueue = $wgMemc->get( $this->getCacheKey( 'small' ) );
+               $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
 
                $row = false; // the row acquired
                $invertedDirection = false; // whether one job_random direction was already scanned
@@ -309,7 +341,7 @@ class JobQueueDB extends JobQueue {
                                );
                                if ( !$row ) {
                                        $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
-                                       $wgMemc->set( $this->getCacheKey( 'small' ), 1, 30 );
+                                       $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
                                        continue; // use job_random
                                }
                        }
@@ -338,7 +370,7 @@ class JobQueueDB extends JobQueue {
        /**
         * Reserve a row with a single UPDATE without holding row locks over RTTs...
         *
-        * @param $uuid string 32 char hex string
+        * @param string $uuid 32 char hex string
         * @return Row|false
         */
        protected function claimOldest( $uuid ) {
@@ -402,8 +434,6 @@ class JobQueueDB extends JobQueue {
         * @return integer Number of jobs recycled/deleted
         */
        public function recycleAndDeleteStaleJobs() {
-               global $wgMemc;
-
                $now = time();
                list( $dbw, $scope ) = $this->getMasterDB();
                $count = 0; // affected rows
@@ -441,7 +471,7 @@ class JobQueueDB extends JobQueue {
                                );
                                $count += $dbw->affectedRows();
                                wfIncrStats( 'job-recycle', $dbw->affectedRows() );
-                               $wgMemc->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+                               $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
                        }
                }
 
@@ -510,45 +540,20 @@ class JobQueueDB extends JobQueue {
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
                list( $dbw, $scope ) = $this->getMasterDB();
-               $dbw->onTransactionIdle( function() use ( $params, $key, $scope ) {
-                       global $wgMemc;
-
-                       $timestamp = $wgMemc->get( $key ); // current last timestamp of this job
+               $cache = $this->cache;
+               $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $scope ) {
+                       $timestamp = $cache->get( $key ); // current last timestamp of this job
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
                        }
 
                        // Update the timestamp of the last root job started at the location...
-                       return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+                       return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
                } );
 
                return true;
        }
 
-       /**
-        * Check if the "root" job of a given job has been superseded by a newer one
-        *
-        * @param $job Job
-        * @return bool
-        */
-       protected function isRootJobOldDuplicate( Job $job ) {
-               global $wgMemc;
-
-               $params = $job->getParams();
-               if ( !isset( $params['rootJobSignature'] ) ) {
-                       return false; // job has no de-deplication info
-               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
-                       trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
-                       return false;
-               }
-
-               // Get the last time this root job was enqueued
-               $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
-
-               // Check if a new root job was started at the location after this one's...
-               return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
-       }
-
        /**
         * @see JobQueue::doWaitForBackups()
         * @return void
@@ -573,13 +578,32 @@ class JobQueueDB extends JobQueue {
         * @return void
         */
        protected function doFlushCaches() {
-               global $wgMemc;
-
                foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
-                       $wgMemc->delete( $this->getCacheKey( $type ) );
+                       $this->cache->delete( $this->getCacheKey( $type ) );
                }
        }
 
+       /**
+        * @see JobQueue::getAllQueuedJobs()
+        * @return Iterator
+        */
+       public function getAllQueuedJobs() {
+               list( $dbr, $scope ) = $this->getSlaveDB();
+               return new MappedIterator(
+                       $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
+                       function( $row ) use ( $scope ) {
+                               $job = Job::factory(
+                                       $row->job_cmd,
+                                       Title::makeTitle( $row->job_namespace, $row->job_title ),
+                                       strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
+                                       $row->job_id
+                               );
+                               $job->id = $row->job_id; // XXX: work around broken subclasses
+                               return $job;
+                       }
+               );
+       }
+
        /**
         * @return Array (DatabaseBase, ScopedCallback)
         */
@@ -642,15 +666,6 @@ class JobQueueDB extends JobQueue {
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
        }
 
-       /**
-        * @param string $signature Hash identifier of the root job
-        * @return string
-        */
-       private function getRootJobCacheKey( $signature ) {
-               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
-       }
-
        /**
         * @param $params
         * @return string