[JobQueue] Made showJobs.php work for non-DB queues.
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 14 Dec 2012 18:47:06 +0000 (10:47 -0800)
committerGerrit Code Review <gerrit@wikimedia.org>
Thu, 3 Jan 2013 20:31:47 +0000 (20:31 +0000)
Change-Id: Idada1e3ca8278898de6e53fdcc5dee4786d5bae8

includes/job/JobQueue.php
includes/job/JobQueueDB.php
maintenance/showJobs.php

index ffd5d95..ae80370 100644 (file)
@@ -96,7 +96,7 @@ abstract class JobQueue {
        }
 
        /**
-        * Quickly check if the queue is empty.
+        * Quickly check if the queue is empty (has no available jobs).
         * Queue classes should use caching if they are any slower without memcached.
         *
         * @return bool
@@ -114,6 +114,44 @@ abstract class JobQueue {
         */
        abstract protected function doIsEmpty();
 
+       /**
+        * Get the number of available jobs in the queue.
+        * Queue classes should use caching if they are any slower without memcached.
+        *
+        * @return integer
+        */
+       final public function getSize() {
+               wfProfileIn( __METHOD__ );
+               $res = $this->doGetSize();
+               wfProfileOut( __METHOD__ );
+               return $res;
+       }
+
+       /**
+        * @see JobQueue::getSize()
+        * @return integer
+        */
+       abstract protected function doGetSize();
+
+       /**
+        * Get the number of acquired jobs (these are temporarily out of the queue).
+        * Queue classes should use caching if they are any slower without memcached.
+        *
+        * @return integer
+        */
+       final public function getAcquiredCount() {
+               wfProfileIn( __METHOD__ );
+               $res = $this->doGetAcquiredCount();
+               wfProfileOut( __METHOD__ );
+               return $res;
+       }
+
+       /**
+        * @see JobQueue::getAcquiredCount()
+        * @return integer
+        */
+       abstract protected function doGetAcquiredCount();
+
        /**
         * Push a batch of jobs into the queue
         *
index 14c1dca..2917d48 100644 (file)
  * @since 1.21
  */
 class JobQueueDB extends JobQueue {
-       const CACHE_TTL      = 300; // integer; seconds to cache queue information
-       const MAX_AGE_PRUNE  = 604800; // integer; seconds a job can live once claimed
-       const MAX_ATTEMPTS   = 3; // integer; number of times to try a job
-       const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
+       const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
+       const CACHE_TTL_LONG  = 300; // integer; seconds to cache info that is kept up to date
+       const MAX_AGE_PRUNE   = 604800; // integer; seconds a job can live once claimed
+       const MAX_ATTEMPTS    = 3; // integer; number of times to try a job
+       const MAX_JOB_RANDOM  = 2147483647; // integer; 2^31 - 1, used for job_random
 
        /**
         * @see JobQueue::doIsEmpty()
@@ -40,7 +41,7 @@ class JobQueueDB extends JobQueue {
        protected function doIsEmpty() {
                global $wgMemc;
 
-               $key = $this->getEmptinessCacheKey();
+               $key = $this->getCacheKey( 'empty' );
 
                $isEmpty = $wgMemc->get( $key );
                if ( $isEmpty === 'true' ) {
@@ -52,11 +53,59 @@ class JobQueueDB extends JobQueue {
                $found = $this->getSlaveDB()->selectField( // unclaimed job
                        'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
                );
-               $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL );
+               $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
 
                return !$found;
        }
 
+       /**
+        * @see JobQueue::doGetSize()
+        * @return integer
+        */
+       protected function doGetSize() {
+               global $wgMemc;
+
+               $key = $this->getCacheKey( 'size' );
+
+               $size = $wgMemc->get( $key );
+               if ( is_int( $size ) ) {
+                       return $size;
+               }
+
+               $dbr = $this->getSlaveDB();
+               $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                       array( 'job_cmd' => $this->type, 'job_token' => '' ),
+                       __METHOD__
+               );
+               $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT );
+
+               return $size;
+       }
+
+       /**
+        * @see JobQueue::doGetAcquiredCount()
+        * @return integer
+        */
+       protected function doGetAcquiredCount() {
+               global $wgMemc;
+
+               $key = $this->getCacheKey( 'acquiredcount' );
+
+               $count = $wgMemc->get( $key );
+               if ( is_int( $count ) ) {
+                       return $count;
+               }
+
+               $dbr = $this->getSlaveDB();
+               $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                       array( 'job_cmd' => $this->type, "job_token !={$dbr->addQuotes('')}" ),
+                       __METHOD__
+               );
+               $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
+
+               return $count;
+       }
+
        /**
         * @see JobQueue::doBatchPush()
         * @param array $jobs
@@ -81,8 +130,8 @@ class JobQueueDB extends JobQueue {
                        }
 
                        $atomic = ( $flags & self::QoS_Atomic );
-                       $key    = $this->getEmptinessCacheKey();
-                       $ttl    = self::CACHE_TTL;
+                       $key    = $this->getCacheKey( 'empty' );
+                       $ttl    = self::CACHE_TTL_LONG;
 
                        $dbw->onTransactionIdle(
                                function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl
@@ -139,7 +188,7 @@ class JobQueueDB extends JobQueue {
        protected function doPop() {
                global $wgMemc;
 
-               if ( $wgMemc->get( $this->getEmptinessCacheKey() ) === 'true' ) {
+               if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
                        return false; // queue is empty
                }
 
@@ -166,7 +215,7 @@ class JobQueueDB extends JobQueue {
                        }
                        // Check if we found a row to reserve...
                        if ( !$row ) {
-                               $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
+                               $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
                                break; // nothing to do
                        }
                        wfIncrStats( 'job-pop' );
@@ -495,9 +544,9 @@ class JobQueueDB extends JobQueue {
        /**
         * @return string
         */
-       private function getEmptinessCacheKey() {
+       private function getCacheKey( $property ) {
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'isempty' );
+               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
        }
 
        /**
index 1dceb79..8b49517 100644 (file)
@@ -39,21 +39,24 @@ class ShowJobs extends Maintenance {
                $this->mDescription = "Show number of jobs waiting in master database";
                $this->addOption( 'group', 'Show number of jobs per job type' );
        }
+
        public function execute() {
-               $dbw = wfGetDB( DB_MASTER );
+               $group = JobQueueGroup::singleton();
                if ( $this->hasOption( 'group' ) ) {
-                       $res = $dbw->select(
-                               'job',
-                               array( 'job_cmd', 'count(*) as count' ),
-                               array(),
-                               __METHOD__,
-                               array( 'GROUP BY' => 'job_cmd' )
-                       );
-                       foreach ( $res as $row ) {
-                               $this->output( $row->job_cmd . ': ' . $row->count . "\n" );
+                       foreach ( $group->getQueueTypes() as $type ) {
+                               $queue   = $group->get( $type );
+                               $pending = $queue->getSize();
+                               $claimed = $queue->getAcquiredCount();
+                               if ( ( $pending + $claimed ) > 0 ) {
+                                       $this->output( "{$type}: $pending queued; $claimed acquired\n" );
+                               }
                        }
                } else {
-                       $this->output( $dbw->selectField( 'job', 'count(*)', '', __METHOD__ ) . "\n" );
+                       $count = 0;
+                       foreach ( $group->getQueueTypes() as $type ) {
+                               $count += $group->get( $type )->getSize();
+                       }
+                       $this->output( "$count\n" );
                }
        }
 }