Merge "Normalize as and or language names"
[lhc/web/wiklou.git] / includes / job / JobQueueDB.php
index 4b22e94..ae4576c 100644 (file)
@@ -34,6 +34,9 @@ class JobQueueDB extends JobQueue {
        const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
        const MAX_OFFSET = 255; // integer; maximum number of rows to skip
 
+       /** @var BagOStuff */
+       protected $cache;
+
        protected $cluster = false; // string; name of an external DB cluster
 
        /**
@@ -45,8 +48,13 @@ class JobQueueDB extends JobQueue {
         * @param $params array
         */
        protected function __construct( array $params ) {
+               global $wgMemc;
+
                parent::__construct( $params );
+
                $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
+               // Make sure that we don't use the SQL cache, which would be harmful
+               $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc;
        }
 
        protected function supportedOrders() {
@@ -62,11 +70,9 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doIsEmpty() {
-               global $wgMemc;
-
                $key = $this->getCacheKey( 'empty' );
 
-               $isEmpty = $wgMemc->get( $key );
+               $isEmpty = $this->cache->get( $key );
                if ( $isEmpty === 'true' ) {
                        return true;
                } elseif ( $isEmpty === 'false' ) {
@@ -77,7 +83,7 @@ class JobQueueDB extends JobQueue {
                $found = $dbr->selectField( // unclaimed job
                        'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
                );
-               $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
+               $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
 
                return !$found;
        }
@@ -87,11 +93,9 @@ class JobQueueDB extends JobQueue {
         * @return integer
         */
        protected function doGetSize() {
-               global $wgMemc;
-
                $key = $this->getCacheKey( 'size' );
 
-               $size = $wgMemc->get( $key );
+               $size = $this->cache->get( $key );
                if ( is_int( $size ) ) {
                        return $size;
                }
@@ -101,7 +105,7 @@ class JobQueueDB extends JobQueue {
                        array( 'job_cmd' => $this->type, 'job_token' => '' ),
                        __METHOD__
                );
-               $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT );
+               $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
 
                return $size;
        }
@@ -111,13 +115,40 @@ class JobQueueDB extends JobQueue {
         * @return integer
         */
        protected function doGetAcquiredCount() {
+               if ( $this->claimTTL <= 0 ) {
+                       return 0; // no acknowledgements
+               }
+
+               $key = $this->getCacheKey( 'acquiredcount' );
+
+               $count = $this->cache->get( $key );
+               if ( is_int( $count ) ) {
+                       return $count;
+               }
+
+               list( $dbr, $scope ) = $this->getSlaveDB();
+               $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                       array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
+                       __METHOD__
+               );
+               $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+
+               return $count;
+       }
+
+       /**
+        * @see JobQueue::doGetAbandonedCount()
+        * @return integer
+        * @throws MWException
+        */
+       protected function doGetAbandonedCount() {
                global $wgMemc;
 
                if ( $this->claimTTL <= 0 ) {
                        return 0; // no acknowledgements
                }
 
-               $key = $this->getCacheKey( 'acquiredcount' );
+               $key = $this->getCacheKey( 'abandonedcount' );
 
                $count = $wgMemc->get( $key );
                if ( is_int( $count ) ) {
@@ -126,7 +157,11 @@ class JobQueueDB extends JobQueue {
 
                list( $dbr, $scope ) = $this->getSlaveDB();
                $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
-                       array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
+                       array(
+                               'job_cmd' => $this->type,
+                               "job_token != {$dbr->addQuotes( '' )}",
+                               "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
+                       ),
                        __METHOD__
                );
                $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
@@ -158,13 +193,12 @@ class JobQueueDB extends JobQueue {
                        }
 
                        $key = $this->getCacheKey( 'empty' );
-                       $atomic = ( $flags & self::QoS_Atomic );
+                       $atomic = ( $flags & self::QOS_ATOMIC );
+                       $cache = $this->cache;
 
                        $dbw->onTransactionIdle(
-                               function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $scope
+                               function() use ( $dbw, $cache, $rowSet, $rowList, $atomic, $key, $scope
                        ) {
-                               global $wgMemc;
-
                                if ( $atomic ) {
                                        $dbw->begin( __METHOD__ ); // wrap all the job additions in one transaction
                                }
@@ -203,7 +237,7 @@ class JobQueueDB extends JobQueue {
                                        $dbw->commit( __METHOD__ );
                                }
 
-                               $wgMemc->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
+                               $cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } );
                }
 
@@ -215,9 +249,7 @@ class JobQueueDB extends JobQueue {
         * @return Job|bool
         */
        protected function doPop() {
-               global $wgMemc;
-
-               if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
+               if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
                        return false; // queue is empty
                }
 
@@ -237,7 +269,7 @@ class JobQueueDB extends JobQueue {
                        }
                        // Check if we found a row to reserve...
                        if ( !$row ) {
-                               $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
+                               $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
                                break; // nothing to do
                        }
                        wfIncrStats( 'job-pop' );
@@ -266,11 +298,9 @@ class JobQueueDB extends JobQueue {
         * @return Row|false
         */
        protected function claimRandom( $uuid, $rand, $gte ) {
-               global $wgMemc;
-
                list( $dbw, $scope ) = $this->getMasterDB();
                // Check cache to see if the queue has <= OFFSET items
-               $tinyQueue = $wgMemc->get( $this->getCacheKey( 'small' ) );
+               $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
 
                $row = false; // the row acquired
                $invertedDirection = false; // whether one job_random direction was already scanned
@@ -311,7 +341,7 @@ class JobQueueDB extends JobQueue {
                                );
                                if ( !$row ) {
                                        $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
-                                       $wgMemc->set( $this->getCacheKey( 'small' ), 1, 30 );
+                                       $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
                                        continue; // use job_random
                                }
                        }
@@ -404,8 +434,6 @@ class JobQueueDB extends JobQueue {
         * @return integer Number of jobs recycled/deleted
         */
        public function recycleAndDeleteStaleJobs() {
-               global $wgMemc;
-
                $now = time();
                list( $dbw, $scope ) = $this->getMasterDB();
                $count = 0; // affected rows
@@ -443,7 +471,7 @@ class JobQueueDB extends JobQueue {
                                );
                                $count += $dbw->affectedRows();
                                wfIncrStats( 'job-recycle', $dbw->affectedRows() );
-                               $wgMemc->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+                               $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
                        }
                }
 
@@ -512,16 +540,15 @@ class JobQueueDB extends JobQueue {
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
                list( $dbw, $scope ) = $this->getMasterDB();
-               $dbw->onTransactionIdle( function() use ( $params, $key, $scope ) {
-                       global $wgMemc;
-
-                       $timestamp = $wgMemc->get( $key ); // current last timestamp of this job
+               $cache = $this->cache;
+               $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $scope ) {
+                       $timestamp = $cache->get( $key ); // current last timestamp of this job
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
                        }
 
                        // Update the timestamp of the last root job started at the location...
-                       return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+                       return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
                } );
 
                return true;
@@ -551,10 +578,8 @@ class JobQueueDB extends JobQueue {
         * @return void
         */
        protected function doFlushCaches() {
-               global $wgMemc;
-
                foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
-                       $wgMemc->delete( $this->getCacheKey( $type ) );
+                       $this->cache->delete( $this->getCacheKey( $type ) );
                }
        }